diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-08-08 20:43:41 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-08-08 20:43:41 -0400 |
commit | ca4d34ece27104dd9bb62f9e46120e83398b71f3 (patch) | |
tree | 449aefb3196379e9c55a57fa4dbcfaa476af8f72 /src/mongo/db/commands | |
parent | 0a28a3d8bf39436e02e0325dc5c02f4ef19abd20 (diff) | |
download | mongo-ca4d34ece27104dd9bb62f9e46120e83398b71f3.tar.gz |
SERVER-36265: Expose a $backupCursor aggregation stage.
Diffstat (limited to 'src/mongo/db/commands')
-rw-r--r-- | src/mongo/db/commands/fsync.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 42 |
2 files changed, 35 insertions, 50 deletions
diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp index f7113b06511..69a904578a8 100644 --- a/src/mongo/db/commands/fsync.cpp +++ b/src/mongo/db/commands/fsync.cpp @@ -71,12 +71,16 @@ Lock::ResourceMutex commandMutex("fsyncCommandMutex"); */ class FSyncLockThread : public BackgroundJob { public: - FSyncLockThread() : BackgroundJob(false) {} + FSyncLockThread(bool allowFsyncFailure) + : BackgroundJob(false), _allowFsyncFailure(allowFsyncFailure) {} virtual ~FSyncLockThread() {} virtual string name() const { return "FSyncLockThread"; } virtual void run(); + +private: + bool _allowFsyncFailure; }; class FSyncCommand : public ErrmsgCommandDeprecated { @@ -128,12 +132,16 @@ public: return false; } - const bool sync = !cmdObj["async"].trueValue(); // async means do an fsync, but return immediately const bool lock = cmdObj["lock"].trueValue(); log() << "CMD fsync: sync:" << sync << " lock:" << lock; + // fsync + lock is sometimes used to block writes out of the system and does not care if + // the `BackupCursorService::fsyncLock` call succeeds. + const bool allowFsyncFailure = + getTestCommandsEnabled() && cmdObj["allowFsyncFailure"].trueValue(); + if (!lock) { // Take a global IS lock to ensure the storage engine is not shutdown Lock::GlobalLock global(opCtx, MODE_IS); @@ -160,7 +168,7 @@ public: stdx::unique_lock<stdx::mutex> lk(lockStateMutex); threadStatus = Status::OK(); threadStarted = false; - _lockThread = stdx::make_unique<FSyncLockThread>(); + _lockThread = stdx::make_unique<FSyncLockThread>(allowFsyncFailure); _lockThread->go(); while (!threadStarted && threadStatus.isOK()) { @@ -346,16 +354,27 @@ void FSyncLockThread::run() { return; } + bool successfulFsyncLock = false; auto backupCursorService = BackupCursorService::get(opCtx.getServiceContext()); try { - writeConflictRetry(&opCtx, "beginBackup", "global", [&opCtx, backupCursorService] { - backupCursorService->fsyncLock(&opCtx); - }); + writeConflictRetry(&opCtx, + "beginBackup", + "global", + [&opCtx, backupCursorService, &successfulFsyncLock] { + backupCursorService->fsyncLock(&opCtx); + successfulFsyncLock = true; + }); } catch (const DBException& e) { - error() << "storage engine unable to begin backup : " << e.toString(); - fsyncCmd.threadStatus = e.toStatus(); - fsyncCmd.acquireFsyncLockSyncCV.notify_one(); - return; + if (_allowFsyncFailure) { + warning() << "Locking despite storage engine being unable to begin backup : " + << e.toString(); + opCtx.recoveryUnit()->waitUntilDurable(); + } else { + error() << "storage engine unable to begin backup : " << e.toString(); + fsyncCmd.threadStatus = e.toStatus(); + fsyncCmd.acquireFsyncLockSyncCV.notify_one(); + return; + } } fsyncCmd.threadStarted = true; @@ -365,7 +384,9 @@ void FSyncLockThread::run() { fsyncCmd.releaseFsyncLockSyncCV.wait(lk); } - backupCursorService->fsyncUnlock(&opCtx); + if (successfulFsyncLock) { + backupCursorService->fsyncUnlock(&opCtx); + } } catch (const std::exception& e) { severe() << "FSyncLockThread exception: " << e.what(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index fda10f556ab..10fa0fd748a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -269,36 +269,6 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames } /** - * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). - * fasserts if it fails to parse after being serialized. - */ -std::unique_ptr<Pipeline, PipelineDeleter> reparsePipeline( - const Pipeline* pipeline, - const AggregationRequest& request, - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - auto serialized = pipeline->serialize(); - - // Convert vector<Value> to vector<BSONObj>. - std::vector<BSONObj> parseableSerialization; - parseableSerialization.reserve(serialized.size()); - for (auto&& serializedStage : serialized) { - invariant(serializedStage.getType() == BSONType::Object); - parseableSerialization.push_back(serializedStage.getDocument().toBson()); - } - - auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx); - if (!reparsedPipeline.isOK()) { - error() << "Aggregation command did not round trip through parsing and serialization " - "correctly. Input pipeline: " - << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized); - fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus()); - } - - reparsedPipeline.getValue()->optimizePipeline(); - return std::move(reparsedPipeline.getValue()); -} - -/** * Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to * 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView. */ @@ -514,14 +484,6 @@ Status runAggregate(OperationContext* opCtx, pipeline->optimizePipeline(); - if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) { - // Make sure all operations round-trip through Pipeline::serialize() correctly by - // re-parsing every command in debug builds. This is important because sharded - // aggregations rely on this ability. Skipping when fromMongos because this has - // already been through the transformation (and this un-sets expCtx->fromMongos). - pipeline = reparsePipeline(pipeline.get(), request, expCtx); - } - // Prepare a PlanExecutor to provide input into the pipeline, if needed. if (liteParsedPipeline.hasChangeStream()) { // If we are using a change stream, the cursor stage should have a simple collation, @@ -603,7 +565,9 @@ Status runAggregate(OperationContext* opCtx, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), repl::ReadConcernArgs::get(opCtx).getLevel(), cmdObj); - if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (expCtx->tailableMode == TailableModeEnum::kTailable) { + cursorParams.setTailable(true); + } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } |