diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-09-08 09:48:29 -0400 |
---|---|---|
committer | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-09-08 09:48:29 -0400 |
commit | 284c0eca4941a450a473267b54aef03ba419f993 (patch) | |
tree | 4e274ff5a0b6db7a7cd519899397a7e06b5ebff6 | |
parent | 81b92cff01eee65a039d0cb74fbd3b312f95ddec (diff) | |
download | mongo-284c0eca4941a450a473267b54aef03ba419f993.tar.gz |
SERVER-25039 Abort aggregation planning when a catalog operation occurs.
This prevents the aggregation system from trying to continue query
planning when the collection longer exists.
(cherry picked from commit 82cd8943dab085447ee180d4d59c2c5da778c523)
-rw-r--r-- | jstests/concurrency/fsm_workloads/kill_aggregation.js | 47 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/kill_rooted_or.js | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 78 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 4 |
4 files changed, 101 insertions, 31 deletions
diff --git a/jstests/concurrency/fsm_workloads/kill_aggregation.js b/jstests/concurrency/fsm_workloads/kill_aggregation.js new file mode 100644 index 00000000000..f1260f10a96 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/kill_aggregation.js @@ -0,0 +1,47 @@ +'use strict'; + +/** + * kill_aggregation.js + * + * Tests that the aggregation system correctly halts its planning to determine whether the query + * system can provide a non-blocking sort or can provide a covered projection when a catalog + * operation occurs. + * + * This workload was designed to reproduce SERVER-25039. + */ + +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_workloads/kill_rooted_or.js'); // for $config + +var $config = extendWorkload( + $config, + function($config, $super) { + + // Use the workload name as the collection name, since the workload name is assumed to be + // unique. + $config.data.collName = 'kill_aggregation'; + + $config.states.query = function query(db, collName) { + var res = db.runCommand({ + aggregate: this.collName, + // We use a rooted $or query to cause plan selection to use the subplanner and thus + // yield. + pipeline: [{$match: {$or: [{a: 0}, {b: 0}]}}], + cursor: {} + }); + + if (!res.ok) { + return; + } + + var cursor = new DBCommandCursor(db.getMongo(), res); + try { + // No documents are ever inserted into the collection. + assertAlways.eq(0, cursor.itcount()); + } catch (e) { + // Ignore errors due to the plan executor being killed. + } + }; + + return $config; + }); diff --git a/jstests/concurrency/fsm_workloads/kill_rooted_or.js b/jstests/concurrency/fsm_workloads/kill_rooted_or.js index e05bff72d40..0275bc774ac 100644 --- a/jstests/concurrency/fsm_workloads/kill_rooted_or.js +++ b/jstests/concurrency/fsm_workloads/kill_rooted_or.js @@ -24,7 +24,8 @@ var $config = (function() { query: function query(db, collName) { var cursor = db[this.collName].find({$or: [{a: 0}, {b: 0}]}); try { - assert.eq(0, cursor.itcount()); + // No documents are ever inserted into the collection. + assertAlways.eq(0, cursor.itcount()); } catch (e) { // Ignore errors due to the plan executor being killed. } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index bfe84545158..56e7040b5ec 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -126,13 +126,14 @@ private: * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough * percentage of the collection. */ -shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection, - OperationContext* txn, - long long sampleSize, - long long numRecords) { +StatusWith<unique_ptr<PlanExecutor>> createRandomCursorExecutor(Collection* collection, + OperationContext* txn, + long long sampleSize, + long long numRecords) { double kMaxSampleRatioForRandCursor = 0.05; - if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) - return {}; + if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { + return {nullptr}; + } // Attempt to get a random cursor from the RecordStore. If the RecordStore does not support // random cursors, attempt to get one from the _id index. @@ -151,7 +152,7 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection, if (!indexDescriptor) { // There was no _id index. - return {}; + return {nullptr}; } IndexAccessMethod* idIam = indexCatalog->getIndex(indexDescriptor); @@ -159,7 +160,7 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection, if (!idxRandCursor) { // Storage engine does not support any type of random cursor. - return {}; + return {nullptr}; } auto idxIterator = stdx::make_unique<IndexIteratorStage>(txn, @@ -178,12 +179,12 @@ shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection, if (shardingState->needCollectionMetadata(txn, txn->getNS())) { auto shardFilterStage = stdx::make_unique<ShardFilterStage>( txn, shardingState->getCollectionMetadata(txn->getNS()), ws.get(), stage.release()); - return uassertStatusOK(PlanExecutor::make( - txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO)); + return PlanExecutor::make( + txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO); } - return uassertStatusOK(PlanExecutor::make( - txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO)); + return PlanExecutor::make( + txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO); } StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor( @@ -247,7 +248,8 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( if (collection && sampleStage) { const long long sampleSize = sampleStage->getSampleSize(); const long long numRecords = collection->getRecordStore()->numRecords(txn); - auto exec = createRandomCursorExecutor(collection, txn, sampleSize, numRecords); + auto exec = uassertStatusOK( + createRandomCursorExecutor(collection, txn, sampleSize, numRecords)); if (exec) { // Replace $sample stage with $sampleFromRandomCursor stage. sources.pop_front(); @@ -257,7 +259,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( const BSONObj initialQuery; return addCursorSource( - pPipeline, pExpCtx, exec, pPipeline->getDependencies(initialQuery)); + pPipeline, pExpCtx, std::move(exec), pPipeline->getDependencies(initialQuery)); } } } @@ -294,13 +296,14 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( } // Create the PlanExecutor. - auto exec = prepareExecutor( - txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery); + auto exec = uassertStatusOK(prepareExecutor( + txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery)); - return addCursorSource(pPipeline, pExpCtx, exec, deps, queryObj, sortObj, projForQuery); + return addCursorSource( + pPipeline, pExpCtx, std::move(exec), deps, queryObj, sortObj, projForQuery); } -std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( +StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor( OperationContext* txn, Collection* collection, const intrusive_ptr<Pipeline>& pipeline, @@ -338,8 +341,6 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } - std::shared_ptr<PlanExecutor> exec; - BSONObj emptyProjection; if (sortStage) { // See if the query system can provide a non-blocking sort. @@ -351,9 +352,15 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( auto swExecutorSortAndProj = attemptToGetExecutor( txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts); + std::unique_ptr<PlanExecutor> exec; if (swExecutorSortAndProj.isOK()) { // Success! We have a non-blocking sort and a covered projection. exec = std::move(swExecutorSortAndProj.getValue()); + } else if (swExecutorSortAndProj == ErrorCodes::QueryPlanKilled) { + return {ErrorCodes::OperationFailed, + str::stream() << "Failed to determine whether query system can provide a " + "covered projection in addition to a non-blocking sort: " + << swExecutorSortAndProj.getStatus().toString()}; } else { // The query system couldn't cover the projection. *projectionObj = BSONObj(); @@ -367,7 +374,13 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( // We need to reinsert the coalesced $limit after removing the $sort. pipeline->sources.push_front(sortStage->getLimitSrc()); } - return exec; + return std::move(exec); + } else if (swExecutorSort == ErrorCodes::QueryPlanKilled) { + return { + ErrorCodes::OperationFailed, + str::stream() + << "Failed to determine whether query system can provide a non-blocking sort: " + << swExecutorSort.getStatus().toString()}; } // The query system can't provide a non-blocking sort. *sortObj = BSONObj(); @@ -383,18 +396,23 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( if (swExecutorProj.isOK()) { // Success! We have a covered projection. return std::move(swExecutorProj.getValue()); + } else if (swExecutorProj == ErrorCodes::QueryPlanKilled) { + return {ErrorCodes::OperationFailed, + str::stream() + << "Failed to determine whether query system can provide a covered projection: " + << swExecutorProj.getStatus().toString()}; } // The query system couldn't provide a covered projection. *projectionObj = BSONObj(); // If this doesn't work, nothing will. - return uassertStatusOK(attemptToGetExecutor( - txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts)); + return attemptToGetExecutor( + txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts); } shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline, const intrusive_ptr<ExpressionContext>& expCtx, - shared_ptr<PlanExecutor> exec, + unique_ptr<PlanExecutor> exec, DepsTracker deps, const BSONObj& queryObj, const BSONObj& sortObj, @@ -402,9 +420,13 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline // Get the full "namespace" name. const string& fullName = expCtx->ns.ns(); + // We convert the unique_ptr to a shared_ptr because both the PipelineProxyStage and the + // DocumentSourceCursor need to reference the PlanExecutor. + std::shared_ptr<PlanExecutor> sharedExec(std::move(exec)); + // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(fullName, exec, expCtx); + DocumentSourceCursor::create(fullName, sharedExec, expCtx); // Note the query, sort, and projection for explain. pSource->setQuery(queryObj); @@ -429,10 +451,10 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We // deregister the PlanExecutor so that it can be registered with ClientCursor. - exec->deregisterExec(); - exec->saveState(); + sharedExec->deregisterExec(); + sharedExec->saveState(); - return exec; + return sharedExec; } } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index a9993c6331b..2da76fe26e0 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -94,7 +94,7 @@ private: * sort, and 'projectionObj' will be set to an empty object if the query system cannot provide a * covered projection. */ - static std::shared_ptr<PlanExecutor> prepareExecutor( + static StatusWith<std::unique_ptr<PlanExecutor>> prepareExecutor( OperationContext* txn, Collection* collection, const boost::intrusive_ptr<Pipeline>& pipeline, @@ -112,7 +112,7 @@ private: static std::shared_ptr<PlanExecutor> addCursorSource( const boost::intrusive_ptr<Pipeline>& pipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::shared_ptr<PlanExecutor> exec, + std::unique_ptr<PlanExecutor> exec, DepsTracker deps, const BSONObj& queryObj = BSONObj(), const BSONObj& sortObj = BSONObj(), |