diff options
author | Jamie Heppenstall <jamie.heppenstall@mongodb.com> | 2020-04-25 21:30:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-08 14:12:42 +0000 |
commit | 307b547e672ffebe30461eae1ed5d6bc36e919ca (patch) | |
tree | 46adbb2b5beac5facfcd769d42d6f0f3ae9c19a6 /src | |
parent | 3f124535f04261cb5b9c85528d0971e44ec77128 (diff) | |
download | mongo-307b547e672ffebe30461eae1ed5d6bc36e919ca.tar.gz |
SERVER-46076 Stream incremental backup data
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/backup_cursor_state.h | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/devnull/devnull_kv_engine.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/storage/devnull/devnull_kv_engine.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_engine.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine.h | 99 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 355 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 23 |
11 files changed, 355 insertions, 182 deletions
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 4a3b70dbe28..7ebd3beba13 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -209,7 +209,7 @@ public: BackupCursorState openBackupCursor(OperationContext* opCtx, const StorageEngine::BackupOptions& options) final { - return BackupCursorState{UUID::gen(), boost::none, {}}; + return BackupCursorState{UUID::gen(), boost::none, nullptr, {}}; } void closeBackupCursor(OperationContext* opCtx, const UUID& backupId) final {} diff --git a/src/mongo/db/storage/backup_cursor_state.h b/src/mongo/db/storage/backup_cursor_state.h index 814e81c4765..317dd3ff05f 100644 --- a/src/mongo/db/storage/backup_cursor_state.h +++ b/src/mongo/db/storage/backup_cursor_state.h @@ -41,7 +41,10 @@ namespace mongo { struct BackupCursorState { UUID backupId; boost::optional<Document> preamble; - StorageEngine::BackupInformation backupInformation; + std::unique_ptr<StorageEngine::StreamingCursor> streamingCursor; + // 'otherBackupBlocks' includes the backup blocks for the encrypted storage engine in the + // enterprise module. 
+ std::vector<StorageEngine::BackupBlock> otherBackupBlocks; }; struct BackupCursorExtendState { diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index b73bb4a9d4d..4bbc0b9dc11 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -253,12 +253,42 @@ void DevNullKVEngine::setCachePressureForTest(int pressure) { _cachePressureForTest = pressure; } -StatusWith<StorageEngine::BackupInformation> DevNullKVEngine::beginNonBlockingBackup( +namespace { + +class StreamingCursorImpl : public StorageEngine::StreamingCursor { +public: + StreamingCursorImpl() = delete; + StreamingCursorImpl(StorageEngine::BackupOptions options) + : StorageEngine::StreamingCursor(options) { + _backupBlocks = {{"filename.wt"}}; + _exhaustCursor = false; + }; + + ~StreamingCursorImpl() = default; + + BSONObj getMetadataObject(UUID backupId) { + return BSONObj(); + } + + StatusWith<std::vector<StorageEngine::BackupBlock>> getNextBatch(const std::size_t batchSize) { + if (_exhaustCursor) { + std::vector<StorageEngine::BackupBlock> emptyVector; + return emptyVector; + } + _exhaustCursor = true; + return _backupBlocks; + } + +private: + std::vector<StorageEngine::BackupBlock> _backupBlocks; + bool _exhaustCursor; +}; + +} // namespace + +StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> DevNullKVEngine::beginNonBlockingBackup( OperationContext* opCtx, const StorageEngine::BackupOptions& options) { - StorageEngine::BackupInformation backupInformation; - StorageEngine::BackupFile backupFile(0); - backupInformation.insert({"filename.wt", backupFile}); - return backupInformation; + return std::make_unique<StreamingCursorImpl>(options); } StatusWith<std::vector<std::string>> DevNullKVEngine::extendBackupCursor(OperationContext* opCtx) { diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.h b/src/mongo/db/storage/devnull/devnull_kv_engine.h index 
3f73b4e4ade..f271ae0f0f8 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.h +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.h @@ -139,7 +139,7 @@ public: virtual void endBackup(OperationContext* opCtx) {} - virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup( + virtual StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const StorageEngine::BackupOptions& options) override; virtual void endNonBlockingBackup(OperationContext* opCtx) override {} diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 76003cfc1d4..f20897f8be2 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -237,7 +237,7 @@ public: MONGO_UNREACHABLE; } - virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup( + virtual StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const StorageEngine::BackupOptions& options) { return Status(ErrorCodes::CommandNotSupported, "The current storage engine doesn't support backup mode"); diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index b2da331d6b5..98285ff0f30 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -76,35 +76,6 @@ public: using OldestActiveTransactionTimestampCallback = std::function<OldestActiveTransactionTimestampResult(Timestamp stableTimestamp)>; - struct BackupOptions { - bool disableIncrementalBackup = false; - bool incrementalBackup = false; - int blockSizeMB = 16; - boost::optional<std::string> thisBackupName; - boost::optional<std::string> srcBackupName; - }; - - struct BackupBlock { - std::uint64_t offset = 0; - std::uint64_t length = 0; - }; - - /** - * Contains the size of the file to be backed up. This allows the backup application to safely - * truncate the file for incremental backups. 
Files that have had changes since the last - * incremental backup will have their changed file blocks listed. - */ - struct BackupFile { - BackupFile() = delete; - explicit BackupFile(std::uint64_t fileSize) : fileSize(fileSize){}; - - std::uint64_t fileSize; - std::vector<BackupBlock> blocksToCopy; - }; - - // Map of filenames to backup file information. - using BackupInformation = stdx::unordered_map<std::string, BackupFile>; - /** * The interface for creating new instances of storage engines. * @@ -313,21 +284,67 @@ public: virtual Status disableIncrementalBackup(OperationContext* opCtx) = 0; /** - * When performing an incremental backup, we first need a basis for future incremental backups. - * The basis will be a full backup called 'thisBackupName'. For future incremental backups, the - * storage engine will take a backup called 'thisBackupName' which will contain the changes made - * to data files since the backup named 'srcBackupName'. + * Represents the options that the storage engine can use during full and incremental backups. + * + * When performing a full backup where incrementalBackup=false, the values of 'blockSizeMB', + * 'thisBackupName', and 'srcBackupName' should not be modified. * - * The storage engine must use an upper bound limit of 'blockSizeMB' when returning changed - * file blocks. + * When performing an incremental backup where incrementalBackup=true, we first need a basis for + * future incremental backups. This first basis (named 'thisBackupName'), which is a full + * backup, must pass incrementalBackup=true and should not set 'srcBackupName'. An incremental + * backup will include changed blocks since 'srcBackupName' was taken. This backup (also named + * 'thisBackupName') will then become the basis for future incremental backups. + * + * Note that 'thisBackupName' must exist if and only if incrementalBackup=true while + * 'srcBackupName' must not exist if incrementalBackup=false but may or may not exist if + * incrementalBackup=true. 
+ */ + struct BackupOptions { + bool disableIncrementalBackup = false; + bool incrementalBackup = false; + int blockSizeMB = 16; + boost::optional<std::string> thisBackupName; + boost::optional<std::string> srcBackupName; + }; + + /** + * Represents the file blocks returned by the storage engine during both full and incremental + * backups. In the case of a full backup, each block is an entire file with offset=0 and + * length=0. In the case of the first basis for future incremental backups, each block is + * an entire file with offset=0 and length=fileSize. In the case of a subsequent incremental backup, + * each block reflects changes made to data files since the basis (named 'srcBackupName') and + * each block has a maximum size of 'blockSizeMB'. * - * The first full backup meant for incremental and future incremental backups must pass - * 'incrementalBackup' as true. - * 'thisBackupName' must exist only if 'incrementalBackup' is true. - * 'srcBackupName' must not exist when 'incrementalBackup' is false but may or may not exist - * when 'incrementalBackup' is true. + * If a file is unchanged in a subsequent incremental backup, a single block is returned with + * offset=0 and length=0. This allows consumers of the backup API to safely truncate files that + * are not returned by the backup cursor. + */ + struct BackupBlock { + std::string filename; + std::uint64_t offset = 0; + std::uint64_t length = 0; + std::uint64_t fileSize = 0; + }; + + /** + * Abstract class required for streaming both full and incremental backups. The function + * getNextBatch() returns a vector containing 'batchSize' or fewer BackupBlocks. The + * StreamingCursor has been exhausted if getNextBatch() returns an empty vector. 
*/ - virtual StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup( + class StreamingCursor { + public: + StreamingCursor() = delete; + explicit StreamingCursor(BackupOptions options) : options(options){}; + + virtual ~StreamingCursor() = default; + + virtual StatusWith<std::vector<BackupBlock>> getNextBatch(const std::size_t batchSize) = 0; + + protected: + BackupOptions options; + }; + + virtual StatusWith<std::unique_ptr<StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const BackupOptions& options) = 0; virtual void endNonBlockingBackup(OperationContext* opCtx) = 0; diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index 7cc90b1662a..91ac87542c6 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -688,8 +688,9 @@ Status StorageEngineImpl::disableIncrementalBackup(OperationContext* opCtx) { return _engine->disableIncrementalBackup(opCtx); } -StatusWith<StorageEngine::BackupInformation> StorageEngineImpl::beginNonBlockingBackup( - OperationContext* opCtx, const StorageEngine::BackupOptions& options) { +StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> +StorageEngineImpl::beginNonBlockingBackup(OperationContext* opCtx, + const StorageEngine::BackupOptions& options) { return _engine->beginNonBlockingBackup(opCtx, options); } diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index 9d063d4770e..3fd3bab86d1 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -99,7 +99,7 @@ public: virtual Status disableIncrementalBackup(OperationContext* opCtx) override; - virtual StatusWith<BackupInformation> beginNonBlockingBackup( + virtual StatusWith<std::unique_ptr<StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const BackupOptions& options) override; virtual void endNonBlockingBackup(OperationContext* 
opCtx) override; diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h index fd4a2d388c9..b8cb1cbde02 100644 --- a/src/mongo/db/storage/storage_engine_mock.h +++ b/src/mongo/db/storage/storage_engine_mock.h @@ -81,7 +81,7 @@ public: return Status(ErrorCodes::CommandNotSupported, "The current storage engine doesn't support backup mode"); } - StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup( + StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const StorageEngine::BackupOptions& options) final { return Status(ErrorCodes::CommandNotSupported, "The current storage engine doesn't support backup mode"); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index fc0f17ffa26..661b0e009c1 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -500,95 +500,6 @@ Status OpenReadTransactionParam::setFromString(const std::string& str) { return _data->resize(num); } -namespace { - -StatusWith<StorageEngine::BackupInformation> getBackupInformationFromBackupCursor( - WT_SESSION* session, - WT_CURSOR* cursor, - bool incrementalBackup, - bool fullBackup, - std::string dbPath, - const char* statusPrefix) { - int wtRet; - StorageEngine::BackupInformation backupInformation; - const char* filename; - const auto directoryPath = boost::filesystem::path(dbPath); - const auto wiredTigerLogFilePrefix = "WiredTigerLog"; - while ((wtRet = cursor->next(cursor)) == 0) { - invariantWTOK(cursor->get_key(cursor, &filename)); - - std::string name(filename); - - boost::filesystem::path filePath = directoryPath; - if (name.find(wiredTigerLogFilePrefix) == 0) { - // TODO SERVER-13455:replace `journal/` with the configurable journal path. 
- filePath /= boost::filesystem::path("journal"); - } - filePath /= name; - - boost::system::error_code errorCode; - const std::uint64_t fileSize = boost::filesystem::file_size(filePath, errorCode); - uassert(31403, - "Failed to get a file's size. Filename: {} Error: {}"_format(filePath.string(), - errorCode.message()), - !errorCode); - - StorageEngine::BackupFile backupFile(fileSize); - backupInformation.insert({filePath.string(), backupFile}); - - // For the first full incremental backup, include the offset and length. - if (incrementalBackup && fullBackup) { - backupInformation.at(filePath.string()).blocksToCopy.push_back({0, fileSize}); - } - - // Full backups cannot open an incremental cursor, even if they are the first full backup - // for incremental. - if (!incrementalBackup || fullBackup) { - continue; - } - - // For each file listed, open a duplicate backup cursor and get the blocks to copy. - std::stringstream ss; - ss << "incremental=(file=" << filename << ")"; - const std::string config = ss.str(); - WT_CURSOR* dupCursor; - wtRet = session->open_cursor(session, nullptr, cursor, config.c_str(), &dupCursor); - if (wtRet != 0) { - return wtRCToStatus(wtRet); - } - - while ((wtRet = dupCursor->next(dupCursor)) == 0) { - uint64_t offset, size, type; - invariantWTOK(dupCursor->get_key(dupCursor, &offset, &size, &type)); - LOGV2_DEBUG(22311, - 2, - "Block to copy for incremental backup: filename: {filePath_string}, " - "offset: {offset}, size: {size}, type: {type}", - "filePath_string"_attr = filePath.string(), - "offset"_attr = offset, - "size"_attr = size, - "type"_attr = type); - backupInformation.at(filePath.string()).blocksToCopy.push_back({offset, size}); - } - - if (wtRet != WT_NOTFOUND) { - return wtRCToStatus(wtRet); - } - - wtRet = dupCursor->close(dupCursor); - if (wtRet != 0) { - return wtRCToStatus(wtRet); - } - } - - if (wtRet != WT_NOTFOUND) { - return wtRCToStatus(wtRet, statusPrefix); - } - return backupInformation; -} - -} // namespace - 
StringData WiredTigerKVEngine::kTableUriPrefix = "table:"_sd; WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, @@ -1170,8 +1081,191 @@ Status WiredTigerKVEngine::disableIncrementalBackup(OperationContext* opCtx) { return Status::OK(); } -StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockingBackup( - OperationContext* opCtx, const StorageEngine::BackupOptions& options) { +namespace { + +const boost::filesystem::path constructFilePath(std::string path, std::string filename) { + const auto directoryPath = boost::filesystem::path(path); + const auto wiredTigerLogFilePrefix = "WiredTigerLog"; + + boost::filesystem::path filePath = directoryPath; + if (filename.find(wiredTigerLogFilePrefix) == 0) { + // TODO SERVER-13455: Replace `journal/` with the configurable journal path. + filePath /= boost::filesystem::path("journal"); + } + filePath /= filename; + + return filePath; +} + +std::vector<std::string> getUniqueFiles(const std::vector<std::string>& files, + const std::set<std::string>& referenceFiles) { + std::vector<std::string> result; + for (auto& file : files) { + if (referenceFiles.find(file) == referenceFiles.end()) { + result.push_back(file); + } + } + return result; +} + +class StreamingCursorImpl : public StorageEngine::StreamingCursor { +public: + StreamingCursorImpl() = delete; + explicit StreamingCursorImpl(WT_SESSION* session, + std::string path, + StorageEngine::BackupOptions options, + WiredTigerBackup* wtBackup) + : StorageEngine::StreamingCursor(options), + _session(session), + _path(path), + _wtBackup(wtBackup){}; + + ~StreamingCursorImpl() = default; + + StatusWith<std::vector<StorageEngine::BackupBlock>> getNextBatch(const std::size_t batchSize) { + int wtRet; + std::vector<StorageEngine::BackupBlock> backupBlocks; + + stdx::lock_guard<Latch> backupCursorLk(_wtBackup->wtBackupCursorMutex); + while (backupBlocks.size() < batchSize) { + stdx::lock_guard<Latch> 
backupDupCursorLk(_wtBackup->wtBackupDupCursorMutex); + + // We may still have backup blocks to retrieve for the existing file that + // _wtBackup->cursor is open on if _wtBackup->dupCursor exists. In this case, do not + // call next() on _wtBackup->cursor. + if (!_wtBackup->dupCursor) { + wtRet = (_wtBackup->cursor)->next(_wtBackup->cursor); + if (wtRet != 0) { + break; + } + } + + const char* filename; + invariantWTOK((_wtBackup->cursor)->get_key(_wtBackup->cursor, &filename)); + const boost::filesystem::path filePath = constructFilePath(_path, {filename}); + + const auto wiredTigerLogFilePrefix = "WiredTigerLog"; + if (std::string(filename).find(wiredTigerLogFilePrefix) == 0) { + // If extendBackupCursor() is called prior to the StreamingCursor running into log + // files, we must ensure that subsequent calls to getNextBatch() do not return + // duplicate files. + if ((_wtBackup->logFilePathsSeenByExtendBackupCursor).find(filePath.string()) != + (_wtBackup->logFilePathsSeenByExtendBackupCursor).end()) { + break; + } + (_wtBackup->logFilePathsSeenByGetNextBatch).insert(filePath.string()); + } + + boost::system::error_code errorCode; + const std::uint64_t fileSize = boost::filesystem::file_size(filePath, errorCode); + uassert(31403, + "Failed to get a file's size. Filename: {} Error: {}"_format( + filePath.string(), errorCode.message()), + !errorCode); + + if (options.incrementalBackup && options.srcBackupName) { + // For a subsequent incremental backup, each BackupBlock corresponds to changes + // made to data files since the initial incremental backup. Each BackupBlock has a + // maximum size of options.blockSizeMB. Incremental backups open a duplicate cursor, + // which is stored in _wtBackup->dupCursor. + // + // 'backupBlocks' is an out parameter. 
+ Status status = _getNextIncrementalBatchForFile( + filename, filePath, fileSize, batchSize, &backupBlocks); + + if (!status.isOK()) { + return status; + } + } else { + // For a full backup or the initial incremental backup, each BackupBlock corresponds + // to an entire file. Full backups cannot open an incremental cursor, even if they + // are the initial incremental backup. + const std::uint64_t length = options.incrementalBackup ? fileSize : 0; + backupBlocks.push_back({filePath.string(), 0 /* offset */, length, fileSize}); + } + } + + if (wtRet != WT_NOTFOUND && backupBlocks.size() != batchSize) { + return wtRCToStatus(wtRet); + } + + return backupBlocks; + } + +private: + Status _getNextIncrementalBatchForFile(const char* filename, + boost::filesystem::path filePath, + const std::uint64_t fileSize, + const std::size_t batchSize, + std::vector<StorageEngine::BackupBlock>* backupBlocks) { + // For each file listed, open a duplicate backup cursor and get the blocks to copy. + std::stringstream ss; + ss << "incremental=(file=" << filename << ")"; + const std::string config = ss.str(); + + int wtRet; + bool fileUnchangedFlag = false; + if (!_wtBackup->dupCursor) { + wtRet = (_session)->open_cursor( + _session, nullptr, _wtBackup->cursor, config.c_str(), &_wtBackup->dupCursor); + if (wtRet != 0) { + return wtRCToStatus(wtRet); + } + fileUnchangedFlag = true; + } + + while (backupBlocks->size() < batchSize) { + wtRet = (_wtBackup->dupCursor)->next(_wtBackup->dupCursor); + if (wtRet == WT_NOTFOUND) { + break; + } + invariantWTOK(wtRet); + fileUnchangedFlag = false; + + uint64_t offset, size, type; + invariantWTOK( + (_wtBackup->dupCursor)->get_key(_wtBackup->dupCursor, &offset, &size, &type)); + LOGV2_DEBUG(22311, + 2, + "Block to copy for incremental backup: filename: {filePath_string}, " + "offset: {offset}, size: {size}, type: {type}", + "filePath_string"_attr = filePath.string(), + "offset"_attr = offset, + "size"_attr = size, + "type"_attr = type); + 
backupBlocks->push_back({filePath.string(), offset, size, fileSize}); + } + + // If the file is unchanged, push a BackupBlock with offset=0 and length=0. This allows us + // to distinguish between an unchanged file and a deleted file in an incremental backup. + if (fileUnchangedFlag) { + backupBlocks->push_back({filePath.string(), 0 /* offset */, 0 /* length */, fileSize}); + } + + // If the duplicate backup cursor has been exhausted, close it and set + // _wtBackup->dupCursor=nullptr. + if (wtRet != 0) { + if (wtRet != WT_NOTFOUND || + (wtRet = (_wtBackup->dupCursor)->close(_wtBackup->dupCursor)) != 0) { + return wtRCToStatus(wtRet); + } + _wtBackup->dupCursor = nullptr; + (_wtBackup->wtBackupDupCursorCV).notify_one(); + } + + return Status::OK(); + } + + WT_SESSION* _session; + std::string _path; + WiredTigerBackup* _wtBackup; // '_wtBackup' is an out parameter. +}; + +} // namespace + +StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> +WiredTigerKVEngine::beginNonBlockingBackup(OperationContext* opCtx, + const StorageEngine::BackupOptions& options) { uassert(51034, "Cannot open backup cursor with in-memory mode.", !isEphemeral()); std::stringstream ss; @@ -1188,6 +1282,8 @@ StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockin ss << ")"; } + stdx::lock_guard<Latch> backupCursorLk(_wtBackup.wtBackupCursorMutex); + // Oplog truncation thread won't remove oplog since the checkpoint pinned by the backup cursor. 
stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex); _checkpointThread->assignOplogNeededForCrashRecoveryTo(&_oplogPinnedByBackup); @@ -1208,62 +1304,74 @@ StatusWith<StorageEngine::BackupInformation> WiredTigerKVEngine::beginNonBlockin return wtRCToStatus(wtRet); } - const bool fullBackup = !options.srcBackupName; - auto swBackupInfo = getBackupInformationFromBackupCursor(session, - cursor, - options.incrementalBackup, - fullBackup, - _path, - "Error opening backup cursor."); + // A nullptr indicates that no duplicate cursor is open during an incremental backup. + stdx::lock_guard<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex); + _wtBackup.dupCursor = nullptr; - if (!swBackupInfo.isOK()) { - return swBackupInfo; - } + invariant(_wtBackup.logFilePathsSeenByExtendBackupCursor.empty()); + invariant(_wtBackup.logFilePathsSeenByGetNextBatch.empty()); + auto streamingCursor = + std::make_unique<StreamingCursorImpl>(session, _path, options, &_wtBackup); pinOplogGuard.dismiss(); _backupSession = std::move(sessionRaii); - _backupCursor = cursor; + _wtBackup.cursor = cursor; - return swBackupInfo; + return streamingCursor; } void WiredTigerKVEngine::endNonBlockingBackup(OperationContext* opCtx) { + stdx::lock_guard<Latch> backupCursorLk(_wtBackup.wtBackupCursorMutex); + stdx::lock_guard<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex); _backupSession.reset(); - // Oplog truncation thread can now remove the pinned oplog. - stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex); - _oplogPinnedByBackup = boost::none; - _backupCursor = nullptr; + { + // Oplog truncation thread can now remove the pinned oplog. 
+ stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex); + _oplogPinnedByBackup = boost::none; + } + _wtBackup.cursor = nullptr; + _wtBackup.dupCursor = nullptr; + _wtBackup.logFilePathsSeenByExtendBackupCursor = {}; + _wtBackup.logFilePathsSeenByGetNextBatch = {}; } StatusWith<std::vector<std::string>> WiredTigerKVEngine::extendBackupCursor( OperationContext* opCtx) { uassert(51033, "Cannot extend backup cursor with in-memory mode.", !isEphemeral()); - invariant(_backupCursor); + invariant(_wtBackup.cursor); + stdx::unique_lock<Latch> backupDupCursorLk(_wtBackup.wtBackupDupCursorMutex); + + MONGO_IDLE_THREAD_BLOCK; + _wtBackup.wtBackupDupCursorCV.wait(backupDupCursorLk, [&] { return !_wtBackup.dupCursor; }); // The "target=(\"log:\")" configuration string for the cursor will ensure that we only see the // log files when iterating on the cursor. WT_CURSOR* cursor = nullptr; WT_SESSION* session = _backupSession->getSession(); - int wtRet = session->open_cursor(session, nullptr, _backupCursor, "target=(\"log:\")", &cursor); + int wtRet = + session->open_cursor(session, nullptr, _wtBackup.cursor, "target=(\"log:\")", &cursor); if (wtRet != 0) { return wtRCToStatus(wtRet); } - StatusWith<StorageEngine::BackupInformation> swBackupInfo = - getBackupInformationFromBackupCursor(session, - cursor, - /*incrementalBackup=*/false, - /*fullBackup=*/true, - _path, - "Error extending backup cursor."); + const char* filename; + std::vector<std::string> filePaths; - wtRet = cursor->close(cursor); - if (wtRet != 0) { + while ((wtRet = cursor->next(cursor)) == 0) { + invariantWTOK(cursor->get_key(cursor, &filename)); + std::string name(filename); + const boost::filesystem::path filePath = constructFilePath(_path, name); + filePaths.push_back(filePath.string()); + _wtBackup.logFilePathsSeenByExtendBackupCursor.insert(filePath.string()); + } + + if (wtRet != WT_NOTFOUND) { return wtRCToStatus(wtRet); } - if (!swBackupInfo.isOK()) { - return swBackupInfo.getStatus(); + wtRet = 
cursor->close(cursor); + if (wtRet != 0) { + return wtRCToStatus(wtRet); } // Once all the backup cursors have been opened on a sharded cluster, we need to ensure that the @@ -1271,12 +1379,7 @@ StatusWith<std::vector<std::string>> WiredTigerKVEngine::extendBackupCursor( // have a consistent view of the data. For shards that opened their backup cursor before the // established point-in-time for backup, they will need to create a full copy of the additional // journal files returned by this method to ensure a consistent backup of the data is taken. - std::vector<std::string> filenames; - for (const auto& entry : swBackupInfo.getValue()) { - filenames.push_back(entry.first); - } - - return {filenames}; + return getUniqueFiles(filePaths, _wtBackup.logFilePathsSeenByGetNextBatch); } void WiredTigerKVEngine::syncSizeInfo(bool sync) const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 3458e072d4d..4bebe95e99b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -68,6 +68,24 @@ struct WiredTigerFileVersion { std::string getDowngradeString(); }; +struct WiredTigerBackup { + WT_CURSOR* cursor = nullptr; + WT_CURSOR* dupCursor = nullptr; + std::set<std::string> logFilePathsSeenByExtendBackupCursor; + std::set<std::string> logFilePathsSeenByGetNextBatch; + + // 'wtBackupCursorMutex' provides concurrency control between beginNonBlockingBackup(), + // endNonBlockingBackup(), and getNextBatch() because we stream the output of the backup cursor. + Mutex wtBackupCursorMutex = MONGO_MAKE_LATCH("WiredTigerKVEngine::wtBackupCursorMutex"); + + // 'wtBackupDupCursorMutex' provides concurrency control between getNextBatch() and + // extendBackupCursor() because WiredTiger only allows one duplicate cursor to be open at a + // time. 
extendBackupCursor() blocks on condition variable 'wtBackupDupCursorCV' if a duplicate + // cursor is already open. + Mutex wtBackupDupCursorMutex = MONGO_MAKE_LATCH("WiredTigerKVEngine::wtBackupDupCursorMutex"); + stdx::condition_variable wtBackupDupCursorCV; +}; + class WiredTigerKVEngine final : public KVEngine { public: static StringData kTableUriPrefix; @@ -183,7 +201,7 @@ public: Status disableIncrementalBackup(OperationContext* opCtx) override; - StatusWith<StorageEngine::BackupInformation> beginNonBlockingBackup( + StatusWith<std::unique_ptr<StorageEngine::StreamingCursor>> beginNonBlockingBackup( OperationContext* opCtx, const StorageEngine::BackupOptions& options) override; void endNonBlockingBackup(OperationContext* opCtx) override; @@ -449,7 +467,8 @@ private: mutable Date_t _previousCheckedDropsQueued; std::unique_ptr<WiredTigerSession> _backupSession; - WT_CURSOR* _backupCursor; + WiredTigerBackup _wtBackup; + mutable Mutex _oplogPinnedByBackupMutex = MONGO_MAKE_LATCH("WiredTigerKVEngine::_oplogPinnedByBackupMutex"); boost::optional<Timestamp> _oplogPinnedByBackup; |