summary refs log tree commit diff
path: root/src/mongo/db/commands
diff options
context:
space:
mode:
author    Daniel Gottlieb <daniel.gottlieb@mongodb.com>    2018-08-08 20:43:41 -0400
committer Daniel Gottlieb <daniel.gottlieb@mongodb.com>    2018-08-08 20:43:41 -0400
commit    ca4d34ece27104dd9bb62f9e46120e83398b71f3 (patch)
tree      449aefb3196379e9c55a57fa4dbcfaa476af8f72 /src/mongo/db/commands
parent    0a28a3d8bf39436e02e0325dc5c02f4ef19abd20 (diff)
download  mongo-ca4d34ece27104dd9bb62f9e46120e83398b71f3.tar.gz
SERVER-36265: Expose a $backupCursor aggregation stage.
Diffstat (limited to 'src/mongo/db/commands')
-rw-r--r-- src/mongo/db/commands/fsync.cpp         43
-rw-r--r-- src/mongo/db/commands/run_aggregate.cpp 42
2 files changed, 35 insertions, 50 deletions
diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp
index f7113b06511..69a904578a8 100644
--- a/src/mongo/db/commands/fsync.cpp
+++ b/src/mongo/db/commands/fsync.cpp
@@ -71,12 +71,16 @@ Lock::ResourceMutex commandMutex("fsyncCommandMutex");
*/
class FSyncLockThread : public BackgroundJob {
public:
- FSyncLockThread() : BackgroundJob(false) {}
+ FSyncLockThread(bool allowFsyncFailure)
+ : BackgroundJob(false), _allowFsyncFailure(allowFsyncFailure) {}
virtual ~FSyncLockThread() {}
virtual string name() const {
return "FSyncLockThread";
}
virtual void run();
+
+private:
+ bool _allowFsyncFailure;
};
class FSyncCommand : public ErrmsgCommandDeprecated {
@@ -128,12 +132,16 @@ public:
return false;
}
-
const bool sync =
!cmdObj["async"].trueValue(); // async means do an fsync, but return immediately
const bool lock = cmdObj["lock"].trueValue();
log() << "CMD fsync: sync:" << sync << " lock:" << lock;
+ // fsync + lock is sometimes used to block writes out of the system and does not care if
+ // the `BackupCursorService::fsyncLock` call succeeds.
+ const bool allowFsyncFailure =
+ getTestCommandsEnabled() && cmdObj["allowFsyncFailure"].trueValue();
+
if (!lock) {
// Take a global IS lock to ensure the storage engine is not shutdown
Lock::GlobalLock global(opCtx, MODE_IS);
@@ -160,7 +168,7 @@ public:
stdx::unique_lock<stdx::mutex> lk(lockStateMutex);
threadStatus = Status::OK();
threadStarted = false;
- _lockThread = stdx::make_unique<FSyncLockThread>();
+ _lockThread = stdx::make_unique<FSyncLockThread>(allowFsyncFailure);
_lockThread->go();
while (!threadStarted && threadStatus.isOK()) {
@@ -346,16 +354,27 @@ void FSyncLockThread::run() {
return;
}
+ bool successfulFsyncLock = false;
auto backupCursorService = BackupCursorService::get(opCtx.getServiceContext());
try {
- writeConflictRetry(&opCtx, "beginBackup", "global", [&opCtx, backupCursorService] {
- backupCursorService->fsyncLock(&opCtx);
- });
+ writeConflictRetry(&opCtx,
+ "beginBackup",
+ "global",
+ [&opCtx, backupCursorService, &successfulFsyncLock] {
+ backupCursorService->fsyncLock(&opCtx);
+ successfulFsyncLock = true;
+ });
} catch (const DBException& e) {
- error() << "storage engine unable to begin backup : " << e.toString();
- fsyncCmd.threadStatus = e.toStatus();
- fsyncCmd.acquireFsyncLockSyncCV.notify_one();
- return;
+ if (_allowFsyncFailure) {
+ warning() << "Locking despite storage engine being unable to begin backup : "
+ << e.toString();
+ opCtx.recoveryUnit()->waitUntilDurable();
+ } else {
+ error() << "storage engine unable to begin backup : " << e.toString();
+ fsyncCmd.threadStatus = e.toStatus();
+ fsyncCmd.acquireFsyncLockSyncCV.notify_one();
+ return;
+ }
}
fsyncCmd.threadStarted = true;
@@ -365,7 +384,9 @@ void FSyncLockThread::run() {
fsyncCmd.releaseFsyncLockSyncCV.wait(lk);
}
- backupCursorService->fsyncUnlock(&opCtx);
+ if (successfulFsyncLock) {
+ backupCursorService->fsyncUnlock(&opCtx);
+ }
} catch (const std::exception& e) {
severe() << "FSyncLockThread exception: " << e.what();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index fda10f556ab..10fa0fd748a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -269,36 +269,6 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
}
/**
- * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
- * fasserts if it fails to parse after being serialized.
- */
-std::unique_ptr<Pipeline, PipelineDeleter> reparsePipeline(
- const Pipeline* pipeline,
- const AggregationRequest& request,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- auto serialized = pipeline->serialize();
-
- // Convert vector<Value> to vector<BSONObj>.
- std::vector<BSONObj> parseableSerialization;
- parseableSerialization.reserve(serialized.size());
- for (auto&& serializedStage : serialized) {
- invariant(serializedStage.getType() == BSONType::Object);
- parseableSerialization.push_back(serializedStage.getDocument().toBson());
- }
-
- auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx);
- if (!reparsedPipeline.isOK()) {
- error() << "Aggregation command did not round trip through parsing and serialization "
- "correctly. Input pipeline: "
- << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized);
- fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
- }
-
- reparsedPipeline.getValue()->optimizePipeline();
- return std::move(reparsedPipeline.getValue());
-}
-
-/**
* Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to
* 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView.
*/
@@ -514,14 +484,6 @@ Status runAggregate(OperationContext* opCtx,
pipeline->optimizePipeline();
- if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) {
- // Make sure all operations round-trip through Pipeline::serialize() correctly by
- // re-parsing every command in debug builds. This is important because sharded
- // aggregations rely on this ability. Skipping when fromMongos because this has
- // already been through the transformation (and this un-sets expCtx->fromMongos).
- pipeline = reparsePipeline(pipeline.get(), request, expCtx);
- }
-
// Prepare a PlanExecutor to provide input into the pipeline, if needed.
if (liteParsedPipeline.hasChangeStream()) {
// If we are using a change stream, the cursor stage should have a simple collation,
@@ -603,7 +565,9 @@ Status runAggregate(OperationContext* opCtx,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
repl::ReadConcernArgs::get(opCtx).getLevel(),
cmdObj);
- if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (expCtx->tailableMode == TailableModeEnum::kTailable) {
+ cursorParams.setTailable(true);
+ } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
cursorParams.setTailable(true);
cursorParams.setAwaitData(true);
}