summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2018-08-08 20:43:41 -0400
committerDaniel Gottlieb <daniel.gottlieb@mongodb.com>2018-08-08 20:43:41 -0400
commitca4d34ece27104dd9bb62f9e46120e83398b71f3 (patch)
tree449aefb3196379e9c55a57fa4dbcfaa476af8f72
parent0a28a3d8bf39436e02e0325dc5c02f4ef19abd20 (diff)
downloadmongo-ca4d34ece27104dd9bb62f9e46120e83398b71f3.tar.gz
SERVER-36265: Expose a $backupCursor aggregation stage.
-rw-r--r--jstests/noPassthrough/aggregation_backup_cursor.js69
-rw-r--r--src/mongo/base/error_codes.err2
-rw-r--r--src/mongo/db/commands/fsync.cpp43
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp42
-rw-r--r--src/mongo/db/pipeline/SConscript6
-rw-r--r--src/mongo/db/pipeline/document_source_backup_cursor.cpp103
-rw-r--r--src/mongo/db/pipeline/document_source_backup_cursor.h117
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_cursors.cpp6
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h12
-rw-r--r--src/mongo/db/pipeline/mongod_process_interface.cpp20
-rw-r--r--src/mongo/db/pipeline/mongod_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h20
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h16
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp4
-rw-r--r--src/mongo/shell/replsettest.js6
15 files changed, 408 insertions, 62 deletions
diff --git a/jstests/noPassthrough/aggregation_backup_cursor.js b/jstests/noPassthrough/aggregation_backup_cursor.js
new file mode 100644
index 00000000000..9d7b21959bb
--- /dev/null
+++ b/jstests/noPassthrough/aggregation_backup_cursor.js
@@ -0,0 +1,69 @@
+/**
+ * Test the basic operation of a `$backupCursor` aggregation stage.
+ *
+ * @tags: [requires_persistence, requires_wiredtiger]
+ */
+(function() {
+ "use strict";
+
+ let conn = MongoRunner.runMongod();
+ let db = conn.getDB("test");
+
+ let backupCursor = db.aggregate([{$backupCursor: {}}]);
+ // There should be about 14 files in total, but being precise would be unnecessarily fragile.
+ assert.gt(backupCursor.itcount(), 6);
+ assert(!backupCursor.isExhausted());
+ backupCursor.close();
+
+ // Open a backup cursor. Use a small batch size to ensure a getMore retrieves additional
+ // results.
+ let response = assert.commandWorked(
+ db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {batchSize: 2}}));
+ assert.eq("test.$cmd.aggregate", response.cursor.ns);
+ assert.eq(2, response.cursor.firstBatch.length);
+ let cursorId = response.cursor.id;
+
+ response =
+ assert.commandWorked(db.runCommand({getMore: cursorId, collection: "$cmd.aggregate"}));
+ // Sanity check the results.
+ assert.neq(0, response.cursor.id);
+ assert.gt(response.cursor.nextBatch.length, 6);
+
+ // The $backupCursor is a tailable cursor. Even though we've exhausted the results, running a
+ // getMore should succeed.
+ response =
+ assert.commandWorked(db.runCommand({getMore: cursorId, collection: "$cmd.aggregate"}));
+ assert.neq(0, response.cursor.id);
+ assert.eq(0, response.cursor.nextBatch.length);
+
+ // Because the backup cursor is still open, trying to open a second cursor should fail.
+ assert.commandFailed(
+ db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}}));
+
+ // Kill the backup cursor.
+ response =
+ assert.commandWorked(db.runCommand({killCursors: "$cmd.aggregate", cursors: [cursorId]}));
+ assert.eq(1, response.cursorsKilled.length);
+ assert.eq(cursorId, response.cursorsKilled[0]);
+
+ // Open another backup cursor with a batch size of 0. The underlying backup cursor should be
+ // created.
+ response = assert.commandWorked(
+ db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {batchSize: 0}}));
+ assert.neq(0, response.cursor.id);
+ assert.eq(0, response.cursor.firstBatch.length);
+
+ // Attempt to open a second backup cursor to demonstrate the original underlying cursor was
+ // opened.
+ assert.commandFailed(
+ db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}}));
+
+ // Demonstrate query cursor timeouts will kill backup cursors, closing the underlying resources.
+ assert.commandWorked(db.adminCommand({setParameter: 1, cursorTimeoutMillis: 1}));
+ assert.soon(() => {
+ return db.runCommand({aggregate: 1, pipeline: [{$backupCursor: {}}], cursor: {}})['ok'] ==
+ 1;
+ });
+
+ MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 17e85d8ed6f..c869f99f704 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -265,7 +265,7 @@ error_code("TooManyLogicalSessions", 264);
error_code("OrphanedRangeCleanUpFailed", 265);
error_code("ExceededTimeLimit", 266);
error_code("PreparedTransactionInProgress", 267);
-
+error_code("CannotBackup", 268);
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp
index f7113b06511..69a904578a8 100644
--- a/src/mongo/db/commands/fsync.cpp
+++ b/src/mongo/db/commands/fsync.cpp
@@ -71,12 +71,16 @@ Lock::ResourceMutex commandMutex("fsyncCommandMutex");
*/
class FSyncLockThread : public BackgroundJob {
public:
- FSyncLockThread() : BackgroundJob(false) {}
+ FSyncLockThread(bool allowFsyncFailure)
+ : BackgroundJob(false), _allowFsyncFailure(allowFsyncFailure) {}
virtual ~FSyncLockThread() {}
virtual string name() const {
return "FSyncLockThread";
}
virtual void run();
+
+private:
+ bool _allowFsyncFailure;
};
class FSyncCommand : public ErrmsgCommandDeprecated {
@@ -128,12 +132,16 @@ public:
return false;
}
-
const bool sync =
!cmdObj["async"].trueValue(); // async means do an fsync, but return immediately
const bool lock = cmdObj["lock"].trueValue();
log() << "CMD fsync: sync:" << sync << " lock:" << lock;
+ // fsync + lock is sometimes used to block writes out of the system and does not care if
+ // the `BackupCursorService::fsyncLock` call succeeds.
+ const bool allowFsyncFailure =
+ getTestCommandsEnabled() && cmdObj["allowFsyncFailure"].trueValue();
+
if (!lock) {
// Take a global IS lock to ensure the storage engine is not shutdown
Lock::GlobalLock global(opCtx, MODE_IS);
@@ -160,7 +168,7 @@ public:
stdx::unique_lock<stdx::mutex> lk(lockStateMutex);
threadStatus = Status::OK();
threadStarted = false;
- _lockThread = stdx::make_unique<FSyncLockThread>();
+ _lockThread = stdx::make_unique<FSyncLockThread>(allowFsyncFailure);
_lockThread->go();
while (!threadStarted && threadStatus.isOK()) {
@@ -346,16 +354,27 @@ void FSyncLockThread::run() {
return;
}
+ bool successfulFsyncLock = false;
auto backupCursorService = BackupCursorService::get(opCtx.getServiceContext());
try {
- writeConflictRetry(&opCtx, "beginBackup", "global", [&opCtx, backupCursorService] {
- backupCursorService->fsyncLock(&opCtx);
- });
+ writeConflictRetry(&opCtx,
+ "beginBackup",
+ "global",
+ [&opCtx, backupCursorService, &successfulFsyncLock] {
+ backupCursorService->fsyncLock(&opCtx);
+ successfulFsyncLock = true;
+ });
} catch (const DBException& e) {
- error() << "storage engine unable to begin backup : " << e.toString();
- fsyncCmd.threadStatus = e.toStatus();
- fsyncCmd.acquireFsyncLockSyncCV.notify_one();
- return;
+ if (_allowFsyncFailure) {
+ warning() << "Locking despite storage engine being unable to begin backup : "
+ << e.toString();
+ opCtx.recoveryUnit()->waitUntilDurable();
+ } else {
+ error() << "storage engine unable to begin backup : " << e.toString();
+ fsyncCmd.threadStatus = e.toStatus();
+ fsyncCmd.acquireFsyncLockSyncCV.notify_one();
+ return;
+ }
}
fsyncCmd.threadStarted = true;
@@ -365,7 +384,9 @@ void FSyncLockThread::run() {
fsyncCmd.releaseFsyncLockSyncCV.wait(lk);
}
- backupCursorService->fsyncUnlock(&opCtx);
+ if (successfulFsyncLock) {
+ backupCursorService->fsyncUnlock(&opCtx);
+ }
} catch (const std::exception& e) {
severe() << "FSyncLockThread exception: " << e.what();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index fda10f556ab..10fa0fd748a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -269,36 +269,6 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
}
/**
- * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
- * fasserts if it fails to parse after being serialized.
- */
-std::unique_ptr<Pipeline, PipelineDeleter> reparsePipeline(
- const Pipeline* pipeline,
- const AggregationRequest& request,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- auto serialized = pipeline->serialize();
-
- // Convert vector<Value> to vector<BSONObj>.
- std::vector<BSONObj> parseableSerialization;
- parseableSerialization.reserve(serialized.size());
- for (auto&& serializedStage : serialized) {
- invariant(serializedStage.getType() == BSONType::Object);
- parseableSerialization.push_back(serializedStage.getDocument().toBson());
- }
-
- auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx);
- if (!reparsedPipeline.isOK()) {
- error() << "Aggregation command did not round trip through parsing and serialization "
- "correctly. Input pipeline: "
- << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized);
- fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
- }
-
- reparsedPipeline.getValue()->optimizePipeline();
- return std::move(reparsedPipeline.getValue());
-}
-
-/**
* Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to
* 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView.
*/
@@ -514,14 +484,6 @@ Status runAggregate(OperationContext* opCtx,
pipeline->optimizePipeline();
- if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) {
- // Make sure all operations round-trip through Pipeline::serialize() correctly by
- // re-parsing every command in debug builds. This is important because sharded
- // aggregations rely on this ability. Skipping when fromMongos because this has
- // already been through the transformation (and this un-sets expCtx->fromMongos).
- pipeline = reparsePipeline(pipeline.get(), request, expCtx);
- }
-
// Prepare a PlanExecutor to provide input into the pipeline, if needed.
if (liteParsedPipeline.hasChangeStream()) {
// If we are using a change stream, the cursor stage should have a simple collation,
@@ -603,7 +565,9 @@ Status runAggregate(OperationContext* opCtx,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
repl::ReadConcernArgs::get(opCtx).getLevel(),
cmdObj);
- if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (expCtx->tailableMode == TailableModeEnum::kTailable) {
+ cursorParams.setTailable(true);
+ } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
cursorParams.setTailable(true);
cursorParams.setAwaitData(true);
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 944a908ac8c..e9157f816d0 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -277,9 +277,10 @@ env.Library(
'mongod_process_interface.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/stats/top',
+ '$BUILD_DIR/mongo/db/storage/backup_cursor_service',
'$BUILD_DIR/mongo/s/sharding_api',
'mongo_process_common',
]
@@ -302,8 +303,10 @@ pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
pipelineeEnv.Library(
target='pipeline',
source=[
+ "cluster_aggregation_planner.cpp",
'document_source.cpp',
'document_source_add_fields.cpp',
+ 'document_source_backup_cursor.cpp',
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
'document_source_change_stream.cpp',
@@ -343,7 +346,6 @@ pipelineeEnv.Library(
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
- "cluster_aggregation_planner.cpp",
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
'pipeline.cpp',
diff --git a/src/mongo/db/pipeline/document_source_backup_cursor.cpp b/src/mongo/db/pipeline/document_source_backup_cursor.cpp
new file mode 100644
index 00000000000..f6a8b8b5eca
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_backup_cursor.cpp
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_backup_cursor.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(backupCursor,
+ DocumentSourceBackupCursor::LiteParsed::parse,
+ DocumentSourceBackupCursor::createFromBson);
+
+const char* DocumentSourceBackupCursor::kStageName = "$backupCursor";
+
+DocumentSourceBackupCursor::DocumentSourceBackupCursor(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(pExpCtx),
+ _backupCursorState(pExpCtx->mongoProcessInterface->openBackupCursor(pExpCtx->opCtx)) {}
+
+
+DocumentSourceBackupCursor::~DocumentSourceBackupCursor() {
+ try {
+ pExpCtx->mongoProcessInterface->closeBackupCursor(pExpCtx->opCtx,
+ _backupCursorState.cursorId);
+ } catch (DBException& exc) {
+ severe() << exc.toStatus("Error closing a backup cursor.");
+ fassertFailed(50909);
+ }
+}
+
+DocumentSource::GetNextResult DocumentSourceBackupCursor::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ if (!_backupCursorState.filenames.empty()) {
+ Document doc = {{"filename", _backupCursorState.filenames.back()}};
+ _backupCursorState.filenames.pop_back();
+
+ return std::move(doc);
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceBackupCursor::createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+ // The anticipated usage of a backup cursor: open the backup cursor, consume the results, copy
+ // data off disk, close the backup cursor. The backup cursor must be successfully closed for
+ // the data copied to be valid. Hence, the caller needs a way to keep the cursor open after
+ // consuming the results, as well as the ability to send "heartbeats" to prevent the client
+ // cursor manager from timing out the backup cursor. A backup cursor does consume resources;
+ // in the event the calling process crashes, the cursors should eventually be timed out.
+ pExpCtx->tailableMode = TailableModeEnum::kTailable;
+
+ uassert(
+ ErrorCodes::FailedToParse,
+ str::stream() << kStageName << " value must be an object. Found: " << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ uassert(ErrorCodes::CannotBackup,
+ str::stream() << kStageName << " cannot be executed against a MongoS.",
+ !pExpCtx->inMongos && !pExpCtx->fromMongos && !pExpCtx->needsMerge);
+
+ return new DocumentSourceBackupCursor(pExpCtx);
+}
+
+Value DocumentSourceBackupCursor::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(BSON(kStageName << 1));
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_backup_cursor.h b/src/mongo/db/pipeline/document_source_backup_cursor.h
new file mode 100644
index 00000000000..d0d0d83ab7a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_backup_cursor.h
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/storage/backup_cursor_service.h"
+
+namespace mongo {
+
+/**
+ * Represents the `$backupCursor` aggregation stage. The lifetime of this object maps to storage
+ * engine calls on `beginNonBlockingBackup` and `endNonBlockingBackup`. The DocumentSource will
+ * return filenames in the running `dbpath` that an application can copy and optionally some
+ * metadata information.
+ */
+class DocumentSourceBackupCursor final : public DocumentSource {
+public:
+ static const char* kStageName;
+
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
+ const BSONElement& spec) {
+ return stdx::make_unique<LiteParsed>();
+ }
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return stdx::unordered_set<NamespaceString>();
+ }
+
+ PrivilegeVector requiredPrivileges(bool isMongos) const final {
+ // SERVER-36266 will address requiring the appropriate privileges.
+ return PrivilegeVector();
+ }
+
+ bool isInitialSource() const final {
+ return true;
+ }
+
+ bool allowedToForwardFromMongos() const final {
+ return false;
+ }
+
+ void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "Aggregation stage " << kStageName
+ << " requires read concern local but found "
+ << readConcern.toString(),
+ readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
+ }
+ };
+
+ virtual ~DocumentSourceBackupCursor();
+
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ return kStageName;
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ // This stage `uasserts` on a MongoS; the
+ // `HostTypeRequirement` field has no effect.
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
+
+ constraints.isIndependentOfAnyCollection = true;
+ constraints.requiresInputDocSource = false;
+ return constraints;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceBackupCursor(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ BackupCursorState _backupCursorState;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
index 72e53d7b6dd..7246bc1d749 100644
--- a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
@@ -58,12 +58,6 @@ DocumentSource::GetNextResult DocumentSourceListLocalCursors::getNext() {
boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalCursors::createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << kStageName
- << " must be run against the database with {aggregate: 1}, not a collection",
- pExpCtx->ns.isCollectionlessAggregateNS());
-
uassert(ErrorCodes::BadValue,
str::stream() << kStageName << " must be run as { " << kStageName << ": {}}",
spec.isABSONObj() && spec.Obj().isEmpty());
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index b650855d438..13efd2928f3 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -44,6 +44,7 @@
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/storage/backup_cursor_service.h"
namespace mongo {
@@ -223,6 +224,17 @@ public:
*/
virtual std::vector<GenericCursor> getCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0;
+
+ /**
+ * The following methods forward to the BackupCursorService decorating the ServiceContext.
+ */
+ virtual void fsyncLock(OperationContext* opCtx) = 0;
+
+ virtual void fsyncUnlock(OperationContext* opCtx) = 0;
+
+ virtual BackupCursorState openBackupCursor(OperationContext* opCtx) = 0;
+
+ virtual void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp
index 8e52cf4d057..566b43533d1 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongod_process_interface.cpp
@@ -394,6 +394,26 @@ boost::optional<Document> MongoDInterface::lookupSingleDocument(
return lookedUpDocument;
}
+void MongoDInterface::fsyncLock(OperationContext* opCtx) {
+ auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext());
+ backupCursorService->fsyncLock(opCtx);
+}
+
+void MongoDInterface::fsyncUnlock(OperationContext* opCtx) {
+ auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext());
+ backupCursorService->fsyncUnlock(opCtx);
+}
+
+BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) {
+ auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext());
+ return backupCursorService->openBackupCursor(opCtx);
+}
+
+void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) {
+ auto backupCursorService = BackupCursorService::get(opCtx->getServiceContext());
+ backupCursorService->closeBackupCursor(opCtx, cursorId);
+}
+
BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
CurrentOpTruncateMode truncateOps) const {
diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h
index 8d452e54e62..945ffb5ca91 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.h
+++ b/src/mongo/db/pipeline/mongod_process_interface.h
@@ -93,6 +93,10 @@ public:
boost::optional<BSONObj> readConcern) final;
std::vector<GenericCursor> getCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+ void fsyncLock(OperationContext* opCtx) final;
+ void fsyncUnlock(OperationContext* opCtx) final;
+ BackupCursorState openBackupCursor(OperationContext* opCtx) final;
+ void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final;
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f01a3390deb..1c654d83e2f 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -136,6 +136,26 @@ public:
MONGO_UNREACHABLE;
}
+ /**
+ * The following methods only make sense for data-bearing nodes and should never be called on
+ * a mongos.
+ */
+ void fsyncLock(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void fsyncUnlock(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ BackupCursorState openBackupCursor(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final {
+ MONGO_UNREACHABLE;
+ }
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 3e80f90e3a5..bda78c03d93 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -152,5 +152,21 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
MONGO_UNREACHABLE;
}
+
+ void fsyncLock(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void fsyncUnlock(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ BackupCursorState openBackupCursor(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final {
+ MONGO_UNREACHABLE;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 66b1832fe53..851965d418f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -810,7 +810,9 @@ StatusWith<std::vector<std::string>> WiredTigerKVEngine::beginNonBlockingBackup(
const auto wiredTigerLogFilePrefix = "WiredTigerLog";
if (name.find(wiredTigerLogFilePrefix) == 0) {
// TODO SERVER-13455:replace `journal/` with the configurable journal path.
- name = "journal/" + name;
+ auto path = boost::filesystem::path("journal");
+ path /= name;
+ name = path.string();
}
filesToCopy.push_back(std::move(name));
}
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index a5e7401b723..c4c22a491b9 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -1692,8 +1692,10 @@ var ReplSetTest = function(opts) {
var activeException = false;
// Lock the primary to prevent the TTL monitor from deleting expired documents in
- // the background while we are getting the dbhashes of the replica set members.
- assert.commandWorked(primary.adminCommand({fsync: 1, lock: 1}),
+ // the background while we are getting the dbhashes of the replica set members. It's not
+ // important if the storage engine fails to perform its fsync operation. The only
+ // requirement is that writes are locked out.
+ assert.commandWorked(primary.adminCommand({fsync: 1, lock: 1, allowFsyncFailure: true}),
'failed to lock the primary');
try {
this.awaitReplication(null, null, slaves);