-rw-r--r--  jstests/aggregation/bugs/lookup_unwind_getmore.js                       40
-rw-r--r--  jstests/aggregation/bugs/lookup_unwind_killcursor.js                    55
-rw-r--r--  jstests/concurrency/fsm_all_sharded_replication.js                       7
-rw-r--r--  jstests/concurrency/fsm_all_sharded_replication_with_balancer.js         7
-rw-r--r--  jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js  301
-rw-r--r--  src/mongo/db/catalog/cursor_manager.cpp                                  46
-rw-r--r--  src/mongo/db/commands/pipeline_command.cpp                                5
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.cpp                                     21
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.h                                       10
-rw-r--r--  src/mongo/db/pipeline/SConscript                                         18
-rw-r--r--  src/mongo/db/pipeline/document_source.h                                  67
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp                         38
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.cpp                   52
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp                        125
-rw-r--r--  src/mongo/db/pipeline/document_source_test.cpp                           46
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp                             27
-rw-r--r--  src/mongo/db/pipeline/expression_context.h                                8
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                    101
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h                                       31
-rw-r--r--  src/mongo/db/query/plan_executor.cpp                                     23
-rw-r--r--  src/mongo/dbtests/documentsourcetests.cpp                                10
-rw-r--r--  src/mongo/dbtests/query_plan_executor.cpp                                40
22 files changed, 789 insertions, 289 deletions
diff --git a/jstests/aggregation/bugs/lookup_unwind_getmore.js b/jstests/aggregation/bugs/lookup_unwind_getmore.js
index 6c8d886b78f..5d4a9286ef9 100644
--- a/jstests/aggregation/bugs/lookup_unwind_getmore.js
+++ b/jstests/aggregation/bugs/lookup_unwind_getmore.js
@@ -1,33 +1,30 @@
/**
- * Tests that the server correctly handles when the OperationContext of the DBDirectClient used by
- * the $lookup stage changes as it unwinds the results.
+ * Tests that the server correctly handles when the OperationContext used by the $lookup stage
+ * changes as it unwinds the results.
*
* This test was designed to reproduce SERVER-22537.
*/
(function() {
'use strict';
- // We use a batch size of 1 to ensure that the mongo shell issues a getMore when unwinding the
- // results from the 'dest' collection for the same document in the 'source' collection under a
- // different OperationContext.
- const batchSize = 1;
-
- db.source.drop();
- db.dest.drop();
+ const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'};
+ const conn = MongoRunner.runMongod(options);
+ assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));
- assert.writeOK(db.source.insert({local: 1}));
+ const testDB = conn.getDB('test');
- // We insert documents in the 'dest' collection such that their combined size is greater than
- // 16MB in order to ensure that the DBDirectClient used by the $lookup stage issues a getMore
- // under a different OperationContext.
- const numMatches = 3;
- const largeStr = new Array(6 * 1024 * 1024 + 1).join('x');
+ // We use a batch size of 2 to ensure that the mongo shell issues a getMore when unwinding the
+ // results from the 'dest' collection for the same document in the 'source' collection under a
+ // different OperationContext.
+ const batchSize = 2;
+ const numMatches = 5;
- for (var i = 0; i < numMatches; ++i) {
- assert.writeOK(db.dest.insert({foreign: 1, largeStr: largeStr}));
+ assert.writeOK(testDB.source.insert({local: 1}));
+ for (let i = 0; i < numMatches; ++i) {
+ assert.writeOK(testDB.dest.insert({foreign: 1}));
}
- var res = db.runCommand({
+ const res = assert.commandWorked(testDB.runCommand({
aggregate: 'source',
pipeline: [
{
@@ -47,9 +44,10 @@
cursor: {
batchSize: batchSize,
},
- });
- assert.commandWorked(res);
+ }));
- var cursor = new DBCommandCursor(db.getMongo(), res, batchSize);
+ const cursor = new DBCommandCursor(conn, res, batchSize);
assert.eq(numMatches, cursor.itcount());
+
+ MongoRunner.stopMongod(conn);
})();
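
The test above passes the new internalDocumentSourceCursorBatchSizeBytes server parameter at mongod startup. A minimal shell sketch, assuming an admin-capable 'db' connection, of toggling the same knob on a running mongod instead (this mirrors what the kill_multicollection_aggregation workload's setup and teardown below do):

    // Lower the DocumentSourceCursor batch threshold so its PlanExecutor is not exhausted
    // by the first batch, then restore the 16 MB value that the workload's teardown uses.
    assert.commandWorked(
        db.adminCommand({setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 1}));
    // ... run the aggregation under test ...
    assert.commandWorked(db.adminCommand(
        {setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 16 * 1024 * 1024}));
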
diff --git a/jstests/aggregation/bugs/lookup_unwind_killcursor.js b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
new file mode 100644
index 00000000000..939205007b5
--- /dev/null
+++ b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
@@ -0,0 +1,55 @@
+/**
+ * Tests that the cursor underlying the $lookup stage is killed when the cursor returned to the
+ * client for the aggregation pipeline is killed.
+ *
+ * This test was designed to reproduce SERVER-24386.
+ */
+(function() {
+ 'use strict';
+
+ const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'};
+ const conn = MongoRunner.runMongod(options);
+ assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));
+
+ const testDB = conn.getDB('test');
+
+ // We use a batch size of 2 to ensure that the mongo shell does not exhaust the cursor on its
+ // first batch.
+ const batchSize = 2;
+ const numMatches = 5;
+
+ assert.writeOK(testDB.source.insert({local: 1}));
+ for (let i = 0; i < numMatches; ++i) {
+ assert.writeOK(testDB.dest.insert({foreign: 1}));
+ }
+
+ const res = assert.commandWorked(testDB.runCommand({
+ aggregate: 'source',
+ pipeline: [
+ {
+ $lookup: {
+ from: 'dest',
+ localField: 'local',
+ foreignField: 'foreign',
+ as: 'matches',
+ }
+ },
+ {
+ $unwind: {
+ path: '$matches',
+ },
+ },
+ ],
+ cursor: {
+ batchSize: batchSize,
+ },
+ }));
+
+ const cursor = new DBCommandCursor(conn, res, batchSize);
+ cursor.close(); // Closing the cursor will issue the "killCursors" command.
+
+ const serverStatus = assert.commandWorked(testDB.adminCommand({serverStatus: 1}));
+ assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus));
+
+ MongoRunner.stopMongod(conn);
+})();
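
Closing the DBCommandCursor is what issues the "killCursors" command in the test above. For reference, a sketch of the explicit equivalent, assuming 'res' and 'testDB' from that test:

    // Explicitly kill the aggregation cursor by id instead of calling cursor.close().
    const cursorId = res.cursor.id;
    assert.commandWorked(testDB.runCommand({killCursors: 'source', cursors: [cursorId]}));
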
diff --git a/jstests/concurrency/fsm_all_sharded_replication.js b/jstests/concurrency/fsm_all_sharded_replication.js
index 6c14334b33f..39cda5ce3f8 100644
--- a/jstests/concurrency/fsm_all_sharded_replication.js
+++ b/jstests/concurrency/fsm_all_sharded_replication.js
@@ -59,6 +59,13 @@ var blacklist = [
'group_cond.js', // the group command cannot be issued against a sharded cluster
'indexed_insert_eval.js', // eval doesn't work with sharded collections
'indexed_insert_eval_nolock.js', // eval doesn't work with sharded collections
+
+ // This workload sometimes triggers an 'unable to target write op for collection ... caused by
+ // ... database not found' error. Further investigation still needs to be done, but this
+ // workload may be failing due to SERVER-17397 'drops in a sharded cluster may not fully
+ // succeed' because it drops and reuses the same namespaces.
+ 'kill_multicollection_aggregation.js',
+
'plan_cache_drop_database.js', // cannot ensureIndex after dropDatabase without sharding first
'remove_single_document.js', // our .remove(query, {justOne: true}) calls lack shard keys
'remove_single_document_eval.js', // eval doesn't work with sharded collections
diff --git a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
index c20464430bb..04a12b83eb3 100644
--- a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
+++ b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
@@ -64,6 +64,13 @@ var blacklist = [
'group_cond.js', // the group command cannot be issued against a sharded cluster
'indexed_insert_eval.js', // eval doesn't work with sharded collections
'indexed_insert_eval_nolock.js', // eval doesn't work with sharded collections
+
+ // This workload sometimes triggers an 'unable to target write op for collection ... caused by
+ // ... database not found' error. Further investigation still needs to be done, but this
+ // workload may be failing due to SERVER-17397 'drops in a sharded cluster may not fully
+ // succeed' because it drops and reuses the same namespaces.
+ 'kill_multicollection_aggregation.js',
+
'plan_cache_drop_database.js', // cannot ensureIndex after dropDatabase without sharding first
'remove_single_document.js', // our .remove(query, {justOne: true}) calls lack shard keys
'remove_single_document_eval.js', // eval doesn't work with sharded collections
diff --git a/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js b/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
new file mode 100644
index 00000000000..ba38bdfe94a
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
@@ -0,0 +1,301 @@
+'use strict';
+
+/**
+ * kill_multicollection_aggregation.js
+ *
+ * This workload was designed to stress running and invalidating aggregation pipelines involving
+ * multiple collections and to reproduce issues like those described in SERVER-22537 and
+ * SERVER-24386. Threads perform an aggregation pipeline on one of a few collections, optionally
+ * specifying a $lookup stage, a $graphLookup stage, or a $facet stage, while the database, a
+ * collection, or an index is dropped concurrently.
+ */
+var $config = (function() {
+
+ var data = {
+ chooseRandomlyFrom: function chooseRandomlyFrom(arr) {
+ if (!Array.isArray(arr)) {
+ throw new Error('Expected array for first argument, but got: ' + tojson(arr));
+ }
+ return arr[Random.randInt(arr.length)];
+ },
+
+ involvedCollections: ['coll0', 'coll1', 'coll2'],
+ indexSpecs: [{a: 1, b: 1}, {c: 1}],
+
+ numDocs: 10,
+ batchSize: 2,
+
+ /**
+ * Inserts 'this.numDocs' new documents into the specified collection and ensures that the
+ * indexes 'this.indexSpecs' exist on the collection. Note that this means it is safe for
+ * multiple threads to perform this function simultaneously.
+ */
+ populateDataAndIndexes: function populateDataAndIndexes(db, collName) {
+ var bulk = db[collName].initializeUnorderedBulkOp();
+ for (var i = 0; i < this.numDocs; ++i) {
+ bulk.insert({});
+ }
+ var res = bulk.execute();
+ assertAlways.writeOK(res);
+ assertAlways.eq(this.numDocs, res.nInserted, tojson(res));
+
+ this.indexSpecs.forEach(indexSpec => {
+ assertAlways.commandWorked(db[collName].createIndex(indexSpec));
+ });
+ },
+
+ /**
+ * Runs the specified aggregation pipeline and stores the resulting cursor (if the command
+ * is successful) in 'this.cursor'.
+ */
+ runAggregation: function runAggregation(db, collName, pipeline) {
+ var res = db.runCommand(
+ {aggregate: collName, pipeline: pipeline, cursor: {batchSize: this.batchSize}});
+
+ if (res.ok) {
+ this.cursor = new DBCommandCursor(db.getMongo(), res, this.batchSize);
+ }
+ },
+
+ makeLookupPipeline: function makeLookupPipeline(foreignColl) {
+ var pipeline = [
+ {
+ $lookup: {
+ from: foreignColl,
+ // We perform the $lookup on a field that doesn't exist in either the document
+ // on the source collection or the document on the foreign collection. This
+ // ensures that every document in the source collection will match every
+ // document in the foreign collection and cause the cursor underlying the
+ // $lookup stage to need to request another batch.
+ localField: 'fieldThatDoesNotExistInDoc',
+ foreignField: 'fieldThatDoesNotExistInDoc',
+ as: 'results'
+ }
+ },
+ {$unwind: '$results'}
+ ];
+
+ return pipeline;
+ },
+
+ makeGraphLookupPipeline: function makeGraphLookupPipeline(foreignName) {
+ var pipeline = [
+ {
+ $graphLookup: {
+ from: foreignName,
+ startWith: '$fieldThatDoesNotExistInDoc',
+ connectToField: 'fieldThatDoesNotExistInDoc',
+ connectFromField: 'fieldThatDoesNotExistInDoc',
+ maxDepth: Random.randInt(5),
+ as: 'results'
+ }
+ },
+ {$unwind: '$results'}
+ ];
+
+ return pipeline;
+ }
+ };
+
+ var states = {
+ /**
+ * This is a no-op, used only as a transition state.
+ */
+ init: function init(db, collName) {},
+
+ /**
+ * Runs an aggregation involving only one collection and saves the resulting cursor to
+ * 'this.cursor'.
+ */
+ normalAggregation: function normalAggregation(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+ var pipeline = [{$sort: this.chooseRandomlyFrom(this.indexSpecs)}];
+ this.runAggregation(myDB, targetCollName, pipeline);
+ },
+
+ /**
+ * Runs an aggregation that uses the $lookup stage and saves the resulting cursor to
+ * 'this.cursor'.
+ */
+ lookupAggregation: function lookupAggregation(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+ var foreignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+ var pipeline = this.makeLookupPipeline(foreignCollName);
+ this.runAggregation(myDB, targetCollName, pipeline);
+ },
+
+ /**
+ * Runs an aggregation that uses the $graphLookup stage and saves the resulting cursor to
+ * 'this.cursor'.
+ */
+ graphLookupAggregation: function graphLookupAggregation(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+ var foreignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+ var pipeline = this.makeGraphLookupPipeline(foreignCollName);
+ this.runAggregation(myDB, targetCollName, pipeline);
+ },
+
+ /**
+ * Runs an aggregation that uses the $lookup and $graphLookup stages within a $facet stage
+ * and saves the resulting cursor to 'this.cursor'.
+ */
+ facetAggregation: function facetAggregation(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+ var lookupForeignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+ var graphLookupForeignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+ var pipeline = [{
+ $facet: {
+ lookup: this.makeLookupPipeline(lookupForeignCollName),
+ graphLookup: this.makeGraphLookupPipeline(graphLookupForeignCollName)
+ }
+ }];
+ this.runAggregation(myDB, targetCollName, pipeline);
+ },
+
+ killCursor: function killCursor(db, collName) {
+ if (this.hasOwnProperty('cursor')) {
+ this.cursor.close();
+ }
+ },
+
+ /**
+ * Requests enough results from 'this.cursor' to ensure that another batch is needed, and
+ * thus ensures that a getMore request is sent for 'this.cursor'.
+ */
+ getMore: function getMore(db, collName) {
+ if (!this.hasOwnProperty('cursor')) {
+ return;
+ }
+
+ for (var i = 0; i <= this.batchSize; ++i) {
+ try {
+ if (!this.cursor.hasNext()) {
+ break;
+ }
+ this.cursor.next();
+ } catch (e) {
+ // The getMore request can fail if the database, a collection, or an index was
+ // dropped.
+ }
+ }
+ },
+
+ /**
+ * Drops the database being used by this workload and then re-creates each of
+ * 'this.involvedCollections' by repopulating them with data and indexes.
+ */
+ dropDatabase: function dropDatabase(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ myDB.dropDatabase();
+
+ // Re-create all of the collections and indexes that were dropped.
+ this.involvedCollections.forEach(collName => {
+ this.populateDataAndIndexes(myDB, collName);
+ });
+ },
+
+ /**
+ * Randomly selects a collection from 'this.involvedCollections' and drops it. The
+ * collection is then re-created with data and indexes.
+ */
+ dropCollection: function dropCollection(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetColl = this.chooseRandomlyFrom(this.involvedCollections);
+
+ myDB[targetColl].drop();
+
+ // Re-create the collection that was dropped.
+ this.populateDataAndIndexes(myDB, targetColl);
+ },
+
+ /**
+ * Randomly selects a collection from 'this.involvedCollections' and an index from
+ * 'this.indexSpecs' and drops that particular index from the collection. The index is then
+ * re-created.
+ */
+ dropIndex: function dropIndex(db, collName) {
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ var targetColl = this.chooseRandomlyFrom(this.involvedCollections);
+ var indexSpec = this.chooseRandomlyFrom(this.indexSpecs);
+
+ // We don't assert that the command succeeded when dropping an index because it's
+ // possible another thread has already dropped this index.
+ myDB[targetColl].dropIndex(indexSpec);
+
+ // Re-create the index that was dropped.
+ assertAlways.commandWorked(myDB[targetColl].createIndex(indexSpec));
+ }
+ };
+
+ var transitions = {
+ init: {
+ normalAggregation: 0.1,
+ lookupAggregation: 0.25,
+ graphLookupAggregation: 0.25,
+ facetAggregation: 0.25,
+ dropDatabase: 0.05,
+ dropCollection: 0.05,
+ dropIndex: 0.05,
+ },
+
+ normalAggregation: {killCursor: 0.1, getMore: 0.9},
+ lookupAggregation: {killCursor: 0.1, getMore: 0.9},
+ graphLookupAggregation: {killCursor: 0.1, getMore: 0.9},
+ facetAggregation: {killCursor: 0.1, getMore: 0.9},
+ killCursor: {init: 1},
+ getMore: {killCursor: 0.2, getMore: 0.6, init: 0.2},
+ dropDatabase: {init: 1},
+ dropCollection: {init: 1},
+ dropIndex: {init: 1}
+ };
+
+ function setup(db, collName, cluster) {
+ // We decrease the batch size of the DocumentSourceCursor so that the PlanExecutor
+ // underlying it isn't exhausted when the "aggregate" command is sent. This makes it more
+ // likely for the "killCursors" command to need to handle destroying the underlying
+ // PlanExecutor.
+ cluster.executeOnMongodNodes(function lowerDocumentSourceCursorBatchSize(db) {
+ assertAlways.commandWorked(
+ db.adminCommand({setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 1}));
+ });
+
+ // Use the workload name as part of the database name, since the workload name is assumed to
+ // be unique.
+ this.uniqueDBName = db.getName() + 'kill_multicollection_aggregation';
+
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ this.involvedCollections.forEach(collName => {
+ this.populateDataAndIndexes(myDB, collName);
+ assertAlways.eq(this.numDocs, myDB[collName].find({}).itcount());
+ });
+ }
+
+ function teardown(db, collName, cluster) {
+ cluster.executeOnMongodNodes(function restoreDocumentSourceCursorBatchSize(db) {
+ assertAlways.commandWorked(db.adminCommand(
+ {setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 16 * 1024 * 1024}));
+ });
+
+ var myDB = db.getSiblingDB(this.uniqueDBName);
+ myDB.dropDatabase();
+ }
+
+ return {
+ threadCount: 10,
+ iterations: 200,
+ states: states,
+ startState: 'init',
+ transitions: transitions,
+ data: data,
+ setup: setup,
+ teardown: teardown
+ };
+})();
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index da521a6ab6a..876d8f7519d 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -506,32 +506,44 @@ void CursorManager::deregisterCursor(ClientCursor* cc) {
}
Status CursorManager::eraseCursor(OperationContext* txn, CursorId id, bool shouldAudit) {
- stdx::lock_guard<SimpleMutex> lk(_mutex);
+ ClientCursor* cursor;
- CursorMap::iterator it = _cursors.find(id);
- if (it == _cursors.end()) {
- if (shouldAudit) {
- audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::CursorNotFound);
+ {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+
+ CursorMap::iterator it = _cursors.find(id);
+ if (it == _cursors.end()) {
+ if (shouldAudit) {
+ audit::logKillCursorsAuthzCheck(
+ txn->getClient(), _nss, id, ErrorCodes::CursorNotFound);
+ }
+ return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
}
- return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
- }
- ClientCursor* cursor = it->second;
+ cursor = it->second;
+
+ if (cursor->isPinned()) {
+ if (shouldAudit) {
+ audit::logKillCursorsAuthzCheck(
+ txn->getClient(), _nss, id, ErrorCodes::OperationFailed);
+ }
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "Cannot kill pinned cursor: " << id};
+ }
- if (cursor->isPinned()) {
if (shouldAudit) {
- audit::logKillCursorsAuthzCheck(
- txn->getClient(), _nss, id, ErrorCodes::OperationFailed);
+ audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::OK);
}
- return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
- }
- if (shouldAudit) {
- audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::OK);
+ cursor->kill();
+ _deregisterCursor_inlock(cursor);
}
- cursor->kill();
- _deregisterCursor_inlock(cursor);
+ // If 'cursor' represents an aggregation cursor, then the destructor of the ClientCursor will
+ // eventually cause the destructor of the underlying PlanExecutor to be called. Since the
+ // underlying PlanExecutor is also registered on this CursorManager, we must destruct 'cursor'
+ // without holding '_mutex' so that it's possible to call CursorManager::deregisterCursor()
+ // without deadlocking ourselves.
delete cursor;
return Status::OK();
}
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 853dec82dd9..372cf203166 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -358,14 +358,13 @@ public:
// This does mongod-specific stuff like creating the input PlanExecutor and adding
// it to the front of the pipeline if needed.
- std::shared_ptr<PlanExecutor> input =
- PipelineD::prepareCursorSource(txn, collection, nss, pipeline, expCtx);
+ PipelineD::prepareCursorSource(collection, pipeline);
// Create the PlanExecutor which returns results from the pipeline. The WorkingSet
// ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
// PlanExecutor.
auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, input, ws.get());
+ auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, ws.get());
auto statusWithPlanExecutor = (NULL == collection)
? PlanExecutor::make(
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index fd2d1a3a09e..50fa14e0d2a 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -48,12 +48,10 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
intrusive_ptr<Pipeline> pipeline,
- const std::shared_ptr<PlanExecutor>& child,
WorkingSet* ws)
: PlanStage(kStageType, opCtx),
_pipeline(pipeline),
_includeMetaData(_pipeline->getContext()->inShard), // send metadata to merger
- _childExec(child),
_ws(ws) {}
PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) {
@@ -93,27 +91,12 @@ bool PipelineProxyStage::isEOF() {
return true;
}
-void PipelineProxyStage::doInvalidate(OperationContext* txn,
- const RecordId& dl,
- InvalidationType type) {
- // Propagate the invalidation to the child executor of the pipeline if it is still in use.
- if (auto child = getChildExecutor()) {
- child->invalidate(txn, dl, type);
- }
-}
-
void PipelineProxyStage::doDetachFromOperationContext() {
_pipeline->detachFromOperationContext();
- if (auto child = getChildExecutor()) {
- child->detachFromOperationContext();
- }
}
void PipelineProxyStage::doReattachToOperationContext() {
_pipeline->reattachToOperationContext(getOpCtx());
- if (auto child = getChildExecutor()) {
- child->reattachToOperationContext(getOpCtx());
- }
}
unique_ptr<PlanStageStats> PipelineProxyStage::getStats() {
@@ -135,10 +118,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
return boost::none;
}
-shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() {
- return _childExec.lock();
-}
-
std::string PipelineProxyStage::getPlanSummaryStr() const {
return PipelineD::getPlanSummaryStr(_pipeline);
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index a638bcda547..9482f9ba565 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -47,27 +47,18 @@ class PipelineProxyStage final : public PlanStage {
public:
PipelineProxyStage(OperationContext* opCtx,
boost::intrusive_ptr<Pipeline> pipeline,
- const std::shared_ptr<PlanExecutor>& child,
WorkingSet* ws);
PlanStage::StageState doWork(WorkingSetID* out) final;
bool isEOF() final;
- void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
-
//
// Manage our OperationContext.
//
void doDetachFromOperationContext() final;
void doReattachToOperationContext() final;
- /**
- * Return a shared pointer to the PlanExecutor that feeds the pipeline. The returned
- * pointer may be NULL.
- */
- std::shared_ptr<PlanExecutor> getChildExecutor();
-
// Returns empty PlanStageStats object
std::unique_ptr<PlanStageStats> getStats() final;
@@ -93,7 +84,6 @@ private:
const boost::intrusive_ptr<Pipeline> _pipeline;
std::vector<BSONObj> _stash;
const bool _includeMetaData;
- std::weak_ptr<PlanExecutor> _childExec;
// Not owned by us.
WorkingSet* _ws;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 85fcf5c2da1..d8026f7ee04 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -10,6 +10,7 @@ env.Library(
'aggregation_request',
'document_source',
'document_source_facet',
+ 'document_source_lookup',
'expression_context',
'pipeline',
]
@@ -118,7 +119,9 @@ env.CppUnitTest(
source='document_source_test.cpp',
LIBDEPS=[
'document_source',
+ 'document_source_lookup',
'document_value_test_util',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
@@ -183,11 +186,9 @@ docSourceEnv.Library(
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_geo_near.cpp',
- 'document_source_graph_lookup.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_limit.cpp',
- 'document_source_lookup.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
'document_source_mock.cpp',
@@ -246,6 +247,18 @@ env.Library(
)
env.Library(
+ target='document_source_lookup',
+ source=[
+ 'document_source_graph_lookup.cpp',
+ 'document_source_lookup.cpp',
+ ],
+ LIBDEPS=[
+ 'document_source',
+ 'pipeline',
+ ],
+)
+
+env.Library(
target='document_source_facet',
source=[
'document_source_facet.cpp',
@@ -311,6 +324,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'document_value_test_util',
+ 'document_source_lookup',
'pipeline',
],
)
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index e11d7d8d085..77a539d1b89 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -392,6 +392,16 @@ public:
const BSONObj& originalCollectionOptions,
const std::list<BSONObj>& originalIndexes) = 0;
+ /**
+ * Parses a Pipeline from a vector of BSONObjs representing DocumentSources and readies it
+ * for execution. The returned pipeline is optimized and has a cursor source prepared.
+ *
+ * This function returns a non-OK status if parsing the pipeline failed.
+ */
+ virtual StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+
// Add new methods as needed.
};
@@ -447,7 +457,6 @@ protected:
class DocumentSourceCursor final : public DocumentSource {
public:
// virtuals from DocumentSource
- ~DocumentSourceCursor() final;
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
BSONObjSet getOutputSorts() final {
@@ -464,6 +473,10 @@ public:
}
void dispose() final;
+ void detachFromOperationContext() final;
+
+ void reattachToOperationContext(OperationContext* opCtx) final;
+
/**
* Create a document source based on a passed-in PlanExecutor.
*
@@ -472,7 +485,7 @@ public:
*/
static boost::intrusive_ptr<DocumentSourceCursor> create(
const std::string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
+ std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
/*
@@ -533,7 +546,7 @@ protected:
private:
DocumentSourceCursor(const std::string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
+ std::unique_ptr<PlanExecutor> exec,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
void loadBatch();
@@ -554,7 +567,7 @@ private:
long long _docsAddedToBatches; // for _limit enforcement
const std::string _ns;
- std::shared_ptr<PlanExecutor> _exec; // PipelineProxyStage holds a weak_ptr to this.
+ std::unique_ptr<PlanExecutor> _exec;
BSONObjSet _outputSorts;
std::string _planSummary;
PlanSummaryStats _planSummaryStats;
@@ -1689,16 +1702,23 @@ public:
collections->push_back(_fromNs);
}
+ void doDetachFromOperationContext() final;
+
+ void doReattachToOperationContext(OperationContext* opCtx) final;
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
/**
- * Build the BSONObj used to query the foreign collection.
+ * Builds the BSONObj used to query the foreign collection and wraps it in a $match.
*/
- static BSONObj queryForInput(const Document& input,
- const FieldPath& localFieldName,
- const std::string& foreignFieldName,
- const BSONObj& additionalFilter);
+ static BSONObj makeMatchStageFromInput(const Document& input,
+ const FieldPath& localFieldName,
+ const std::string& foreignFieldName,
+ const BSONObj& additionalFilter);
+
+protected:
+ void doInjectExpressionContext() final;
private:
DocumentSourceLookUp(NamespaceString fromNs,
@@ -1708,7 +1728,8 @@ private:
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
Value serialize(bool explain = false) const final {
- invariant(false);
+ // Should not be called; use serializeToArray instead.
+ MONGO_UNREACHABLE;
}
boost::optional<Document> unwindResult();
@@ -1720,14 +1741,22 @@ private:
std::string _foreignFieldFieldName;
boost::optional<BSONObj> _additionalFilter;
+ // The ExpressionContext used when performing aggregation pipelines against the '_fromNs'
+ // namespace.
+ boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
bool _handlingUnwind = false;
bool _handlingMatch = false;
- std::unique_ptr<DBClientCursor> _cursor;
+
+ // The following members are used to hold onto state across getNext() calls when
+ // '_handlingUnwind' is true.
long long _cursorIndex = 0;
+ boost::intrusive_ptr<Pipeline> _pipeline;
boost::optional<Document> _input;
+ boost::optional<Document> _nextValue;
};
// TODO SERVER-25139: Make $graphLookup respect the collation.
@@ -1758,11 +1787,16 @@ public:
collections->push_back(_from);
}
- void doInjectExpressionContext() final;
+ void doDetachFromOperationContext() final;
+
+ void doReattachToOperationContext(OperationContext* opCtx) final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+protected:
+ void doInjectExpressionContext() final;
+
private:
DocumentSourceGraphLookUp(NamespaceString from,
std::string as,
@@ -1780,14 +1814,15 @@ private:
}
/**
- * Prepare the query to execute on the 'from' collection, using the contents of '_frontier'.
+ * Prepares the query to execute on the 'from' collection wrapped in a $match by using the
+ * contents of '_frontier'.
*
* Fills 'cached' with any values that were retrieved from the cache.
*
* Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache.
* Otherwise, returns a query object.
*/
- boost::optional<BSONObj> constructQuery(BSONObjSet* cached);
+ boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached);
/**
* If we have internalized a $unwind, getNext() dispatches to this function.
@@ -1837,6 +1872,10 @@ private:
boost::optional<FieldPath> _depthField;
boost::optional<long long> _maxDepth;
+ // The ExpressionContext used when performing aggregation pipelines against the '_from'
+ // namespace.
+ boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
// Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index aef76129244..14911203482 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_common.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/scopeguard.h"
@@ -46,9 +47,13 @@ using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
-DocumentSourceCursor::~DocumentSourceCursor() {
- dispose();
-}
+namespace {
+
+MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceCursorBatchSizeBytes,
+ int,
+ FindCommon::kMaxBytesToReturnToClientAtOnce);
+
+} // namespace
const char* DocumentSourceCursor::getSourceName() const {
return "$cursor";
@@ -70,8 +75,6 @@ boost::optional<Document> DocumentSourceCursor::getNext() {
}
void DocumentSourceCursor::dispose() {
- // Can't call in to PlanExecutor or ClientCursor registries from this function since it
- // will be called when an agg cursor is killed which would cause a deadlock.
_exec.reset();
_currentBatch.clear();
}
@@ -113,7 +116,7 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > FindCommon::kMaxBytesToReturnToClientAtOnce) {
+ if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
@@ -227,14 +230,26 @@ void DocumentSourceCursor::doInjectExpressionContext() {
}
}
+void DocumentSourceCursor::detachFromOperationContext() {
+ if (_exec) {
+ _exec->detachFromOperationContext();
+ }
+}
+
+void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) {
+ if (_exec) {
+ _exec->reattachToOperationContext(opCtx);
+ }
+}
+
DocumentSourceCursor::DocumentSourceCursor(const string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
+ std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pCtx)
: DocumentSource(pCtx),
_docsAddedToBatches(0),
_ns(ns),
- _exec(exec),
- _outputSorts(exec->getOutputSorts()) {
+ _exec(std::move(exec)),
+ _outputSorts(_exec->getOutputSorts()) {
recordPlanSummaryStr();
// We record execution metrics here to allow for capture of indexes used prior to execution.
@@ -243,9 +258,10 @@ DocumentSourceCursor::DocumentSourceCursor(const string& ns,
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
const string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
+ std::unique_ptr<PlanExecutor> exec,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
- intrusive_ptr<DocumentSourceCursor> source(new DocumentSourceCursor(ns, exec, pExpCtx));
+ intrusive_ptr<DocumentSourceCursor> source(
+ new DocumentSourceCursor(ns, std::move(exec), pExpCtx));
source->injectExpressionContext(pExpCtx);
return source;
}
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 01887184512..9bba9e68317 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -152,7 +152,7 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// Check whether each key in the frontier exists in the cache or needs to be queried.
BSONObjSet cached;
- auto query = constructQuery(&cached);
+ auto matchStage = makeMatchStageFromFrontier(&cached);
ValueUnorderedSet queried = pExpCtx->getValueComparator().makeUnorderedValueSet();
_frontier->swap(queried);
@@ -167,14 +167,13 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
checkMemoryUsage();
}
- if (query) {
+ if (matchStage) {
// Query for all keys that were in the frontier and not in the cache, populating
// '_frontier' for the next iteration of search.
- unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query);
- // Iterate the cursor.
- while (cursor->more()) {
- BSONObj result = cursor->nextSafe();
+ auto pipeline = uassertStatusOK(_mongod->makePipeline({*matchStage}, _fromExpCtx));
+ while (auto next = pipeline->output()->getNext()) {
+ BSONObj result = next->toBson();
shouldPerformAnotherQuery =
addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery;
addToCache(result, queried);
@@ -269,7 +268,7 @@ void DocumentSourceGraphLookUp::addToCache(const BSONObj& result,
}
}
-boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) {
+boost::optional<BSONObj> DocumentSourceGraphLookUp::makeMatchStageFromFrontier(BSONObjSet* cached) {
// Add any cached values to 'cached' and remove them from '_frontier'.
for (auto it = _frontier->begin(); it != _frontier->end();) {
if (auto entry = _cache[*it]) {
@@ -289,28 +288,34 @@ boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* c
}
// Create a query of the form {$and: [_additionalFilter, {_connectToField: {$in: [...]}}]}.
- BSONObjBuilder query;
+ //
+ // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when
+ // constructing a pipeline to execute.
+ BSONObjBuilder match;
{
- BSONArrayBuilder andObj(query.subarrayStart("$and"));
- if (_additionalFilter) {
- andObj << *_additionalFilter;
- }
-
+ BSONObjBuilder query(match.subobjStart("$match"));
{
- BSONObjBuilder connectToObj(andObj.subobjStart());
+ BSONArrayBuilder andObj(query.subarrayStart("$and"));
+ if (_additionalFilter) {
+ andObj << *_additionalFilter;
+ }
+
{
- BSONObjBuilder subObj(connectToObj.subobjStart(_connectToField.fullPath()));
+ BSONObjBuilder connectToObj(andObj.subobjStart());
{
- BSONArrayBuilder in(subObj.subarrayStart("$in"));
- for (auto&& value : *_frontier) {
- in << value;
+ BSONObjBuilder subObj(connectToObj.subobjStart(_connectToField.fullPath()));
+ {
+ BSONArrayBuilder in(subObj.subarrayStart("$in"));
+ for (auto&& value : *_frontier) {
+ in << value;
+ }
}
}
}
}
}
- return _frontier->empty() ? boost::none : boost::optional<BSONObj>(query.obj());
+ return _frontier->empty() ? boost::none : boost::optional<BSONObj>(match.obj());
}
void DocumentSourceGraphLookUp::performSearch() {
@@ -411,10 +416,19 @@ void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool
}
void DocumentSourceGraphLookUp::doInjectExpressionContext() {
+ _fromExpCtx = pExpCtx->copyWith(_from);
_frontier = pExpCtx->getValueComparator().makeUnorderedValueSet();
_visited = pExpCtx->getValueComparator().makeUnorderedValueMap<BSONObj>();
}
+void DocumentSourceGraphLookUp::doDetachFromOperationContext() {
+ _fromExpCtx->opCtx = nullptr;
+}
+
+void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+ _fromExpCtx->opCtx = opCtx;
+}
+
DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
NamespaceString from,
std::string as,
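
For illustration, the stage that makeMatchStageFromFrontier() now builds, written as shell syntax with hypothetical field names ('ancestors' as the connectToField, 'status' in the additional filter) and a frontier of [1, 2]:

    // No additional filter: only the $in over the frontier values appears inside the $and.
    const frontierMatch = {$match: {$and: [{ancestors: {$in: [1, 2]}}]}};
    // With an additional filter, it is placed first inside the $and.
    const filteredMatch = {$match: {$and: [{status: 'active'}, {ancestors: {$in: [1, 2]}}]}};
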
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 5be6239cb45..dc413d8fff8 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -110,20 +110,20 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
// '_handlingUnwind' would be set to true, and we would not have made it here.
invariant(!_matchSrc);
- BSONObj query = queryForInput(*input, _localField, _foreignFieldFieldName, BSONObj());
- std::unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_fromNs.ns(), query);
+ auto matchStage =
+ makeMatchStageFromInput(*input, _localField, _foreignFieldFieldName, BSONObj());
+ auto pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
std::vector<Value> results;
int objsize = 0;
- while (cursor->more()) {
- BSONObj result = cursor->nextSafe();
- objsize += result.objsize();
+ while (auto result = pipeline->output()->getNext()) {
+ objsize += result->getApproximateSize();
uassert(4568,
str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
- << query
+ << matchStage
<< " exceeds maximum document size",
objsize <= BSONObjMaxInternalSize);
- results.push_back(Value(result));
+ results.emplace_back(std::move(*result));
}
MutableDocument output(std::move(*input));
@@ -264,14 +264,14 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::optimizeAt(
}
void DocumentSourceLookUp::dispose() {
- _cursor.reset();
+ _pipeline.reset();
pSource->dispose();
}
-BSONObj DocumentSourceLookUp::queryForInput(const Document& input,
- const FieldPath& localFieldPath,
- const std::string& foreignFieldName,
- const BSONObj& additionalFilter) {
+BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
+ const FieldPath& localFieldPath,
+ const std::string& foreignFieldName,
+ const BSONObj& additionalFilter) {
Value localFieldVal = input.getNestedField(localFieldPath);
// Missing values are treated as null.
@@ -279,38 +279,53 @@ BSONObj DocumentSourceLookUp::queryForInput(const Document& input,
localFieldVal = Value(BSONNULL);
}
- // We are constructing a query of one of the following forms:
- // {$and: [{<foreignFieldName>: {$eq: <localFieldVal>}}, <additionalFilter>]}
- // {$and: [{<foreignFieldName>: {$in: [<value>, <value>, ...]}}, <additionalFilter>]}
- // {$and: [{$or: [{<foreignFieldName>: {$eq: <value>}},
- // {<foreignFieldName>: {$eq: <value>}}, ...]},
- // <additionalFilter>]}
-
- BSONObjBuilder query;
+ // We construct a query of one of the following forms, depending on the contents of
+ // 'localFieldVal'.
+ //
+ // {$and: [{<foreignFieldName>: {$eq: <localFieldVal>}}, <additionalFilter>]}
+ // if 'localFieldVal' isn't an array value.
+ //
+ // {$and: [{<foreignFieldName>: {$in: [<value>, <value>, ...]}}, <additionalFilter>]}
+ // if 'localFieldVal' is an array value but doesn't contain any elements that are regular
+ // expressions.
+ //
+ // {$and: [{$or: [{<foreignFieldName>: {$eq: <value>}},
+ // {<foreignFieldName>: {$eq: <value>}}, ...]},
+ // <additionalFilter>]}
+ // if 'localFieldVal' is an array value and it contains at least one element that is a
+ // regular expression.
+
+ // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when
+ // constructing a pipeline to execute.
+ BSONObjBuilder match;
+ BSONObjBuilder query(match.subobjStart("$match"));
BSONArrayBuilder andObj(query.subarrayStart("$and"));
BSONObjBuilder joiningObj(andObj.subobjStart());
if (localFieldVal.isArray()) {
- // Assume an array value logically corresponds to many documents, rather than logically
- // corresponding to one document with an array value.
+ // A $lookup on an array value corresponds to finding documents in the foreign collection
+ // that have a value of any of the elements in the array value, rather than finding
+ // documents that have a value equal to the entire array value. These semantics are
+ // automatically provided to us by using the $in query operator.
const vector<Value>& localArray = localFieldVal.getArray();
const bool containsRegex = std::any_of(
localArray.begin(), localArray.end(), [](Value val) { return val.getType() == RegEx; });
if (containsRegex) {
- // A regex inside of an $in will not be treated as an equality comparison, so use an
- // $or.
+ // A regular expression inside the $in query operator will perform pattern matching on
+ // any string values. Since we want regular expressions to only match other RegEx types,
+ // we write the query as a $or of equality comparisons instead.
BSONObj orQuery = buildEqualityOrQuery(foreignFieldName, localFieldVal.getArray());
joiningObj.appendElements(orQuery);
} else {
- // { _foreignFieldFieldName : { "$in" : localFieldValue } }
+ // { <foreignFieldName> : { "$in" : <localFieldVal> } }
BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
subObj << "$in" << localFieldVal;
subObj.doneFast();
}
} else {
- // { _foreignFieldFieldName : { "$eq" : localFieldValue } }
+ // { <foreignFieldName> : { "$eq" : <localFieldVal> } }
BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
subObj << "$eq" << localFieldVal;
subObj.doneFast();
@@ -324,7 +339,8 @@ BSONObj DocumentSourceLookUp::queryForInput(const Document& input,
andObj.doneFast();
- return query.obj();
+ query.doneFast();
+ return match.obj();
}
boost::optional<Document> DocumentSourceLookUp::unwindResult() {
@@ -333,24 +349,25 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
// Loop until we get a document that has at least one match.
// Note we may return early from this loop if our source stage is exhausted or if the unwind
// source was asked to return empty arrays and we get a document without a match.
- while (!_cursor || !_cursor->more()) {
+ while (!_pipeline || !_nextValue) {
_input = pSource->getNext();
if (!_input)
return {};
BSONObj filter = _additionalFilter.value_or(BSONObj());
- _cursor = _mongod->directClient()->query(
- _fromNs.ns(),
- DocumentSourceLookUp::queryForInput(
- *_input, _localField, _foreignFieldFieldName, filter));
+ auto matchStage =
+ makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter);
+ _pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
+
_cursorIndex = 0;
+ _nextValue = _pipeline->output()->getNext();
- if (_unwindSrc->preserveNullAndEmptyArrays() && !_cursor->more()) {
+ if (_unwindSrc->preserveNullAndEmptyArrays() && !_nextValue) {
// There were no results for this cursor, but the $unwind was asked to preserve empty
// arrays, so we should return a document without the array.
MutableDocument output(std::move(*_input));
- // Note this will correctly objects in the prefix of '_as', to act as if we had created
- // an empty array and then removed it.
+ // Note this will correctly create objects in the prefix of '_as', to act as if we had
+ // created an empty array and then removed it.
output.setNestedField(_as, Value());
if (indexPath) {
output.setNestedField(*indexPath, Value(BSONNULL));
@@ -358,18 +375,20 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
return output.freeze();
}
}
- invariant(_cursor->more() && bool(_input));
- auto nextVal = Value(_cursor->nextSafe());
+
+ invariant(bool(_input) && bool(_nextValue));
+ auto currentValue = *_nextValue;
+ _nextValue = _pipeline->output()->getNext();
// Move input document into output if this is the last or only result, otherwise perform a copy.
- MutableDocument output(_cursor->more() ? *_input : std::move(*_input));
- output.setNestedField(_as, nextVal);
+ MutableDocument output(_nextValue ? *_input : std::move(*_input));
+ output.setNestedField(_as, Value(currentValue));
if (indexPath) {
output.setNestedField(*indexPath, Value(_cursorIndex));
}
- _cursorIndex++;
+ ++_cursorIndex;
return output.freeze();
}
@@ -420,6 +439,32 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker*
return SEE_NEXT;
}
+void DocumentSourceLookUp::doInjectExpressionContext() {
+ _fromExpCtx = pExpCtx->copyWith(_fromNs);
+}
+
+void DocumentSourceLookUp::doDetachFromOperationContext() {
+ if (_pipeline) {
+ // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+ // use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
+ _pipeline->detachFromOperationContext();
+ invariant(_fromExpCtx->opCtx == nullptr);
+ } else {
+ _fromExpCtx->opCtx = nullptr;
+ }
+}
+
+void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+ if (_pipeline) {
+ // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+ // use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
+ _pipeline->reattachToOperationContext(opCtx);
+ invariant(_fromExpCtx->opCtx == opCtx);
+ } else {
+ _fromExpCtx->opCtx = opCtx;
+ }
+}
+
intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(4569, "the $lookup specification must be an Object", elem.type() == Object);
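
For reference, the $match shapes produced by makeMatchStageFromInput() for a few sample inputs, using localField 'local' and foreignField 'foreign' as in the unit tests updated below, written as shell syntax:

    // {local: 1}             -> single equality
    const eqMatch = {$match: {$and: [{foreign: {$eq: 1}}, {}]}};
    // {local: [1, 2]}        -> $in over the array elements
    const inMatch = {$match: {$and: [{foreign: {$in: [1, 2]}}, {}]}};
    // {local: [1, /^a/, 2]}  -> $or of equalities, because a regex element is present
    const orMatch = {$match: {$and: [{$or: [{foreign: {$eq: 1}},
                                            {foreign: {$eq: /^a/}},
                                            {foreign: {$eq: 2}}]}, {}]}};
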
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index 7cec430b264..3076eed189d 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -398,42 +398,44 @@ public:
namespace DocumentSourceLookup {
-TEST(QueryForInput, NonArrayValueUsesEqQuery) {
+TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) {
Document input = DOC("local" << 1);
- BSONObj query =
- DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
- ASSERT_EQ(query, fromjson("{$and: [{foreign: {$eq: 1}}, {}]}"));
+ BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+ input, FieldPath("local"), "foreign", BSONObj());
+ ASSERT_EQ(matchStage, fromjson("{$match: {$and: [{foreign: {$eq: 1}}, {}]}}"));
}
-TEST(QueryForInput, RegexValueUsesEqQuery) {
+TEST(MakeMatchStageFromInput, RegexValueUsesEqQuery) {
BSONRegEx regex("^a");
Document input = DOC("local" << Value(regex));
- BSONObj query =
- DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
- ASSERT_EQ(query,
- BSON("$and" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << regex)) << BSONObj())));
+ BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+ input, FieldPath("local"), "foreign", BSONObj());
+ ASSERT_EQ(matchStage,
+ BSON("$match" << BSON("$and" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << regex))
+ << BSONObj()))));
}
-TEST(QueryForInput, ArrayValueUsesInQuery) {
+TEST(MakeMatchStageFromInput, ArrayValueUsesInQuery) {
vector<Value> inputArray = {Value(1), Value(2)};
Document input = DOC("local" << Value(inputArray));
- BSONObj query =
- DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
- ASSERT_EQ(query, fromjson("{$and: [{foreign: {$in: [1, 2]}}, {}]}"));
+ BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+ input, FieldPath("local"), "foreign", BSONObj());
+ ASSERT_EQ(matchStage, fromjson("{$match: {$and: [{foreign: {$in: [1, 2]}}, {}]}}"));
}
-TEST(QueryForInput, ArrayValueWithRegexUsesOrQuery) {
+TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) {
BSONRegEx regex("^a");
vector<Value> inputArray = {Value(1), Value(regex), Value(2)};
Document input = DOC("local" << Value(inputArray));
- BSONObj query =
- DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
- ASSERT_EQ(query,
- BSON("$and" << BSON_ARRAY(
- BSON("$or" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << Value(1)))
- << BSON("foreign" << BSON("$eq" << regex))
- << BSON("foreign" << BSON("$eq" << Value(2)))))
- << BSONObj())));
+ BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+ input, FieldPath("local"), "foreign", BSONObj());
+ ASSERT_EQ(matchStage,
+ BSON("$match" << BSON(
+ "$and" << BSON_ARRAY(
+ BSON("$or" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << Value(1)))
+ << BSON("foreign" << BSON("$eq" << regex))
+ << BSON("foreign" << BSON("$eq" << Value(2)))))
+ << BSONObj()))));
}
} // namespace DocumentSourceLookUp
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 5992b8aa708..494dbdb31ae 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -33,6 +33,8 @@
namespace mongo {
+using boost::intrusive_ptr;
+
ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request)
: isExplain(request.isExplain()),
inShard(request.isFromRouter()),
@@ -65,4 +67,29 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) {
_valueComparator = ValueComparator(_collator.get());
}
+intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const {
+ intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext();
+
+ expCtx->isExplain = isExplain;
+ expCtx->inShard = inShard;
+ expCtx->inRouter = inRouter;
+ expCtx->extSortAllowed = extSortAllowed;
+ expCtx->bypassDocumentValidation = bypassDocumentValidation;
+
+ expCtx->ns = std::move(ns);
+ expCtx->tempDir = tempDir;
+
+ expCtx->opCtx = opCtx;
+
+ expCtx->collation = collation;
+ if (_collator) {
+ expCtx->setCollator(_collator->clone());
+ }
+
+ // Note that we intentionally skip copying the value of 'interruptCounter' because 'expCtx' is
+ // intended to be used for executing a separate aggregation pipeline.
+
+ return expCtx;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 285318f7f7a..27708590b18 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -68,6 +68,12 @@ public:
return _valueComparator;
}
+ /**
+ * Returns an ExpressionContext that is identical to 'this' that can be used to execute a
+ * separate aggregation pipeline on 'ns'.
+ */
+ boost::intrusive_ptr<ExpressionContext> copyWith(NamespaceString ns) const;
+
bool isExplain = false;
bool inShard = false;
bool inRouter = false;
@@ -81,7 +87,7 @@ public:
// Collation requested by the user for this pipeline. Empty if the user did not request a
// collation.
- const BSONObj collation;
+ BSONObj collation;
static const int kInterruptCheckPeriod = 128;
int interruptCounter = kInterruptCheckPeriod; // when 0, check interruptStatus
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index a522d9cfefd..a1e77179f2f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -163,6 +163,28 @@ public:
str::stream() << "renameCollection failed: " << info};
}
+ StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
+ // than the DocumentSource this MongodImplementation is injected into, but both
+ // ExpressionContext instances should still have the same OperationContext.
+ invariant(_ctx->opCtx == expCtx->opCtx);
+
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ pipeline.getValue()->injectExpressionContext(expCtx);
+ pipeline.getValue()->optimizePipeline();
+
+ AutoGetCollectionForRead autoColl(expCtx->opCtx, expCtx->ns);
+ PipelineD::prepareCursorSource(autoColl.getCollection(), pipeline.getValue());
+
+ return pipeline;
+ }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
@@ -282,21 +304,20 @@ StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor(
}
} // namespace
-shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
- OperationContext* txn,
- Collection* collection,
- const NamespaceString& nss,
- const intrusive_ptr<Pipeline>& pPipeline,
- const intrusive_ptr<ExpressionContext>& pExpCtx) {
+void PipelineD::prepareCursorSource(Collection* collection,
+ const intrusive_ptr<Pipeline>& pipeline) {
+ auto expCtx = pipeline->getContext();
+ dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(expCtx->ns.ns(), MODE_IS));
+
// We will be modifying the source vector as we go.
- Pipeline::SourceContainer& sources = pPipeline->_sources;
+ Pipeline::SourceContainer& sources = pipeline->_sources;
// Inject a MongodImplementation to sources that need them.
for (auto&& source : sources) {
DocumentSourceNeedsMongod* needsMongod =
dynamic_cast<DocumentSourceNeedsMongod*>(source.get());
if (needsMongod) {
- needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx));
+ needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(expCtx));
}
}
@@ -309,35 +330,36 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
// on secondaries, this is needed.
ShardedConnectionInfo::addHook();
}
- return std::shared_ptr<PlanExecutor>(); // don't need a cursor
+ return; // don't need a cursor
}
auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get());
// Optimize an initial $sample stage if possible.
if (collection && sampleStage) {
const long long sampleSize = sampleStage->getSampleSize();
- const long long numRecords = collection->getRecordStore()->numRecords(txn);
+ const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
auto exec = uassertStatusOK(
- createRandomCursorExecutor(collection, txn, sampleSize, numRecords));
+ createRandomCursorExecutor(collection, expCtx->opCtx, sampleSize, numRecords));
if (exec) {
// Replace $sample stage with $sampleFromRandomCursor stage.
sources.pop_front();
std::string idString = collection->ns().isOplog() ? "ts" : "_id";
sources.emplace_front(DocumentSourceSampleFromRandomCursor::create(
- pExpCtx, sampleSize, idString, numRecords));
+ expCtx, sampleSize, idString, numRecords));
- return addCursorSource(
- pPipeline,
- pExpCtx,
+ addCursorSource(
+ pipeline,
+ expCtx,
std::move(exec),
- pPipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata));
+ pipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata));
+ return;
}
}
}
// Look for an initial match. This works whether we got an initial query or not. If not, it
// results in a "{}" query, which will be what we want in that case.
- const BSONObj queryObj = pPipeline->getInitialQuery();
+ const BSONObj queryObj = pipeline->getInitialQuery();
if (!queryObj.isEmpty()) {
if (dynamic_cast<DocumentSourceMatch*>(sources.front().get())) {
// If a $match query is pulled into the cursor, the $match is redundant, and can be
@@ -351,9 +373,9 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
}
// Find the set of fields in the source documents depended on by this pipeline.
- DepsTracker deps = pPipeline->getDependencies(
- DocumentSourceMatch::isTextQuery(queryObj) ? DepsTracker::MetadataAvailable::kTextScore
- : DepsTracker::MetadataAvailable::kNoMetadata);
+ DepsTracker deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj)
+ ? DepsTracker::MetadataAvailable::kTextScore
+ : DepsTracker::MetadataAvailable::kNoMetadata);
BSONObj projForQuery = deps.toProjection();
@@ -375,19 +397,18 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
}
// Create the PlanExecutor.
- auto exec = uassertStatusOK(prepareExecutor(txn,
+ auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx,
collection,
- nss,
- pPipeline,
- pExpCtx,
+ expCtx->ns,
+ pipeline,
+ expCtx,
sortStage,
deps,
queryObj,
&sortObj,
&projForQuery));
- return addCursorSource(
- pPipeline, pExpCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
+ addCursorSource(pipeline, expCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
}
StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor(
@@ -509,23 +530,22 @@ StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor(
txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);
}
-shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
- const intrusive_ptr<ExpressionContext>& expCtx,
- unique_ptr<PlanExecutor> exec,
- DepsTracker deps,
- const BSONObj& queryObj,
- const BSONObj& sortObj,
- const BSONObj& projectionObj) {
+void PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ unique_ptr<PlanExecutor> exec,
+ DepsTracker deps,
+ const BSONObj& queryObj,
+ const BSONObj& sortObj,
+ const BSONObj& projectionObj) {
// Get the full "namespace" name.
const string& fullName = expCtx->ns.ns();
- // We convert the unique_ptr to a shared_ptr because both the PipelineProxyStage and the
- // DocumentSourceCursor need to reference the PlanExecutor.
- std::shared_ptr<PlanExecutor> sharedExec(std::move(exec));
+ // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
+ exec->saveState();
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(fullName, sharedExec, expCtx);
+ DocumentSourceCursor::create(fullName, std::move(exec), expCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
@@ -552,13 +572,6 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline
// case the new stage can be absorbed with the first stages of the pipeline.
pipeline->addInitialSource(pSource);
pipeline->optimizePipeline();
-
- // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
- // deregister the PlanExecutor so that it can be registered with ClientCursor.
- sharedExec->deregisterExec();
- sharedExec->saveState();
-
- return sharedExec;
}
std::string PipelineD::getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline) {
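The net effect of the addCursorSource() changes is that the PlanExecutor has exactly one owner. Sketched end to end, assuming the caller already holds the collection lock and with the executor construction elided:

    std::unique_ptr<PlanExecutor> exec = /* executor produced by prepareExecutor() */;
    exec->saveState();  // DocumentSourceCursor expects a saved, yielding executor
    auto cursorSource = DocumentSourceCursor::create(expCtx->ns.ns(), std::move(exec), expCtx);
    pipeline->addInitialSource(cursorSource);
    pipeline->optimizePipeline();
    // The old deregisterExec()/re-register step is gone; see the plan_executor.cpp and
    // dbtests hunks below for how kill notifications now reach this executor.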
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 76214db4925..35d93a4cfd2 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -70,20 +70,10 @@ public:
*
* The cursor is added to the front of the pipeline's sources.
*
- * Must have a AutoGetCollectionForRead before entering.
- *
- * If the returned PlanExecutor is non-null, you are responsible for ensuring
- * it receives appropriate invalidate and kill messages.
- *
- * @param pPipeline the logical "this" for this operation
- * @param pExpCtx the expression context for this pipeline
+ * Callers must take care to ensure that 'collection' is locked in at least IS-mode.
*/
- static std::shared_ptr<PlanExecutor> prepareCursorSource(
- OperationContext* txn,
- Collection* collection,
- const NamespaceString& nss,
- const boost::intrusive_ptr<Pipeline>& pPipeline,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ static void prepareCursorSource(Collection* collection,
+ const boost::intrusive_ptr<Pipeline>& pipeline);
static std::string getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline);
@@ -118,14 +108,13 @@ private:
* Creates a DocumentSourceCursor from the given PlanExecutor and adds it to the front of the
* Pipeline.
*/
- static std::shared_ptr<PlanExecutor> addCursorSource(
- const boost::intrusive_ptr<Pipeline>& pipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::unique_ptr<PlanExecutor> exec,
- DepsTracker deps,
- const BSONObj& queryObj = BSONObj(),
- const BSONObj& sortObj = BSONObj(),
- const BSONObj& projectionObj = BSONObj());
+ static void addCursorSource(const boost::intrusive_ptr<Pipeline>& pipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<PlanExecutor> exec,
+ DepsTracker deps,
+ const BSONObj& queryObj = BSONObj(),
+ const BSONObj& sortObj = BSONObj(),
+ const BSONObj& projectionObj = BSONObj());
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 395e84ce1a7..3b4159facbc 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -517,29 +517,6 @@ void PlanExecutor::deregisterExec() {
void PlanExecutor::kill(string reason) {
_killReason = std::move(reason);
-
- // XXX: PlanExecutor is designed to wrap a single execution tree. In the case of
- // aggregation queries, PlanExecutor wraps a proxy stage responsible for pulling results
- // from an aggregation pipeline. The aggregation pipeline pulls results from yet another
- // PlanExecutor. Such nested PlanExecutors require us to manually propagate kill() to
- // the "inner" executor. This is bad, and hopefully can be fixed down the line with the
- // unification of agg and query.
- //
- // The CachedPlanStage is another special case. It needs to update the plan cache from
- // its destructor. It needs to know whether it has been killed so that it can avoid
- // touching a potentially invalid plan cache in this case.
- //
- // TODO: get rid of this code block.
- {
- PlanStage* foundStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY);
- if (foundStage) {
- PipelineProxyStage* proxyStage = static_cast<PipelineProxyStage*>(foundStage);
- shared_ptr<PlanExecutor> childExec = proxyStage->getChildExecutor();
- if (childExec) {
- childExec->kill(*_killReason);
- }
- }
- }
}
Status PlanExecutor::executePlan() {
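The manual kill() propagation through PipelineProxyStage is no longer needed because the inner executor now receives its own kill notification: whoever creates it registers it with the collection's CursorManager, as the updated dbtest below does. A hedged sketch of that registration; the exact helper the test fixture uses is not shown in this patch, so the call below is an assumption:

    // Assumed sketch: register the inner executor so a collection drop marks it DEAD directly.
    unique_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
    collection->getCursorManager()->registerExecutor(innerExec.get());
    // Dropping the collection now kills 'innerExec' without any help from the outer
    // PlanExecutor that wraps the PipelineProxyStage.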
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index c98dcdd7a53..dab5b9b0f26 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -93,7 +93,6 @@ protected:
void createSource(boost::optional<BSONObj> hint = boost::none) {
// clean up first if this was called before
_source.reset();
- _exec.reset();
OldClientWriteContext ctx(&_opCtx, nss.ns());
@@ -104,13 +103,13 @@ protected:
auto cq = uassertStatusOK(CanonicalQuery::canonicalize(
&_opCtx, std::move(qr), ExtensionsCallbackDisallowExtensions()));
- _exec = uassertStatusOK(
+ auto exec = uassertStatusOK(
getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL));
- _exec->saveState();
- _exec->registerExec(ctx.getCollection());
+ exec->saveState();
+ exec->registerExec(ctx.getCollection());
- _source = DocumentSourceCursor::create(nss.ns(), _exec, _ctx);
+ _source = DocumentSourceCursor::create(nss.ns(), std::move(exec), _ctx);
}
intrusive_ptr<ExpressionContext> ctx() {
@@ -123,7 +122,6 @@ protected:
private:
// It is important that these are ordered to ensure correct destruction order.
- std::shared_ptr<PlanExecutor> _exec;
intrusive_ptr<ExpressionContext> _ctx;
intrusive_ptr<DocumentSourceCursor> _source;
};
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 959a9fed617..eded231bd75 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/json.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
+#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/plan_executor.h"
@@ -278,36 +279,47 @@ public:
BSONObj indexSpec = BSON("a" << 1);
addIndex(indexSpec);
- // Create the PlanExecutor which feeds the aggregation pipeline.
- std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+ Collection* collection = ctx.getCollection();
// Create the aggregation pipeline.
std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")};
- boost::intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(
- &_txn, AggregationRequest(NamespaceString(nss.ns()), rawPipeline));
+ boost::intrusive_ptr<ExpressionContext> expCtx =
+ new ExpressionContext(&_txn, AggregationRequest(nss, rawPipeline));
+
+ // Create an "inner" plan executor and register it with the cursor manager so that it can
+ // get notified when the collection is dropped.
+ unique_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+ registerExec(innerExec.get());
- auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx);
- auto pipeline = assertGet(statusWithPipeline);
+ // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source
+ // in the pipeline.
+ innerExec->saveState();
+ auto cursorSource = DocumentSourceCursor::create(nss.ns(), std::move(innerExec), expCtx);
+ auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx));
// Create the output PlanExecutor that pulls results from the pipeline.
auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(&_txn, pipeline, innerExec, ws.get());
- Collection* collection = ctx.getCollection();
+ auto proxy = make_unique<PipelineProxyStage>(&_txn, pipeline, ws.get());
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr<PlanExecutor> outerExec = std::move(statusWithPlanExecutor.getValue());
- // Only the outer executor gets registered.
+ // Register the "outer" plan executor with the cursor manager so it can get notified when
+ // the collection is dropped.
registerExec(outerExec.get());
- // Verify that both the "inner" and "outer" plan executors have been killed after
- // dropping the collection.
- BSONObj objOut;
dropCollection();
- ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL));
- ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
+
+ // Verify that the aggregation pipeline returns an error because its "inner" plan executor
+ // has been killed due to the collection being dropped.
+ ASSERT_THROWS_CODE(pipeline->output()->getNext(), UserException, 16028);
+
+ // Verify that the "outer" plan executor has been killed due to the collection being
+ // dropped.
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, nullptr));
deregisterExec(outerExec.get());
}
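For context on the assertion above: error code 16028 is what the test expects DocumentSourceCursor to raise when its owned PlanExecutor comes back DEAD after the collection drop. Roughly, the check being exercised has the following shape; the exact location and message text are illustrative, not quoted from the source:

    // Sketch of DocumentSourceCursor's batch-loading check (names and message assumed):
    BSONObj obj;
    PlanExecutor::ExecState state = _exec->getNext(&obj, nullptr);
    uassert(16028,
            "collection dropped or executor killed while the aggregation cursor yielded",
            state != PlanExecutor::DEAD);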