diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-08-08 20:43:41 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-08-08 20:43:41 -0400 |
commit | ca4d34ece27104dd9bb62f9e46120e83398b71f3 (patch) | |
tree | 449aefb3196379e9c55a57fa4dbcfaa476af8f72 | |
parent | 0a28a3d8bf39436e02e0325dc5c02f4ef19abd20 (diff) | |
download | mongo-ca4d34ece27104dd9bb62f9e46120e83398b71f3.tar.gz |
SERVER-36265: Expose a $backupCursor aggregation stage.
-rw-r--r-- | jstests/noPassthrough/aggregation_backup_cursor.js | 69 | ||||
-rw-r--r-- | src/mongo/base/error_codes.err | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/fsync.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_backup_cursor.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_backup_cursor.h | 117 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_list_local_cursors.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongod_process_interface.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongod_process_interface.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongos_process_interface.h | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stub_mongo_process_interface.h | 16 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 4 | ||||
-rw-r--r-- | src/mongo/shell/replsettest.js | 6 |
15 files changed, 408 insertions, 62 deletions
diff --git a/jstests/noPassthrough/aggregation_backup_cursor.js b/jstests/noPassthrough/aggregation_backup_cursor.js new file mode 100644 index 00000000000..9d7b21959bb --- /dev/null +++ b/jstests/noPassthrough/aggregation_backup_cursor.js @@ -0,0 +1,69 @@ +/** + * Test the basic operation of a `$backupCursor` aggregation stage. + * + * @tags: [requires_persistence, requires_wiredtiger] + */ +(function() { + "use strict"; + + let conn = MongoRunner.runMongod(); + let db = conn.getDB("test"); + + let backupCursor = db.aggregate([{$backupCursor: {}}]); + // There should be about 14 files in total, but being precise would be unnecessarily fragile. + assert.gt(backupCursor.itcount(), 6); + assert(!backupCursor.isExhausted()); + backupCursor.close(); + + // Open a backup cursor. Use a small batch size to ensure a getMore retrieves additional + // results. + let response = assert.commandWorked( + db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {batchSize: 2}})); + assert.eq("test.$cmd.aggregate", response.cursor.ns); + assert.eq(2, response.cursor.firstBatch.length); + let cursorId = response.cursor.id; + + response = + assert.commandWorked(db.runCommand({getMore: cursorId, collection: "$cmd.aggregate"})); + // Sanity check the results. + assert.neq(0, response.cursor.id); + assert.gt(response.cursor.nextBatch.length, 6); + + // The $backupCursor is a tailable cursor. Even though we've exhausted the results, running a + // getMore should succeed. + response = + assert.commandWorked(db.runCommand({getMore: cursorId, collection: "$cmd.aggregate"})); + assert.neq(0, response.cursor.id); + assert.eq(0, response.cursor.nextBatch.length); + + // Because the backup cursor is still open, trying to open a second cursor should fail. + assert.commandFailed( + db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}})); + + // Kill the backup cursor. + response = + assert.commandWorked(db.runCommand({killCursors: "$cmd.aggregate", cursors: [cursorId]})); + assert.eq(1, response.cursorsKilled.length); + assert.eq(cursorId, response.cursorsKilled[0]); + + // Open another backup cursor with a batch size of 0. The underlying backup cursor should be + // created. + response = assert.commandWorked( + db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {batchSize: 0}})); + assert.neq(0, response.cursor.id); + assert.eq(0, response.cursor.firstBatch.length); + + // Attempt to open a second backup cursor to demonstrate the original underlying cursor was + // opened. + assert.commandFailed( + db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}})); + + // Demonstrate query cursor timeouts will kill backup cursors, closing the underlying resources. + assert.commandWorked(db.adminCommand({setParameter: 1, cursorTimeoutMillis: 1})); + assert.soon(() => { + return db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}})['ok'] == + 1; + }); + + MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 17e85d8ed6f..c869f99f704 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -265,7 +265,7 @@ error_code("TooManyLogicalSessions", 264); error_code("OrphanedRangeCleanUpFailed", 265); error_code("ExceededTimeLimit", 266); error_code("PreparedTransactionInProgress", 267); - +error_code("CannotBackup", 268); # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp index f7113b06511..69a904578a8 100644 --- a/src/mongo/db/commands/fsync.cpp +++ b/src/mongo/db/commands/fsync.cpp @@ -71,12 +71,16 @@ Lock::ResourceMutex commandMutex("fsyncCommandMutex"); */ class FSyncLockThread : public BackgroundJob { public: - FSyncLockThread() : BackgroundJob(false) {} + FSyncLockThread(bool allowFsyncFailure) + : BackgroundJob(false), _allowFsyncFailure(allowFsyncFailure) {} virtual ~FSyncLockThread() {} virtual string name() const { return "FSyncLockThread"; } virtual void run(); + +private: + bool _allowFsyncFailure; }; class FSyncCommand : public ErrmsgCommandDeprecated { @@ -128,12 +132,16 @@ public: return false; } - const bool sync = !cmdObj["async"].trueValue(); // async means do an fsync, but return immediately const bool lock = cmdObj["lock"].trueValue(); log() << "CMD fsync: sync:" << sync << " lock:" << lock; + // fsync + lock is sometimes used to block writes out of the system and does not care if + // the `BackupCursorService::fsyncLock` call succeeds. + const bool allowFsyncFailure = + getTestCommandsEnabled() && cmdObj["allowFsyncFailure"].trueValue(); + if (!lock) { // Take a global IS lock to ensure the storage engine is not shutdown Lock::GlobalLock global(opCtx, MODE_IS); @@ -160,7 +168,7 @@ public: stdx::unique_lock<stdx::mutex> lk(lockStateMutex); threadStatus = Status::OK(); threadStarted = false; - _lockThread = stdx::make_unique<FSyncLockThread>(); + _lockThread = stdx::make_unique<FSyncLockThread>(allowFsyncFailure); _lockThread->go(); while (!threadStarted && threadStatus.isOK()) { @@ -346,16 +354,27 @@ void FSyncLockThread::run() { return; } + bool successfulFsyncLock = false; auto backupCursorService = BackupCursorService::get(opCtx.getServiceContext()); try { - writeConflictRetry(&opCtx, "beginBackup", "global", [&opCtx, backupCursorService] { - backupCursorService->fsyncLock(&opCtx); - }); + writeConflictRetry(&opCtx, + "beginBackup", + "global", + [&opCtx, backupCursorService, &successfulFsyncLock] { + backupCursorService->fsyncLock(&opCtx); + successfulFsyncLock = true; + }); } catch (const DBException& e) { - error() << "storage engine unable to begin backup : " << e.toString(); - fsyncCmd.threadStatus = e.toStatus(); - fsyncCmd.acquireFsyncLockSyncCV.notify_one(); - return; + if (_allowFsyncFailure) { + warning() << "Locking despite storage engine being unable to begin backup : " + << e.toString(); + opCtx.recoveryUnit()->waitUntilDurable(); + } else { + error() << "storage engine unable to begin backup : " << e.toString(); + fsyncCmd.threadStatus = e.toStatus(); + fsyncCmd.acquireFsyncLockSyncCV.notify_one(); + return; + } } fsyncCmd.threadStarted = true; @@ -365,7 +384,9 @@ void FSyncLockThread::run() { fsyncCmd.releaseFsyncLockSyncCV.wait(lk); } - backupCursorService->fsyncUnlock(&opCtx); + if (successfulFsyncLock) { + backupCursorService->fsyncUnlock(&opCtx); + } } catch (const std::exception& e) { severe() << "FSyncLockThread exception: " << e.what(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index fda10f556ab..10fa0fd748a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -269,36 +269,6 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames } /** - * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). - * fasserts if it fails to parse after being serialized. - */ -std::unique_ptr<Pipeline, PipelineDeleter> reparsePipeline( - const Pipeline* pipeline, - const AggregationRequest& request, - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - auto serialized = pipeline->serialize(); - - // Convert vector<Value> to vector<BSONObj>. - std::vector<BSONObj> parseableSerialization; - parseableSerialization.reserve(serialized.size()); - for (auto&& serializedStage : serialized) { - invariant(serializedStage.getType() == BSONType::Object); - parseableSerialization.push_back(serializedStage.getDocument().toBson()); - } - - auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx); - if (!reparsedPipeline.isOK()) { - error() << "Aggregation command did not round trip through parsing and serialization " - "correctly. Input pipeline: " - << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized); - fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus()); - } - - reparsedPipeline.getValue()->optimizePipeline(); - return std::move(reparsedPipeline.getValue()); -} - -/** * Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to * 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView. */ @@ -514,14 +484,6 @@ Status runAggregate(OperationContext* opCtx, pipeline->optimizePipeline(); - if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) { - // Make sure all operations round-trip through Pipeline::serialize() correctly by - // re-parsing every command in debug builds. This is important because sharded - // aggregations rely on this ability. Skipping when fromMongos because this has - // already been through the transformation (and this un-sets expCtx->fromMongos). - pipeline = reparsePipeline(pipeline.get(), request, expCtx); - } - // Prepare a PlanExecutor to provide input into the pipeline, if needed. if (liteParsedPipeline.hasChangeStream()) { // If we are using a change stream, the cursor stage should have a simple collation, @@ -603,7 +565,9 @@ Status runAggregate(OperationContext* opCtx, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), repl::ReadConcernArgs::get(opCtx).getLevel(), cmdObj); - if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (expCtx->tailableMode == TailableModeEnum::kTailable) { + cursorParams.setTailable(true); + } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 944a908ac8c..e9157f816d0 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -277,9 +277,10 @@ env.Library( 'mongod_process_interface.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/stats/top', + '$BUILD_DIR/mongo/db/storage/backup_cursor_service', '$BUILD_DIR/mongo/s/sharding_api', 'mongo_process_common', ] @@ -302,8 +303,10 @@ pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) pipelineeEnv.Library( target='pipeline', source=[ + "cluster_aggregation_planner.cpp", 'document_source.cpp', 'document_source_add_fields.cpp', + 'document_source_backup_cursor.cpp', 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', 'document_source_change_stream.cpp', @@ -343,7 +346,6 @@ pipelineeEnv.Library( 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', - "cluster_aggregation_planner.cpp", 'document_source_tee_consumer.cpp', 'document_source_unwind.cpp', 'pipeline.cpp', diff --git a/src/mongo/db/pipeline/document_source_backup_cursor.cpp b/src/mongo/db/pipeline/document_source_backup_cursor.cpp new file mode 100644 index 00000000000..f6a8b8b5eca --- /dev/null +++ b/src/mongo/db/pipeline/document_source_backup_cursor.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_backup_cursor.h" + +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/util/log.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(backupCursor, + DocumentSourceBackupCursor::LiteParsed::parse, + DocumentSourceBackupCursor::createFromBson); + +const char* DocumentSourceBackupCursor::kStageName = "$backupCursor"; + +DocumentSourceBackupCursor::DocumentSourceBackupCursor( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSource(pExpCtx), + _backupCursorState(pExpCtx->mongoProcessInterface->openBackupCursor(pExpCtx->opCtx)) {} + + +DocumentSourceBackupCursor::~DocumentSourceBackupCursor() { + try { + pExpCtx->mongoProcessInterface->closeBackupCursor(pExpCtx->opCtx, + _backupCursorState.cursorId); + } catch (DBException& exc) { + severe() << exc.toStatus("Error closing a backup cursor."); + fassertFailed(50909); + } +} + +DocumentSource::GetNextResult DocumentSourceBackupCursor::getNext() { + pExpCtx->checkForInterrupt(); + + if (!_backupCursorState.filenames.empty()) { + Document doc = {{"filename", _backupCursorState.filenames.back()}}; + _backupCursorState.filenames.pop_back(); + + return std::move(doc); + } + + return GetNextResult::makeEOF(); +} + +boost::intrusive_ptr<DocumentSource> DocumentSourceBackupCursor::createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { + // The anticipated usage of a backup cursor: open the backup cursor, consume the results, copy + // data off disk, close the backup cursor. The backup cursor must be successfully closed for + // the data copied to be valid. Hence, the caller needs a way to keep the cursor open after + // consuming the results, as well as the ability to send "heartbeats" to prevent the client + // cursor manager from timing out the backup cursor. A backup cursor does consume resources; + // in the event the calling process crashes, the cursors should eventually be timed out. + pExpCtx->tailableMode = TailableModeEnum::kTailable; + + uassert( + ErrorCodes::FailedToParse, + str::stream() << kStageName << " value must be an object. Found: " << typeName(spec.type()), + spec.type() == BSONType::Object); + + uassert(ErrorCodes::CannotBackup, + str::stream() << kStageName << " cannot be executed against a MongoS.", + !pExpCtx->inMongos && !pExpCtx->fromMongos && !pExpCtx->needsMerge); + + return new DocumentSourceBackupCursor(pExpCtx); +} + +Value DocumentSourceBackupCursor::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(BSON(kStageName << 1)); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_backup_cursor.h b/src/mongo/db/pipeline/document_source_backup_cursor.h new file mode 100644 index 00000000000..d0d0d83ab7a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_backup_cursor.h @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/storage/backup_cursor_service.h" + +namespace mongo { + +/** + * Represents the `$backupCursor` aggregation stage. The lifetime of this object maps to storage + * engine calls on `beginNonBlockingBackup` and `endNonBlockingBackup`. The DocumentSource will + * return filenames in the running `dbpath` that an application can copy and optionally some + * metadata information. + */ +class DocumentSourceBackupCursor final : public DocumentSource { +public: + static const char* kStageName; + + class LiteParsed final : public LiteParsedDocumentSource { + public: + static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, + const BSONElement& spec) { + return stdx::make_unique<LiteParsed>(); + } + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + + PrivilegeVector requiredPrivileges(bool isMongos) const final { + // SERVER-36266 will address requiring the appropriate privileges. + return PrivilegeVector(); + } + + bool isInitialSource() const final { + return true; + } + + bool allowedToForwardFromMongos() const final { + return false; + } + + void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { + uassert(ErrorCodes::InvalidOptions, + str::stream() << "Aggregation stage " << kStageName + << " requires read concern local but found " + << readConcern.toString(), + readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); + } + }; + + virtual ~DocumentSourceBackupCursor(); + + GetNextResult getNext() final; + + const char* getSourceName() const final { + return kStageName; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + // This stage `uasserts` on a MongoS; the + // `HostTypeRequirement` field has no effect. + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); + + constraints.isIndependentOfAnyCollection = true; + constraints.requiresInputDocSource = false; + return constraints; + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceBackupCursor(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + BackupCursorState _backupCursorState; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp index 72e53d7b6dd..7246bc1d749 100644 --- a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp @@ -58,12 +58,6 @@ DocumentSource::GetNextResult DocumentSourceListLocalCursors::getNext() { boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalCursors::createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << kStageName - << " must be run against the database with {aggregate: 1}, not a collection", - pExpCtx->ns.isCollectionlessAggregateNS()); - uassert(ErrorCodes::BadValue, str::stream() << kStageName << " must be run as { " << kStageName << ": {}}", spec.isABSONObj() && spec.Obj().isEmpty()); diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index b650855d438..13efd2928f3 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -44,6 +44,7 @@ #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" +#include "mongo/db/storage/backup_cursor_service.h" namespace mongo { @@ -223,6 +224,17 @@ public: */ virtual std::vector<GenericCursor> getCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0; + + /** + * The following methods forward to the BackupCursorService decorating the ServiceContext. + */ + virtual void fsyncLock(OperationContext* opCtx) = 0; + + virtual void fsyncUnlock(OperationContext* opCtx) = 0; + + virtual BackupCursorState openBackupCursor(OperationContext* opCtx) = 0; + + virtual void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp index 8e52cf4d057..566b43533d1 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/mongod_process_interface.cpp @@ -394,6 +394,26 @@ boost::optional<Document> MongoDInterface::lookupSingleDocument( return lookedUpDocument; } +void MongoDInterface::fsyncLock(OperationContext* opCtx) { + auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext()); + backupCursorService->fsyncLock(opCtx); +} + +void MongoDInterface::fsyncUnlock(OperationContext* opCtx) { + auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext()); + backupCursorService->fsyncUnlock(opCtx); +} + +BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) { + auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext()); + return backupCursorService->openBackupCursor(opCtx); +} + +void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) { + auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext()); + backupCursorService->closeBackupCursor(opCtx, cursorId); +} + BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h index 8d452e54e62..945ffb5ca91 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.h +++ b/src/mongo/db/pipeline/mongod_process_interface.h @@ -93,6 +93,10 @@ public: boost::optional<BSONObj> readConcern) final; std::vector<GenericCursor> getCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + void fsyncLock(OperationContext* opCtx) final; + void fsyncUnlock(OperationContext* opCtx) final; + BackupCursorState openBackupCursor(OperationContext* opCtx) final; + void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final; protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index f01a3390deb..1c654d83e2f 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -136,6 +136,26 @@ public: MONGO_UNREACHABLE; } + /** + * The following methods only make sense for data-bearing nodes and should never be called on + * a mongos. + */ + void fsyncLock(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + void fsyncUnlock(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + BackupCursorState openBackupCursor(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final { + MONGO_UNREACHABLE; + } + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 3e80f90e3a5..bda78c03d93 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -152,5 +152,21 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) const { MONGO_UNREACHABLE; } + + void fsyncLock(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + void fsyncUnlock(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + BackupCursorState openBackupCursor(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final { + MONGO_UNREACHABLE; + } }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 66b1832fe53..851965d418f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -810,7 +810,9 @@ StatusWith<std::vector<std::string>> WiredTigerKVEngine::beginNonBlockingBackup( const auto wiredTigerLogFilePrefix = "WiredTigerLog"; if (name.find(wiredTigerLogFilePrefix) == 0) { // TODO SERVER-13455:replace `journal/` with the configurable journal path. - name = "journal/" + name; + auto path = boost::filesystem::path("journal"); + path /= name; + name = path.string(); } filesToCopy.push_back(std::move(name)); } diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index a5e7401b723..c4c22a491b9 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -1692,8 +1692,10 @@ var ReplSetTest = function(opts) { var activeException = false; // Lock the primary to prevent the TTL monitor from deleting expired documents in - // the background while we are getting the dbhashes of the replica set members. - assert.commandWorked(primary.adminCommand({fsync: 1, lock: 1}), + // the background while we are getting the dbhashes of the replica set members. It's not + // important if the storage engine fails to perform its fsync operation. The only + // requirement is that writes are locked out. + assert.commandWorked(primary.adminCommand({fsync: 1, lock: 1, allowFsyncFailure: true}), 'failed to lock the primary'); try { this.awaitReplication(null, null, slaves); |