* Copyright 2021 (c) Matthieu Gallien <matthieu.gallien@nextcloud.com>
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
#include "bulkpropagatorjob.h"
#include "putmultifilejob.h"
#include "owncloudpropagator_p.h"
#include "syncfileitem.h"
#include "syncengine.h"
#include "propagateupload.h"
#include "propagatorjobs.h"
#include "filesystem.h"
#include "account.h"
#include "common/utility.h"
#include "common/checksums.h"
#include "networkjobs.h"
#include <QFileInfo>
#include <QDir>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonObject>
#include <QJsonValue>
namespace OCC {
Q_LOGGING_CATEGORY(lcBulkPropagatorJob, "nextcloud.sync.propagator.bulkupload", QtInfoMsg)
namespace {
QByteArray getEtagFromJsonReply(const QJsonObject &reply)
const auto ocEtag = OCC::parseEtag(reply.value("OC-ETag").toString().toLatin1());
const auto ETag = OCC::parseEtag(reply.value("ETag").toString().toLatin1());
const auto etag = OCC::parseEtag(reply.value("etag").toString().toLatin1());
QByteArray ret = ocEtag;
if (ret.isEmpty()) {
ret = ETag;
if (ret.isEmpty()) {
ret = etag;
if (ocEtag.length() > 0 && ocEtag != etag && ocEtag != ETag) {
qCDebug(OCC::lcBulkPropagatorJob) << "Quite peculiar, we have an etag != OC-Etag [no problem!]" << etag << ETag << ocEtag;
return ret;
QByteArray getHeaderFromJsonReply(const QJsonObject &reply, const QByteArray &headerName)
return reply.value(headerName).toString().toLatin1();
namespace OCC {
BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator,
const std::deque<SyncFileItemPtr> &items)
: PropagatorJob(propagator)
, _items(items)
bool BulkPropagatorJob::scheduleSelfOrChild()
if (_items.empty()) {
return false;
_state = Running;
for(int i = 0; i < 100 && !_items.empty(); ++i) {
auto currentItem = _items.front();
QMetaObject::invokeMethod(this, [this, currentItem] () {
UploadFileInfo fileToUpload;
fileToUpload._file = currentItem->_file;
fileToUpload._size = currentItem->_size;
fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file);
startUploadFile(currentItem, fileToUpload);
}); // We could be in a different thread (neon jobs)
return _items.empty() && _filesToUpload.empty();
PropagatorJob::JobParallelism BulkPropagatorJob::parallelism()
return PropagatorJob::JobParallelism::WaitForFinished;
void BulkPropagatorJob::startUploadFile(SyncFileItemPtr item, UploadFileInfo fileToUpload)
if (propagator()->_abortRequested) {
// Check if the specific file can be accessed
if (propagator()->hasCaseClashAccessibilityProblem(fileToUpload._file)) {
done(item, SyncFileItem::NormalError, tr("File %1 cannot be uploaded because another file with the same name, differing only in case, exists").arg(QDir::toNativeSeparators(item->_file)));
return slotComputeTransmissionChecksum(item, fileToUpload);
void BulkPropagatorJob::doStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
QByteArray transmissionChecksumHeader)
if (propagator()->_abortRequested) {
// write the checksum in the database, so if the POST is sent
// to the server, but the connection drops before we get the etag, we can check the checksum
// in reconcile (issue #5106)
SyncJournalDb::UploadInfo pi;
pi._valid = true;
pi._chunk = 0;
pi._transferid = 0; // We set a null transfer id because it is not chunked.
pi._modtime = item->_modtime;
pi._errorCount = 0;
pi._contentChecksum = item->_checksumHeader;
pi._size = item->_size;
propagator()->_journal->setUploadInfo(item->_file, pi);
propagator()->_journal->commit("Upload info");
auto currentHeaders = headers(item);
currentHeaders[QByteArrayLiteral("Content-Length")] = QByteArray::number(fileToUpload._size);
if (!item->_renameTarget.isEmpty() && item->_file != item->_renameTarget) {
// Try to rename the file
const auto originalFilePathAbsolute = propagator()->fullLocalPath(item->_file);
const auto newFilePathAbsolute = propagator()->fullLocalPath(item->_renameTarget);
const auto renameSuccess = QFile::rename(originalFilePathAbsolute, newFilePathAbsolute);
if (!renameSuccess) {
done(item, SyncFileItem::NormalError, "File contains trailing spaces and couldn't be renamed");
qCWarning(lcBulkPropagatorJob()) << item->_file << item->_renameTarget;
fileToUpload._file = item->_file = item->_renameTarget;
fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file);
item->_modtime = FileSystem::getModTime(newFilePathAbsolute);
const auto remotePath = propagator()->fullRemotePath(fileToUpload._file);
currentHeaders["X-File-MD5"] = transmissionChecksumHeader;
BulkUploadItem newUploadFile{propagator()->account(), item, fileToUpload,
remotePath, fileToUpload._path,
fileToUpload._size, currentHeaders};
qCInfo(lcBulkPropagatorJob) << remotePath << "transmission checksum" << transmissionChecksumHeader << fileToUpload._path;
if (_pendingChecksumFiles.empty()) {
void BulkPropagatorJob::triggerUpload()
auto uploadParametersData = std::vector<SingleUploadFileData>{};
int timeout = 0;
for(auto &singleFile : _filesToUpload) {
// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
auto device = std::make_unique<UploadDevice>(
singleFile._localPath, 0, singleFile._fileSize, &propagator()->_bandwidthManager);
if (!device->open(QIODevice::ReadOnly)) {
qCWarning(lcBulkPropagatorJob) << "Could not prepare upload device: " << device->errorString();
// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(singleFile._localPath)) {
emit propagator()->seenLockedFile(singleFile._localPath);
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError(singleFile._item, SyncFileItem::SoftError, device->errorString());
singleFile._headers["X-File-Path"] = singleFile._remotePath.toUtf8();
uploadParametersData.push_back({std::move(device), singleFile._headers});
timeout += singleFile._fileSize;
const auto bulkUploadUrl = Utility::concatUrlPath(propagator()->account()->url(), QStringLiteral("/remote.php/dav/bulk"));
auto job = std::make_unique<PutMultiFileJob>(propagator()->account(), bulkUploadUrl, std::move(uploadParametersData), this);
connect(job.get(), &PutMultiFileJob::finishedSignal, this, &BulkPropagatorJob::slotPutFinished);
for(auto &singleFile : _filesToUpload) {
connect(job.get(), &PutMultiFileJob::uploadProgress,
this, [this, singleFile] (qint64 sent, qint64 total) {
slotUploadProgress(singleFile._item, sent, total);
adjustLastJobTimeout(job.get(), timeout);
void BulkPropagatorJob::slotComputeTransmissionChecksum(SyncFileItemPtr item,
UploadFileInfo fileToUpload)
// Reuse the content checksum as the transmission checksum if possible
const auto supportedTransmissionChecksums =
// Compute the transmission checksum.
auto computeChecksum = std::make_unique<ComputeChecksum>(this);
if (uploadChecksumEnabled()) {
computeChecksum->setChecksumType("MD5" /*propagator()->account()->capabilities().uploadChecksumType()*/);
} else {
connect(computeChecksum.get(), &ComputeChecksum::done,
this, [this, item, fileToUpload] (const QByteArray &contentChecksumType, const QByteArray &contentChecksum) {
slotStartUpload(item, fileToUpload, contentChecksumType, contentChecksum);
connect(computeChecksum.get(), &ComputeChecksum::done,
computeChecksum.get(), &QObject::deleteLater);
void BulkPropagatorJob::slotStartUpload(SyncFileItemPtr item,
UploadFileInfo fileToUpload,
const QByteArray &transmissionChecksumType,
const QByteArray &transmissionChecksum)
const auto transmissionChecksumHeader = makeChecksumHeader(transmissionChecksumType, transmissionChecksum);
item->_checksumHeader = transmissionChecksumHeader;
const QString fullFilePath = fileToUpload._path;
const QString originalFilePath = propagator()->fullLocalPath(item->_file);
if (!FileSystem::fileExists(fullFilePath)) {
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("File Removed (start upload) %1").arg(fullFilePath));
const time_t prevModtime = item->_modtime; // the _item value was set in PropagateUploadFile::start()
// but a potential checksum calculation could have taken some time during which the file could
// have been changed again, so better check again here.
item->_modtime = FileSystem::getModTime(originalFilePath);
if (prevModtime != item->_modtime) {
propagator()->_anotherSyncNeeded = true;
qDebug() << "trigger another sync after checking modified time of item" << item->_file << "prevModtime" << prevModtime << "Curr" << item->_modtime;
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during syncing. It will be resumed."));
fileToUpload._size = FileSystem::getSize(fullFilePath);
item->_size = FileSystem::getSize(originalFilePath);
// But skip the file if the mtime is too close to 'now'!
// That usually indicates a file that is still being changed
// or not yet fully copied to the destination.
if (fileIsStillChanging(*item)) {
propagator()->_anotherSyncNeeded = true;
return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during sync."));
doStartUpload(item, fileToUpload, transmissionChecksum);
void BulkPropagatorJob::slotOnErrorStartFolderUnlock(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString)
qCInfo(lcBulkPropagatorJob()) << status << errorString;
done(item, status, errorString);
void BulkPropagatorJob::slotPutFinishedOneFile(const BulkUploadItem &singleFile,
PutMultiFileJob *job,
const QJsonObject &fullReplyObject)
bool finished = false;
const auto fileReply = fullReplyObject.value(QChar('/') + singleFile._item->_file).toObject();
qCInfo(lcBulkPropagatorJob()) << singleFile._item->_file << "file headers" << fileReply;
if (!fileReply[QStringLiteral("error")].toBool()) {
singleFile._item->_httpErrorCode = static_cast<quint16>(200);
} else {
singleFile._item->_httpErrorCode = static_cast<quint16>(412);
singleFile._item->_responseTimeStamp = job->responseTimestamp();
singleFile._item->_requestId = job->requestId();
if (singleFile._item->_httpErrorCode != 200) {
singleFile._item->_status = SyncFileItem::Success;
// Check the file again post upload.
// Two cases must be considered separately: If the upload is finished,
// the file is on the server and has a changed ETag. In that case,
// the etag has to be properly updated in the client journal, and because
// of that we can bail out here with an error. But we can reschedule a
// sync ASAP.
// But if the upload is ongoing, because not all chunks were uploaded
// yet, the upload can be stopped and an error can be displayed, because
// the server hasn't registered the new file yet.
const auto etag = getEtagFromJsonReply(fileReply);
finished = etag.length() > 0;
const auto fullFilePath(propagator()->fullLocalPath(singleFile._item->_file));
// Check if the file still exists
if (!checkFileStillExists(singleFile._item, finished, fullFilePath)) {
// Check whether the file changed since discovery. the file check here is the original and not the temporary.
if (!checkFileChanged(singleFile._item, finished, fullFilePath)) {
// the file id should only be empty for new files up- or downloaded
computeFileId(singleFile._item, fileReply);
singleFile._item->_etag = etag;
if (getHeaderFromJsonReply(fileReply, "X-OC-MTime") != "accepted") {
// X-OC-MTime is supported since owncloud 5.0. But not when chunking.
// Normally Owncloud 6 always puts X-OC-MTime
qCWarning(lcBulkPropagatorJob) << "Server does not support X-OC-MTime" << getHeaderFromJsonReply(fileReply, "X-OC-MTime");
// Well, the mtime was not set
void BulkPropagatorJob::slotPutFinished()
auto *job = qobject_cast<PutMultiFileJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
const auto replyData = job->reply()->readAll();
const auto replyJson = QJsonDocument::fromJson(replyData);
const auto fullReplyObject = replyJson.object();
for (const auto &oneFile : _filesToUpload) {
slotPutFinishedOneFile(oneFile, job, fullReplyObject);
void BulkPropagatorJob::slotUploadProgress(SyncFileItemPtr item, qint64 sent, qint64 total)
// Completion is signaled with sent=0, total=0; avoid accidentally
// resetting progress due to the sent being zero by ignoring it.
// finishedSignal() is bound to be emitted soon anyway.
// See https://bugreports.qt.io/browse/QTBUG-44782.
if (sent == 0 && total == 0) {
propagator()->reportProgress(*item, sent - total);
void BulkPropagatorJob::slotJobDestroyed(QObject *job)
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job), _jobs.end());
void BulkPropagatorJob::adjustLastJobTimeout(AbstractNetworkJob *job, qint64 fileSize) const
constexpr double threeMinutes = 3.0 * 60 * 1000;
// Calculate 3 minutes for each gigabyte of data
qRound64(threeMinutes * static_cast<double>(fileSize) / 1e9),
// Maximum of 30 minutes
static_cast<qint64>(30 * 60 * 1000)));
void BulkPropagatorJob::finalizeOneFile(const BulkUploadItem &oneFile)
// Update the database entry
const auto result = propagator()->updateMetadata(*oneFile._item);
if (!result) {
done(oneFile._item, SyncFileItem::FatalError, tr("Error updating metadata: %1").arg(result.error()));
} else if (*result == Vfs::ConvertToPlaceholderResult::Locked) {
done(oneFile._item, SyncFileItem::SoftError, tr("The file %1 is currently in use").arg(oneFile._item->_file));
// Files that were new on the remote shouldn't have online-only pin state
// even if their parent folder is online-only.
if (oneFile._item->_instruction == CSYNC_INSTRUCTION_NEW
|| oneFile._item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
auto &vfs = propagator()->syncOptions()._vfs;
const auto pin = vfs->pinState(oneFile._item->_file);
if (pin && *pin == PinState::OnlineOnly && !vfs->setPinState(oneFile._item->_file, PinState::Unspecified)) {
qCWarning(lcBulkPropagatorJob) << "Could not set pin state of" << oneFile._item->_file << "to unspecified";
// Remove from the progress database:
propagator()->_journal->setUploadInfo(oneFile._item->_file, SyncJournalDb::UploadInfo());
propagator()->_journal->commit("upload file start");
void BulkPropagatorJob::finalize()
for(const auto &oneFile : _filesToUpload) {
if (!oneFile._item->hasErrorStatus()) {
done(oneFile._item, oneFile._item->_status, {});
if (_items.empty()) {
if (!_jobs.empty()) {
// just wait for the other job to finish.
if (!_pendingChecksumFiles.empty()) {
// just wait for the other job to finish.
qCInfo(lcBulkPropagatorJob) << "final status" << _finalStatus;
emit finished(_finalStatus);
} else {
void BulkPropagatorJob::done(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &errorString)
item->_status = status;
item->_errorString = errorString;
qCInfo(lcBulkPropagatorJob) << "Item completed" << item->destination() << item->_status << item->_instruction << item->_errorString;
handleFileRestoration(item, errorString);
if (propagator()->_abortRequested && (item->_status == SyncFileItem::NormalError
|| item->_status == SyncFileItem::FatalError)) {
// an abort request is ongoing. Change the status to Soft-Error
item->_status = SyncFileItem::SoftError;
if (item->_status != SyncFileItem::Success) {
// Blacklist handling
propagator()->_anotherSyncNeeded = true;
handleJobDoneErrors(item, status);
emit propagator()->itemCompleted(item);
QMap<QByteArray, QByteArray> BulkPropagatorJob::headers(SyncFileItemPtr item) const
QMap<QByteArray, QByteArray> headers;
headers[QByteArrayLiteral("Content-Type")] = QByteArrayLiteral("application/octet-stream");
headers[QByteArrayLiteral("X-File-Mtime")] = QByteArray::number(qint64(item->_modtime));
if (qEnvironmentVariableIntValue("OWNCLOUD_LAZYOPS")) {
headers[QByteArrayLiteral("OC-LazyOps")] = QByteArrayLiteral("true");
if (item->_file.contains(QLatin1String(".sys.admin#recall#"))) {
// This is a file recall triggered by the admin. Note: the
// recall list file created by the admin and downloaded by the
// client (.sys.admin#recall#) also falls into this category
// (albeit users are not supposed to mess up with it)
// We use a special tag header so that the server may decide to store this file away in some admin stage area
// And not directly in the user's area (which would trigger redownloads etc).
headers["OC-Tag"] = ".sys.admin#recall#";
if (!item->_etag.isEmpty() && item->_etag != "empty_etag"
&& item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match
&& item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE) {
// We add quotes because the owncloud server always adds quotes around the etag, and
// csync_owncloud.c's owncloud_file_id always strips the quotes.
headers[QByteArrayLiteral("If-Match")] = '"' + item->_etag + '"';
// Set up a conflict file header pointing to the original file
auto conflictRecord = propagator()->_journal->conflictRecord(item->_file.toUtf8());
if (conflictRecord.isValid()) {
headers[QByteArrayLiteral("OC-Conflict")] = "1";
if (!conflictRecord.initialBasePath.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictInitialBasePath")] = conflictRecord.initialBasePath;
if (!conflictRecord.baseFileId.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictBaseFileId")] = conflictRecord.baseFileId;
if (conflictRecord.baseModtime != -1) {
headers[QByteArrayLiteral("OC-ConflictBaseMtime")] = QByteArray::number(conflictRecord.baseModtime);
if (!conflictRecord.baseEtag.isEmpty()) {
headers[QByteArrayLiteral("OC-ConflictBaseEtag")] = conflictRecord.baseEtag;
return headers;
void BulkPropagatorJob::abortWithError(SyncFileItemPtr item,
SyncFileItem::Status status,
const QString &error)
done(item, status, error);
void BulkPropagatorJob::checkResettingErrors(SyncFileItemPtr item) const
if (item->_httpErrorCode == 412
|| propagator()->account()->capabilities().httpErrorCodesThatResetFailingChunkedUploads().contains(item->_httpErrorCode)) {
auto uploadInfo = propagator()->_journal->getUploadInfo(item->_file);
uploadInfo._errorCount += 1;
if (uploadInfo._errorCount > 3) {
qCInfo(lcBulkPropagatorJob) << "Reset transfer of" << item->_file
<< "due to repeated error" << item->_httpErrorCode;
uploadInfo = SyncJournalDb::UploadInfo();
} else {
qCInfo(lcBulkPropagatorJob) << "Error count for maybe-reset error" << item->_httpErrorCode
<< "on file" << item->_file
<< "is" << uploadInfo._errorCount;
propagator()->_journal->setUploadInfo(item->_file, uploadInfo);
propagator()->_journal->commit("Upload info");
void BulkPropagatorJob::commonErrorHandling(SyncFileItemPtr item)
// Ensure errors that should eventually reset the chunked upload are tracked.
abortWithError(item, SyncFileItem::NormalError, tr("Error"));
bool BulkPropagatorJob::checkFileStillExists(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath)
if (!FileSystem::fileExists(fullFilePath)) {
if (!finished) {
abortWithError(item, SyncFileItem::SoftError, tr("The local file was removed during sync."));
return false;
} else {
propagator()->_anotherSyncNeeded = true;
return true;
bool BulkPropagatorJob::checkFileChanged(SyncFileItemPtr item,
const bool finished,
const QString &fullFilePath)
if (!FileSystem::verifyFileUnchanged(fullFilePath, item->_size, item->_modtime)) {
propagator()->_anotherSyncNeeded = true;
if (!finished) {
abortWithError(item, SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return false;
return true;
void BulkPropagatorJob::computeFileId(SyncFileItemPtr item,
const QJsonObject &fileReply) const
const auto fid = getHeaderFromJsonReply(fileReply, "OC-FileID");
if (!fid.isEmpty()) {
if (!item->_fileId.isEmpty() && item->_fileId != fid) {
qCWarning(lcBulkPropagatorJob) << "File ID changed!" << item->_fileId << fid;
item->_fileId = fid;
void BulkPropagatorJob::handleFileRestoration(SyncFileItemPtr item,
const QString &errorString) const
if (item->_isRestoration) {
if (item->_status == SyncFileItem::Success
|| item->_status == SyncFileItem::Conflict) {
item->_status = SyncFileItem::Restoration;
} else {
item->_errorString += tr("; Restoration Failed: %1").arg(errorString);
} else {
if (item->_errorString.isEmpty()) {
item->_errorString = errorString;
void BulkPropagatorJob::handleBulkUploadBlackList(SyncFileItemPtr item) const
void BulkPropagatorJob::handleJobDoneErrors(SyncFileItemPtr item,
SyncFileItem::Status status)
if (item->hasErrorStatus()) {
qCWarning(lcPropagator) << "Could not complete propagation of" << item->destination() << "by" << this << "with status" << item->_status << "and error:" << item->_errorString;
} else {
qCInfo(lcPropagator) << "Completed propagation of" << item->destination() << "by" << this << "with status" << item->_status;
if (item->_status == SyncFileItem::FatalError) {
// Abort all remaining jobs.
switch (item->_status)
case SyncFileItem::BlacklistedError:
case SyncFileItem::Conflict:
case SyncFileItem::FatalError:
case SyncFileItem::FileIgnored:
case SyncFileItem::FileLocked:
case SyncFileItem::FileNameInvalid:
case SyncFileItem::NoStatus:
case SyncFileItem::NormalError:
case SyncFileItem::Restoration:
case SyncFileItem::SoftError:
_finalStatus = SyncFileItem::NormalError;
qCInfo(lcBulkPropagatorJob) << "modify final status NormalError" << _finalStatus << status;
case SyncFileItem::DetailError:
_finalStatus = SyncFileItem::DetailError;
qCInfo(lcBulkPropagatorJob) << "modify final status DetailError" << _finalStatus << status;
case SyncFileItem::Success: