summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2016-09-08 09:48:29 -0400
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2016-09-08 09:48:29 -0400
commit284c0eca4941a450a473267b54aef03ba419f993 (patch)
tree4e274ff5a0b6db7a7cd519899397a7e06b5ebff6
parent81b92cff01eee65a039d0cb74fbd3b312f95ddec (diff)
downloadmongo-284c0eca4941a450a473267b54aef03ba419f993.tar.gz
SERVER-25039 Abort aggregation planning when a catalog operation occurs.
This prevents the aggregation system from trying to continue query planning when the collection no longer exists. (cherry picked from commit 82cd8943dab085447ee180d4d59c2c5da778c523)
-rw-r--r--jstests/concurrency/fsm_workloads/kill_aggregation.js47
-rw-r--r--jstests/concurrency/fsm_workloads/kill_rooted_or.js3
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp78
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h4
4 files changed, 101 insertions, 31 deletions
diff --git a/jstests/concurrency/fsm_workloads/kill_aggregation.js b/jstests/concurrency/fsm_workloads/kill_aggregation.js
new file mode 100644
index 00000000000..f1260f10a96
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/kill_aggregation.js
@@ -0,0 +1,47 @@
+'use strict';
+
+/**
+ * kill_aggregation.js
+ *
+ * Tests that the aggregation system correctly halts its planning to determine whether the query
+ * system can provide a non-blocking sort or can provide a covered projection when a catalog
+ * operation occurs.
+ *
+ * This workload was designed to reproduce SERVER-25039.
+ */
+
+load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload
+load('jstests/concurrency/fsm_workloads/kill_rooted_or.js'); // for $config
+
+var $config = extendWorkload(
+ $config,
+ function($config, $super) {
+
+ // Use the workload name as the collection name, since the workload name is assumed to be
+ // unique.
+ $config.data.collName = 'kill_aggregation';
+
+ $config.states.query = function query(db, collName) {
+ var res = db.runCommand({
+ aggregate: this.collName,
+ // We use a rooted $or query to cause plan selection to use the subplanner and thus
+ // yield.
+ pipeline: [{$match: {$or: [{a: 0}, {b: 0}]}}],
+ cursor: {}
+ });
+
+ if (!res.ok) {
+ return;
+ }
+
+ var cursor = new DBCommandCursor(db.getMongo(), res);
+ try {
+ // No documents are ever inserted into the collection.
+ assertAlways.eq(0, cursor.itcount());
+ } catch (e) {
+ // Ignore errors due to the plan executor being killed.
+ }
+ };
+
+ return $config;
+ });
diff --git a/jstests/concurrency/fsm_workloads/kill_rooted_or.js b/jstests/concurrency/fsm_workloads/kill_rooted_or.js
index e05bff72d40..0275bc774ac 100644
--- a/jstests/concurrency/fsm_workloads/kill_rooted_or.js
+++ b/jstests/concurrency/fsm_workloads/kill_rooted_or.js
@@ -24,7 +24,8 @@ var $config = (function() {
query: function query(db, collName) {
var cursor = db[this.collName].find({$or: [{a: 0}, {b: 0}]});
try {
- assert.eq(0, cursor.itcount());
+ // No documents are ever inserted into the collection.
+ assertAlways.eq(0, cursor.itcount());
} catch (e) {
// Ignore errors due to the plan executor being killed.
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index bfe84545158..56e7040b5ec 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -126,13 +126,14 @@ private:
* if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
* percentage of the collection.
*/
-shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
- OperationContext* txn,
- long long sampleSize,
- long long numRecords) {
+StatusWith<unique_ptr<PlanExecutor>> createRandomCursorExecutor(Collection* collection,
+ OperationContext* txn,
+ long long sampleSize,
+ long long numRecords) {
double kMaxSampleRatioForRandCursor = 0.05;
- if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100)
- return {};
+ if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
+ return {nullptr};
+ }
// Attempt to get a random cursor from the RecordStore. If the RecordStore does not support
// random cursors, attempt to get one from the _id index.
@@ -151,7 +152,7 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
if (!indexDescriptor) {
// There was no _id index.
- return {};
+ return {nullptr};
}
IndexAccessMethod* idIam = indexCatalog->getIndex(indexDescriptor);
@@ -159,7 +160,7 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
if (!idxRandCursor) {
// Storage engine does not support any type of random cursor.
- return {};
+ return {nullptr};
}
auto idxIterator = stdx::make_unique<IndexIteratorStage>(txn,
@@ -178,12 +179,12 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
if (shardingState->needCollectionMetadata(txn, txn->getNS())) {
auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
txn, shardingState->getCollectionMetadata(txn->getNS()), ws.get(), stage.release());
- return uassertStatusOK(PlanExecutor::make(
- txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO));
+ return PlanExecutor::make(
+ txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO);
}
- return uassertStatusOK(PlanExecutor::make(
- txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO));
+ return PlanExecutor::make(
+ txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO);
}
StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor(
@@ -247,7 +248,8 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
if (collection && sampleStage) {
const long long sampleSize = sampleStage->getSampleSize();
const long long numRecords = collection->getRecordStore()->numRecords(txn);
- auto exec = createRandomCursorExecutor(collection, txn, sampleSize, numRecords);
+ auto exec = uassertStatusOK(
+ createRandomCursorExecutor(collection, txn, sampleSize, numRecords));
if (exec) {
// Replace $sample stage with $sampleFromRandomCursor stage.
sources.pop_front();
@@ -257,7 +259,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
const BSONObj initialQuery;
return addCursorSource(
- pPipeline, pExpCtx, exec, pPipeline->getDependencies(initialQuery));
+ pPipeline, pExpCtx, std::move(exec), pPipeline->getDependencies(initialQuery));
}
}
}
@@ -294,13 +296,14 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
}
// Create the PlanExecutor.
- auto exec = prepareExecutor(
- txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery);
+ auto exec = uassertStatusOK(prepareExecutor(
+ txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery));
- return addCursorSource(pPipeline, pExpCtx, exec, deps, queryObj, sortObj, projForQuery);
+ return addCursorSource(
+ pPipeline, pExpCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
}
-std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
+StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor(
OperationContext* txn,
Collection* collection,
const intrusive_ptr<Pipeline>& pipeline,
@@ -338,8 +341,6 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
- std::shared_ptr<PlanExecutor> exec;
-
BSONObj emptyProjection;
if (sortStage) {
// See if the query system can provide a non-blocking sort.
@@ -351,9 +352,15 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
auto swExecutorSortAndProj = attemptToGetExecutor(
txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);
+ std::unique_ptr<PlanExecutor> exec;
if (swExecutorSortAndProj.isOK()) {
// Success! We have a non-blocking sort and a covered projection.
exec = std::move(swExecutorSortAndProj.getValue());
+ } else if (swExecutorSortAndProj == ErrorCodes::QueryPlanKilled) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "Failed to determine whether query system can provide a "
+ "covered projection in addition to a non-blocking sort: "
+ << swExecutorSortAndProj.getStatus().toString()};
} else {
// The query system couldn't cover the projection.
*projectionObj = BSONObj();
@@ -367,7 +374,13 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
// We need to reinsert the coalesced $limit after removing the $sort.
pipeline->sources.push_front(sortStage->getLimitSrc());
}
- return exec;
+ return std::move(exec);
+ } else if (swExecutorSort == ErrorCodes::QueryPlanKilled) {
+ return {
+ ErrorCodes::OperationFailed,
+ str::stream()
+ << "Failed to determine whether query system can provide a non-blocking sort: "
+ << swExecutorSort.getStatus().toString()};
}
// The query system can't provide a non-blocking sort.
*sortObj = BSONObj();
@@ -383,18 +396,23 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
if (swExecutorProj.isOK()) {
// Success! We have a covered projection.
return std::move(swExecutorProj.getValue());
+ } else if (swExecutorProj == ErrorCodes::QueryPlanKilled) {
+ return {ErrorCodes::OperationFailed,
+ str::stream()
+ << "Failed to determine whether query system can provide a covered projection: "
+ << swExecutorProj.getStatus().toString()};
}
// The query system couldn't provide a covered projection.
*projectionObj = BSONObj();
// If this doesn't work, nothing will.
- return uassertStatusOK(attemptToGetExecutor(
- txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts));
+ return attemptToGetExecutor(
+ txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);
}
shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
- shared_ptr<PlanExecutor> exec,
+ unique_ptr<PlanExecutor> exec,
DepsTracker deps,
const BSONObj& queryObj,
const BSONObj& sortObj,
@@ -402,9 +420,13 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline
// Get the full "namespace" name.
const string& fullName = expCtx->ns.ns();
+ // We convert the unique_ptr to a shared_ptr because both the PipelineProxyStage and the
+ // DocumentSourceCursor need to reference the PlanExecutor.
+ std::shared_ptr<PlanExecutor> sharedExec(std::move(exec));
+
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(fullName, exec, expCtx);
+ DocumentSourceCursor::create(fullName, sharedExec, expCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
@@ -429,10 +451,10 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
// deregister the PlanExecutor so that it can be registered with ClientCursor.
- exec->deregisterExec();
- exec->saveState();
+ sharedExec->deregisterExec();
+ sharedExec->saveState();
- return exec;
+ return sharedExec;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index a9993c6331b..2da76fe26e0 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -94,7 +94,7 @@ private:
* sort, and 'projectionObj' will be set to an empty object if the query system cannot provide a
* covered projection.
*/
- static std::shared_ptr<PlanExecutor> prepareExecutor(
+ static StatusWith<std::unique_ptr<PlanExecutor>> prepareExecutor(
OperationContext* txn,
Collection* collection,
const boost::intrusive_ptr<Pipeline>& pipeline,
@@ -112,7 +112,7 @@ private:
static std::shared_ptr<PlanExecutor> addCursorSource(
const boost::intrusive_ptr<Pipeline>& pipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::shared_ptr<PlanExecutor> exec,
+ std::unique_ptr<PlanExecutor> exec,
DepsTracker deps,
const BSONObj& queryObj = BSONObj(),
const BSONObj& sortObj = BSONObj(),