summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJamie Heppenstall <jamie.heppenstall@mongodb.com>2020-04-25 21:30:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-08 14:12:42 +0000
commit307b547e672ffebe30461eae1ed5d6bc36e919ca (patch)
tree46adbb2b5beac5facfcd769d42d6f0f3ae9c19a6 /src/mongo
parent3f124535f04261cb5b9c85528d0971e44ec77128 (diff)
downloadmongo-307b547e672ffebe30461eae1ed5d6bc36e919ca.tar.gz
SERVER-46076 Stream incremental backup data
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h2
-rw-r--r--src/mongo/db/storage/backup_cursor_state.h5
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp40
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.h2
-rw-r--r--src/mongo/db/storage/kv/kv_engine.h2
-rw-r--r--src/mongo/db/storage/storage_engine.h99
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp5
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h2
-rw-r--r--src/mongo/db/storage/storage_engine_mock.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp355
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h23
11 files changed, 355 insertions, 182 deletions
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 4a3b70dbe28..7ebd3beba13 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -209,7 +209,7 @@ public:
BackupCursorState openBackupCursor(OperationContext* opCtx,
const StorageEngine::BackupOptions& options) final {
- return BackupCursorState{UUID::gen(), boost::none, {}};
+ return BackupCursorState{UUID::gen(), boost::none, nullptr, {}};
}
void closeBackupCursor(OperationContext* opCtx, const UUID& backupId) final {}
diff --git a/src/mongo/db/storage/backup_cursor_state.h b/src/mongo/db/storage/backup_cursor_state.h
index 814e81c4765..317dd3ff05f 100644
--- a/src/mongo/db/storage/backup_cursor_state.h
+++ b/src/mongo/db/storage/backup_cursor_state.h
@@ -41,7 +41,10 @@ namespace mongo {
struct BackupCursorState {
UUID backupId;
boost::optional<Document> preamble;
- StorageEngine::BackupInformation backupInformation;
+ std::unique_ptr<StorageEngine::StreamingCursor> streamingCursor;
+ // 'otherBackupBlocks' includes the backup blocks for the encrypted storage engine in the
+ // enterprise module.
+ std::vector<StorageEngine::BackupBlock> otherBackupBlocks;
};
struct BackupCursorExtendState {
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index b73bb4a9d4d..4bbc0b9dc11 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -253,12 +253,42 @@ void DevNullKVEngine::setCachePressureForTest(int pressure) {
_cachePressureForTest = pressure;
}
-StatusWith<StorageEngine::BackupInformation> DevNullKVEngine::beginNonBlockingBackup(
+namespace {
+
+/**
+ * StreamingCursor used by the devnull engine. It hands out one fixed BackupBlock (for
+ * "filename.wt") on the first call to getNextBatch() and an empty vector on every later call.
+ */
+class StreamingCursorImpl : public StorageEngine::StreamingCursor {
+public:
+    StreamingCursorImpl() = delete;
+    explicit StreamingCursorImpl(StorageEngine::BackupOptions options)
+        : StorageEngine::StreamingCursor(options) {
+        _backupBlocks = {{"filename.wt"}};
+    }
+
+    ~StreamingCursorImpl() override = default;
+
+    /**
+     * Always returns an empty BSONObj; 'backupId' is not used.
+     */
+    BSONObj getMetadataObject(UUID backupId) {
+        return BSONObj();
+    }
+
+    /**
+     * Returns the single stored block the first time it is called and an empty vector afterwards.
+     * 'batchSize' is ignored.
+     */
+    StatusWith<std::vector<StorageEngine::BackupBlock>> getNextBatch(
+        const std::size_t batchSize) override {
+        if (_exhaustCursor) {
+            std::vector<StorageEngine::BackupBlock> emptyVector;
+            return emptyVector;
+        }
+        _exhaustCursor = true;
+        return _backupBlocks;
+    }
+
+private:
+    // The blocks returned by the first getNextBatch() call.
+    std::vector<StorageEngine::BackupBlock> _backupBlocks;
+    // Set once the blocks have been handed out; later calls return an empty vector.
+    bool _exhaustCursor = false;
+};
+
+} // namespace
+
+StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> DevNullKVEngine::beginNonBlockingBackup(
OperationContext* opCtx, const StorageEngine::BackupOptions& options) {
- StorageEngine::BackupInformation backupInformation;
- StorageEngine::BackupFile backupFile(0);
- backupInformation.insert({"filename.wt", backupFile});
- return backupInformation;
+ return std::make_unique<StreamingCursorImpl>(options);
}
StatusWith<std::vector<std::string>> DevNullKVEngine::extendBackupCursor(OperationContext* opCtx) {
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.h b/src/mongo/db/storage/devnull/devnull_kv_engine.h
index 3f73b4e4ade..f271ae0f0f8 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.h
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.h
@@ -139,7 +139,7 @@ public:
virtual void endBackup(OperationContext* opCtx) {}
- virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup(
+ virtual StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const StorageEngine::BackupOptions& options) override;
virtual void endNonBlockingBackup(OperationContext* opCtx) override {}
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 76003cfc1d4..f20897f8be2 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -237,7 +237,7 @@ public:
MONGO_UNREACHABLE;
}
- virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup(
+ virtual StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const StorageEngine::BackupOptions& options) {
return Status(ErrorCodes::CommandNotSupported,
"The current storage engine doesn't support backup mode");
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index b2da331d6b5..98285ff0f30 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -76,35 +76,6 @@ public:
using OldestActiveTransactionTimestampCallback =
std::function<OldestActiveTransactionTimestampResult(Timestamp stableTimestamp)>;
- struct BackupOptions {
- bool disableIncrementalBackup = false;
- bool incrementalBackup = false;
- int blockSizeMB = 16;
- boost::optional<std::string> thisBackupName;
- boost::optional<std::string> srcBackupName;
- };
-
- struct BackupBlock {
- std::uint64_t offset = 0;
- std::uint64_t length = 0;
- };
-
- /**
- * Contains the size of the file to be backed up. This allows the backup application to safely
- * truncate the file for incremental backups. Files that have had changes since the last
- * incremental backup will have their changed file blocks listed.
- */
- struct BackupFile {
- BackupFile() = delete;
- explicit BackupFile(std::uint64_t fileSize) : fileSize(fileSize){};
-
- std::uint64_t fileSize;
- std::vector<BackupBlock> blocksToCopy;
- };
-
- // Map of filenames to backup file information.
- using BackupInformation = stdx::unordered_map<std::string, BackupFile>;
-
/**
* The interface for creating new instances of storage engines.
*
@@ -313,21 +284,67 @@ public:
virtual Status disableIncrementalBackup(OperationContext* opCtx) = 0;
/**
- * When performing an incremental backup, we first need a basis for future incremental backups.
- * The basis will be a full backup called 'thisBackupName'. For future incremental backups, the
- * storage engine will take a backup called 'thisBackupName' which will contain the changes made
- * to data files since the backup named 'srcBackupName'.
+ * Represents the options that the storage engine can use during full and incremental backups.
+ *
+ * When performing a full backup where incrementalBackup=false, the values of 'blockSizeMB',
+ * 'thisBackupName', and 'srcBackupName' should not be modified.
*
- * The storage engine must use an upper bound limit of 'blockSizeMB' when returning changed
- * file blocks.
+ * When performing an incremental backup where incrementalBackup=true, we first need a basis for
+ * future incremental backups. This first basis (named 'thisBackupName'), which is a full
+ * backup, must pass incrementalBackup=true and should not set 'srcBackupName'. An incremental
+ * backup will include changed blocks since 'srcBackupName' was taken. This backup (also named
+ * 'thisBackupName') will then become the basis for future incremental backups.
+ *
+ * Note that 'thisBackupName' must exist if and only if incrementalBackup=true while
+ * 'srcBackupName' must not exist if incrementalBackup=false but may or may not exist if
+ * incrementalBackup=true.
+ */
+ struct BackupOptions {
+ // NOTE(review): presumably asks the engine to turn incremental backup off; not described by
+ // the surrounding documentation, so confirm against callers.
+ bool disableIncrementalBackup = false;
+ // True for the first basis of an incremental backup and for every later incremental backup.
+ bool incrementalBackup = false;
+ // Upper bound, in MB, on the size of each changed block returned for an incremental backup.
+ int blockSizeMB = 16;
+ // Name of the backup being taken; must exist if and only if incrementalBackup=true.
+ boost::optional<std::string> thisBackupName;
+ // Name of the earlier backup to diff against; must not exist when incrementalBackup=false and
+ // is optional when incrementalBackup=true.
+ boost::optional<std::string> srcBackupName;
+ };
+
+ /**
+ * Represents the file blocks returned by the storage engine during both full and incremental
+ * backups. In the case of a full backup, each block is an entire file with offset=0 and
+ * length=fileSize. In the case of the first basis for future incremental backups, each block is
+ * an entire file with offset=0 and length=0. In the case of a subsequent incremental backup,
+ * each block reflects changes made to data files since the basis (named 'thisBackupName') and
+ * each block has a maximum size of 'blockSizeMB'.
*
- * The first full backup meant for incremental and future incremental backups must pass
- * 'incrementalBackup' as true.
- * 'thisBackupName' must exist only if 'incrementalBackup' is true.
- * 'srcBackupName' must not exist when 'incrementalBackup' is false but may or may not exist
- * when 'incrementalBackup' is true.
+ * If a file is unchanged in a subsequent incremental backup, a single block is returned with
+ * offset=0 and length=0. This allows consumers of the backup API to safely truncate files that
+ * are not returned by the backup cursor.
+ */
+ struct BackupBlock {
+ // NOTE(review): the WiredTiger implementation in this change sets length=fileSize when
+ // incrementalBackup=true without a 'srcBackupName', and length=0 when incrementalBackup=false.
+ // That looks like the reverse of what the comment above describes for full backups versus the
+ // first incremental basis; confirm which one is intended.
+
+ // Path of the file this block belongs to.
+ std::string filename;
+ // Byte offset of the block within the file.
+ std::uint64_t offset = 0;
+ // Number of bytes in the block; 0 together with offset=0 marks an unchanged file.
+ std::uint64_t length = 0;
+ // Size of the whole file at the time the block was produced.
+ std::uint64_t fileSize = 0;
+ };
+
+ /**
+ * Abstract class required for streaming both full and incremental backups. The function
+ * getNextBatch() returns a vector containing 'batchSize' or fewer BackupBlocks. The
+ * StreamingCursor has been exhausted if getNextBatch() returns an empty vector.
*/
- virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup(
+ class StreamingCursor {
+ public:
+ // A cursor must always be built from a set of BackupOptions.
+ StreamingCursor() = delete;
+ // Stores a copy of 'options' for use by derived classes.
+ explicit StreamingCursor(BackupOptions options) : options(options){};
+
+ virtual ~StreamingCursor() = default;
+
+ // Returns the next batch of BackupBlocks, or a non-OK status on failure. An empty vector
+ // means the cursor has been exhausted.
+ virtual StatusWith<std::vector<BackupBlock>> getNextBatch(const std::size_t batchSize) = 0;
+
+ protected:
+ // The options this cursor was created with.
+ BackupOptions options;
+ };
+
+ virtual StatusWith<std::unique_ptr<StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const BackupOptions& options) = 0;
virtual void endNonBlockingBackup(OperationContext* opCtx) = 0;
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 7cc90b1662a..91ac87542c6 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -688,8 +688,9 @@ Status StorageEngineImpl::disableIncrementalBackup(OperationContext* opCtx) {
return _engine->disableIncrementalBackup(opCtx);
}
-StatusWith<StorageEngine::BackupInformation> StorageEngineImpl::beginNonBlockingBackup(
- OperationContext* opCtx, const StorageEngine::BackupOptions& options) {
+StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>>
+StorageEngineImpl::beginNonBlockingBackup(OperationContext* opCtx,
+ const StorageEngine::BackupOptions& options) {
return _engine->beginNonBlockingBackup(opCtx, options);
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 9d063d4770e..3fd3bab86d1 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -99,7 +99,7 @@ public:
virtual Status disableIncrementalBackup(OperationContext* opCtx) override;
- virtual StatusWith<BackupInformation> beginNonBlockingBackup(
+ virtual StatusWith<std::unique_ptr<StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const BackupOptions& options) override;
virtual void endNonBlockingBackup(OperationContext* opCtx) override;
diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h
index fd4a2d388c9..b8cb1cbde02 100644
--- a/src/mongo/db/storage/storage_engine_mock.h
+++ b/src/mongo/db/storage/storage_engine_mock.h
@@ -81,7 +81,7 @@ public:
return Status(ErrorCodes::CommandNotSupported,
"The current storage engine doesn't support backup mode");
}
- StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup(
+ StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const StorageEngine::BackupOptions& options) final {
return Status(ErrorCodes::CommandNotSupported,
"The current storage engine doesn't support backup mode");
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index fc0f17ffa26..661b0e009c1 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -500,95 +500,6 @@ Status OpenReadTransactionParam::setFromString(const std::string& str) {
return _data->resize(num);
}
-namespace {
-
-StatusWith<StorageEngine::BackupInformation> getBackupInformationFromBackupCursor(
- WT_SESSION* session,
- WT_CURSOR* cursor,
- bool incrementalBackup,
- bool fullBackup,
- std::string dbPath,
- const char* statusPrefix) {
- int wtRet;
- StorageEngine::BackupInformation backupInformation;
- const char* filename;
- const auto directoryPath = boost::filesystem::path(dbPath);
- const auto wiredTigerLogFilePrefix = "WiredTigerLog";
- while ((wtRet = cursor->next(cursor)) == 0) {
- invariantWTOK(cursor->get_key(cursor, &filename));
-
- std::string name(filename);
-
- boost::filesystem::path filePath = directoryPath;
- if (name.find(wiredTigerLogFilePrefix) == 0) {
- // TODO SERVER-13455:replace `journal/` with the configurable journal path.
- filePath /= boost::filesystem::path("journal");
- }
- filePath /= name;
-
- boost::system::error_code errorCode;
- const std::uint64_t fileSize = boost::filesystem::file_size(filePath, errorCode);
- uassert(31403,
- "Failed to get a file's size. Filename: {} Error: {}"_format(filePath.string(),
- errorCode.message()),
- !errorCode);
-
- StorageEngine::BackupFile backupFile(fileSize);
- backupInformation.insert({filePath.string(), backupFile});
-
- // For the first full incremental backup, include the offset and length.
- if (incrementalBackup && fullBackup) {
- backupInformation.at(filePath.string()).blocksToCopy.push_back({0, fileSize});
- }
-
- // Full backups cannot open an incremental cursor, even if they are the first full backup
- // for incremental.
- if (!incrementalBackup || fullBackup) {
- continue;
- }
-
- // For each file listed, open a duplicate backup cursor and get the blocks to copy.
- std::stringstream ss;
- ss << "incremental=(file=" << filename << ")";
- const std::string config = ss.str();
- WT_CURSOR* dupCursor;
- wtRet = session->open_cursor(session, nullptr, cursor, config.c_str(), &dupCursor);
- if (wtRet != 0) {
- return wtRCToStatus(wtRet);
- }
-
- while ((wtRet = dupCursor->next(dupCursor)) == 0) {
- uint64_t offset, size, type;
- invariantWTOK(dupCursor->get_key(dupCursor, &offset, &size, &type));
- LOGV2_DEBUG(22311,
- 2,
- "Block to copy for incremental backup: filename: {filePath_string}, "
- "offset: {offset}, size: {size}, type: {type}",
- "filePath_string"_attr = filePath.string(),
- "offset"_attr = offset,
- "size"_attr = size,
- "type"_attr = type);
- backupInformation.at(filePath.string()).blocksToCopy.push_back({offset, size});
- }
-
- if (wtRet != WT_NOTFOUND) {
- return wtRCToStatus(wtRet);
- }
-
- wtRet = dupCursor->close(dupCursor);
- if (wtRet != 0) {
- return wtRCToStatus(wtRet);
- }
- }
-
- if (wtRet != WT_NOTFOUND) {
- return wtRCToStatus(wtRet, statusPrefix);
- }
- return backupInformation;
-}
-
-} // namespace
-
StringData WiredTigerKVEngine::kTableUriPrefix = "table:"_sd;
WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
@@ -1170,8 +1081,191 @@ Status WiredTigerKVEngine::disableIncrementalBackup(OperationContext* opCtx) {
return Status::OK();
}
-StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockingBackup(
- OperationContext* opCtx, const StorageEngine::BackupOptions& options) {
+namespace {
+
+// Builds the on-disk path of 'filename' relative to the directory 'path'. Files whose name begins
+// with "WiredTigerLog" are placed under the "journal" subdirectory; everything else sits directly
+// in 'path'.
+const boost::filesystem::path constructFilePath(std::string path, std::string filename) {
+    boost::filesystem::path result(path);
+
+    const bool isLogFile = filename.find("WiredTigerLog") == 0;
+    if (isLogFile) {
+        // TODO SERVER-13455: Replace `journal/` with the configurable journal path.
+        result /= boost::filesystem::path("journal");
+    }
+
+    result /= filename;
+    return result;
+}
+
+// Returns, in their original order, the entries of 'files' that do not appear in
+// 'referenceFiles'. Duplicates within 'files' are kept.
+std::vector<std::string> getUniqueFiles(const std::vector<std::string>& files,
+                                        const std::set<std::string>& referenceFiles) {
+    std::vector<std::string> uniqueFiles;
+    for (const auto& candidate : files) {
+        const bool alreadyReferenced = referenceFiles.count(candidate) != 0;
+        if (alreadyReferenced) {
+            continue;
+        }
+        uniqueFiles.push_back(candidate);
+    }
+    return uniqueFiles;
+}
+
+/**
+ * StreamingCursor backed by the WiredTiger backup cursor kept in a shared WiredTigerBackup.
+ *
+ * The object only stores the raw 'session' and 'wtBackup' pointers it is given; its defaulted
+ * destructor releases neither. All cursor state (the backup cursor, the duplicate cursor and the
+ * sets of log files already reported) lives in '*wtBackup' and is guarded by the mutexes found
+ * there.
+ */
+class StreamingCursorImpl : public StorageEngine::StreamingCursor {
+public:
+    StreamingCursorImpl() = delete;
+    explicit StreamingCursorImpl(WT_SESSION* session,
+                                 std::string path,
+                                 StorageEngine::BackupOptions options,
+                                 WiredTigerBackup* wtBackup)
+        : StorageEngine::StreamingCursor(options),
+          _session(session),
+          _path(path),
+          _wtBackup(wtBackup){};
+
+    ~StreamingCursorImpl() override = default;
+
+    /**
+     * Returns up to 'batchSize' BackupBlocks read from the backup cursor. A non-OK status is
+     * returned when WiredTiger reports an error; uasserts with code 31403 if the size of a file
+     * cannot be read.
+     *
+     * 'wtBackupCursorMutex' is held for the whole call. 'wtBackupDupCursorMutex' is taken anew on
+     * every loop iteration.
+     */
+    StatusWith<std::vector<StorageEngine::BackupBlock>> getNextBatch(
+        const std::size_t batchSize) override {
+        // Starts at 0 so that the check after the loop never reads an indeterminate value. The
+        // loop only assigns 'wtRet' when it advances the backup cursor, which it does not do while
+        // a duplicate cursor is open or when 'batchSize' is 0.
+        int wtRet = 0;
+        std::vector<StorageEngine::BackupBlock> backupBlocks;
+
+        stdx::lock_guard<Latch> backupCursorLk(_wtBackup->wtBackupCursorMutex);
+        while (backupBlocks.size() < batchSize) {
+            stdx::lock_guard<Latch> backupDupCursorLk(_wtBackup->wtBackupDupCursorMutex);
+
+            // We may still have backup blocks to retrieve for the existing file that
+            // _wtBackup->cursor is open on if _wtBackup->dupCursor exists. In this case, do not
+            // call next() on _wtBackup->cursor.
+            if (!_wtBackup->dupCursor) {
+                wtRet = (_wtBackup->cursor)->next(_wtBackup->cursor);
+                if (wtRet != 0) {
+                    break;
+                }
+            }
+
+            const char* filename;
+            invariantWTOK((_wtBackup->cursor)->get_key(_wtBackup->cursor, &filename));
+            const boost::filesystem::path filePath = constructFilePath(_path, {filename});
+
+            const auto wiredTigerLogFilePrefix = "WiredTigerLog";
+            if (std::string(filename).find(wiredTigerLogFilePrefix) == 0) {
+                // If extendBackupCursor() is called prior to the StreamingCursor running into log
+                // files, we must ensure that subsequent calls to getNextBatch() do not return
+                // duplicate files.
+                //
+                // NOTE(review): this ends the current batch (possibly leaving it empty) instead of
+                // skipping just this file. An empty batch is how exhaustion is signalled, so
+                // confirm that stopping here is intended.
+                if ((_wtBackup->logFilePathsSeenByExtendBackupCursor).find(filePath.string()) !=
+                    (_wtBackup->logFilePathsSeenByExtendBackupCursor).end()) {
+                    break;
+                }
+                (_wtBackup->logFilePathsSeenByGetNextBatch).insert(filePath.string());
+            }
+
+            boost::system::error_code errorCode;
+            const std::uint64_t fileSize = boost::filesystem::file_size(filePath, errorCode);
+            uassert(31403,
+                    "Failed to get a file's size. Filename: {} Error: {}"_format(
+                        filePath.string(), errorCode.message()),
+                    !errorCode);
+
+            if (options.incrementalBackup && options.srcBackupName) {
+                // For a subsequent incremental backup, each BackupBlock corresponds to changes
+                // made to data files since the initial incremental backup. Each BackupBlock has a
+                // maximum size of options.blockSizeMB. Incremental backups open a duplicate cursor,
+                // which is stored in _wtBackup->dupCursor.
+                //
+                // 'backupBlocks' is an out parameter.
+                Status status = _getNextIncrementalBatchForFile(
+                    filename, filePath, fileSize, batchSize, &backupBlocks);
+
+                if (!status.isOK()) {
+                    return status;
+                }
+            } else {
+                // For a full backup or the initial incremental backup, each BackupBlock corresponds
+                // to an entire file. Full backups cannot open an incremental cursor, even if they
+                // are the initial incremental backup.
+                //
+                // NOTE(review): length is fileSize for the initial incremental backup and 0 for a
+                // plain full backup; confirm this against the documentation of BackupBlock.
+                const std::uint64_t length = options.incrementalBackup ? fileSize : 0;
+                backupBlocks.push_back({filePath.string(), 0 /* offset */, length, fileSize});
+            }
+        }
+
+        // Only a real WiredTiger error becomes a Status. 'wtRet' is 0 when the loop stopped because
+        // the batch filled up or because an already-reported log file was reached; converting 0
+        // would hand the caller an OK status with no batch attached.
+        if (wtRet != 0 && wtRet != WT_NOTFOUND && backupBlocks.size() != batchSize) {
+            return wtRCToStatus(wtRet);
+        }
+
+        return backupBlocks;
+    }
+
+private:
+    /**
+     * Appends changed blocks of the file the backup cursor is positioned on to '*backupBlocks'
+     * until it holds 'batchSize' entries or the duplicate cursor is exhausted. Opens the
+     * duplicate cursor when none is open, and closes it (notifying 'wtBackupDupCursorCV') once it
+     * has been exhausted. Returns a non-OK status if opening or closing the duplicate cursor
+     * fails.
+     */
+    Status _getNextIncrementalBatchForFile(const char* filename,
+                                           boost::filesystem::path filePath,
+                                           const std::uint64_t fileSize,
+                                           const std::size_t batchSize,
+                                           std::vector<StorageEngine::BackupBlock>* backupBlocks) {
+        // For each file listed, open a duplicate backup cursor and get the blocks to copy.
+        std::stringstream ss;
+        ss << "incremental=(file=" << filename << ")";
+        const std::string config = ss.str();
+
+        int wtRet = 0;
+        // Stays true only when the duplicate cursor was opened by this call and yielded no blocks.
+        bool fileUnchangedFlag = false;
+        if (!_wtBackup->dupCursor) {
+            wtRet = (_session)->open_cursor(
+                _session, nullptr, _wtBackup->cursor, config.c_str(), &_wtBackup->dupCursor);
+            if (wtRet != 0) {
+                return wtRCToStatus(wtRet);
+            }
+            fileUnchangedFlag = true;
+        }
+
+        while (backupBlocks->size() < batchSize) {
+            wtRet = (_wtBackup->dupCursor)->next(_wtBackup->dupCursor);
+            if (wtRet == WT_NOTFOUND) {
+                break;
+            }
+            invariantWTOK(wtRet);
+            fileUnchangedFlag = false;
+
+            uint64_t offset, size, type;
+            invariantWTOK(
+                (_wtBackup->dupCursor)->get_key(_wtBackup->dupCursor, &offset, &size, &type));
+            LOGV2_DEBUG(22311,
+                        2,
+                        "Block to copy for incremental backup: filename: {filePath_string}, "
+                        "offset: {offset}, size: {size}, type: {type}",
+                        "filePath_string"_attr = filePath.string(),
+                        "offset"_attr = offset,
+                        "size"_attr = size,
+                        "type"_attr = type);
+            backupBlocks->push_back({filePath.string(), offset, size, fileSize});
+        }
+
+        // If the file is unchanged, push a BackupBlock with offset=0 and length=0. This allows us
+        // to distinguish between an unchanged file and a deleted file in an incremental backup.
+        if (fileUnchangedFlag) {
+            backupBlocks->push_back({filePath.string(), 0 /* offset */, 0 /* length */, fileSize});
+        }
+
+        // If the duplicate backup cursor has been exhausted, close it and set
+        // _wtBackup->dupCursor=nullptr.
+        if (wtRet != 0) {
+            if (wtRet != WT_NOTFOUND ||
+                (wtRet = (_wtBackup->dupCursor)->close(_wtBackup->dupCursor)) != 0) {
+                return wtRCToStatus(wtRet);
+            }
+            _wtBackup->dupCursor = nullptr;
+            (_wtBackup->wtBackupDupCursorCV).notify_one();
+        }
+
+        return Status::OK();
+    }
+
+    WT_SESSION* _session;
+    std::string _path;
+    WiredTigerBackup* _wtBackup;  // '_wtBackup' is an out parameter.
+};
+
+} // namespace
+
+StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>>
+WiredTigerKVEngine::beginNonBlockingBackup(OperationContext* opCtx,
+ const StorageEngine::BackupOptions& options) {
uassert(51034, "Cannot open backup cursor with in-memory mode.", !isEphemeral());
std::stringstream ss;
@@ -1188,6 +1282,8 @@ StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockin
ss << ")";
}
+ stdx::lock_guard<Latch> backupCursorLk(_wtBackup.wtBackupCursorMutex);
+
// Oplog truncation thread won't remove oplog since the checkpoint pinned by the backup cursor.
stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex);
_checkpointThread->assignOplogNeededForCrashRecoveryTo(&_oplogPinnedByBackup);
@@ -1208,62 +1304,74 @@ StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockin
return wtRCToStatus(wtRet);
}
- const bool fullBackup = !options.srcBackupName;
- auto swBackupInfo = getBackupInformationFromBackupCursor(session,
- cursor,
- options.incrementalBackup,
- fullBackup,
- _path,
- "Error opening backup cursor.");
+ // A nullptr indicates that no duplicate cursor is open during an incremental backup.
+ stdx::lock_guard<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex);
+ _wtBackup.dupCursor = nullptr;
- if (!swBackupInfo.isOK()) {
- return swBackupInfo;
- }
+ invariant(_wtBackup.logFilePathsSeenByExtendBackupCursor.empty());
+ invariant(_wtBackup.logFilePathsSeenByGetNextBatch.empty());
+ auto streamingCursor =
+ std::make_unique<StreamingCursorImpl>(session, _path, options, &_wtBackup);
pinOplogGuard.dismiss();
_backupSession = std::move(sessionRaii);
- _backupCursor = cursor;
+ _wtBackup.cursor = cursor;
- return swBackupInfo;
+ return streamingCursor;
}
void WiredTigerKVEngine::endNonBlockingBackup(OperationContext* opCtx) {
+ stdx::lock_guard<Latch> backupCursorLk(_wtBackup.wtBackupCursorMutex);
+ stdx::lock_guard<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex);
_backupSession.reset();
- // Oplog truncation thread can now remove the pinned oplog.
- stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex);
- _oplogPinnedByBackup = boost::none;
- _backupCursor = nullptr;
+ {
+ // Oplog truncation thread can now remove the pinned oplog.
+ stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex);
+ _oplogPinnedByBackup = boost::none;
+ }
+ _wtBackup.cursor = nullptr;
+ _wtBackup.dupCursor = nullptr;
+ _wtBackup.logFilePathsSeenByExtendBackupCursor = {};
+ _wtBackup.logFilePathsSeenByGetNextBatch = {};
}
StatusWith<std::vector<std::string>> WiredTigerKVEngine::extendBackupCursor(
OperationContext* opCtx) {
uassert(51033, "Cannot extend backup cursor with in-memory mode.", !isEphemeral());
- invariant(_backupCursor);
+ invariant(_wtBackup.cursor);
+ stdx::unique_lock<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex);
+
+ MONGO_IDLE_THREAD_BLOCK;
+ _wtBackup.wtBackupDupCursorCV.wait(backupDupCursorLk, [&] { return !_wtBackup.dupCursor; });
// The "target=(\"log:\")" configuration string for the cursor will ensure that we only see the
// log files when iterating on the cursor.
WT_CURSOR* cursor = nullptr;
WT_SESSION* session = _backupSession->getSession();
- int wtRet = session->open_cursor(session, nullptr, _backupCursor, "target=(\"log:\")", &cursor);
+ int wtRet =
+ session->open_cursor(session, nullptr, _wtBackup.cursor, "target=(\"log:\")", &cursor);
if (wtRet != 0) {
return wtRCToStatus(wtRet);
}
- StatusWith<StorageEngine::BackupInformation> swBackupInfo =
- getBackupInformationFromBackupCursor(session,
- cursor,
- /*incrementalBackup=*/false,
- /*fullBackup=*/true,
- _path,
- "Error extending backup cursor.");
+ const char* filename;
+ std::vector<std::string> filePaths;
- wtRet = cursor->close(cursor);
- if (wtRet != 0) {
+ while ((wtRet = cursor->next(cursor)) == 0) {
+ invariantWTOK(cursor->get_key(cursor, &filename));
+ std::string name(filename);
+ const boost::filesystem::path filePath = constructFilePath(_path, name);
+ filePaths.push_back(filePath.string());
+ _wtBackup.logFilePathsSeenByExtendBackupCursor.insert(filePath.string());
+ }
+
+ if (wtRet != WT_NOTFOUND) {
return wtRCToStatus(wtRet);
}
- if (!swBackupInfo.isOK()) {
- return swBackupInfo.getStatus();
+ wtRet = cursor->close(cursor);
+ if (wtRet != 0) {
+ return wtRCToStatus(wtRet);
}
// Once all the backup cursors have been opened on a sharded cluster, we need to ensure that the
@@ -1271,12 +1379,7 @@ StatusWith<std::vector<std::string>> WiredTigerKVEngine::extendBackupCursor(
// have a consistent view of the data. For shards that opened their backup cursor before the
// established point-in-time for backup, they will need to create a full copy of the additional
// journal files returned by this method to ensure a consistent backup of the data is taken.
- std::vector<std::string> filenames;
- for (const auto& entry : swBackupInfo.getValue()) {
- filenames.push_back(entry.first);
- }
-
- return {filenames};
+ return getUniqueFiles(filePaths, _wtBackup.logFilePathsSeenByGetNextBatch);
}
void WiredTigerKVEngine::syncSizeInfo(bool sync) const {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 3458e072d4d..4bebe95e99b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -68,6 +68,24 @@ struct WiredTigerFileVersion {
std::string getDowngradeString();
};
+// State shared between WiredTigerKVEngine's backup functions and the streaming cursor that is
+// handed a pointer to this struct.
+struct WiredTigerBackup {
+ // The open backup cursor, or nullptr when no backup is in progress.
+ WT_CURSOR* cursor = nullptr;
+ // The duplicate cursor opened on 'cursor' during an incremental backup; nullptr when none is
+ // open.
+ WT_CURSOR* dupCursor = nullptr;
+ // Log file paths already reported by extendBackupCursor().
+ std::set<std::string> logFilePathsSeenByExtendBackupCursor;
+ // Log file paths already reported by getNextBatch().
+ std::set<std::string> logFilePathsSeenByGetNextBatch;
+
+ // 'wtBackupCursorMutex' provides concurrency control between beginNonBlockingBackup(),
+ // endNonBlockingBackup(), and getNextBatch() because we stream the output of the backup cursor.
+ Mutex wtBackupCursorMutex = MONGO_MAKE_LATCH("WiredTigerKVEngine::wtBackupCursorMutex");
+
+ // 'wtBackupDupCursorMutex' provides concurrency control between getNextBatch() and
+ // extendBackupCursor() because WiredTiger only allows one duplicate cursor to be open at a
+ // time. extendBackupCursor() blocks on condition variable 'wtBackupDupCursorCV' if a duplicate
+ // cursor is already open.
+ Mutex wtBackupDupCursorMutex = MONGO_MAKE_LATCH("WiredTigerKVEngine::wtBackupDupCursorMutex");
+ // Notified when the duplicate cursor is closed and 'dupCursor' is reset to nullptr.
+ stdx::condition_variable wtBackupDupCursorCV;
+};
+
class WiredTigerKVEngine final : public KVEngine {
public:
static StringData kTableUriPrefix;
@@ -183,7 +201,7 @@ public:
Status disableIncrementalBackup(OperationContext* opCtx) override;
- StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup(
+ StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup(
OperationContext* opCtx, const StorageEngine::BackupOptions& options) override;
void endNonBlockingBackup(OperationContext* opCtx) override;
@@ -449,7 +467,8 @@ private:
mutable Date_t _previousCheckedDropsQueued;
std::unique_ptr<WiredTigerSession> _backupSession;
- WT_CURSOR* _backupCursor;
+ WiredTigerBackup _wtBackup;
+
mutable Mutex _oplogPinnedByBackupMutex =
MONGO_MAKE_LATCH("WiredTigerKVEngine::_oplogPinnedByBackupMutex");
boost::optional<Timestamp> _oplogPinnedByBackup;