22 files changed, 789 insertions, 289 deletions
diff --git a/jstests/aggregation/bugs/lookup_unwind_getmore.js b/jstests/aggregation/bugs/lookup_unwind_getmore.js
index 6c8d886b78f..5d4a9286ef9 100644
--- a/jstests/aggregation/bugs/lookup_unwind_getmore.js
+++ b/jstests/aggregation/bugs/lookup_unwind_getmore.js
@@ -1,33 +1,30 @@
 /**
- * Tests that the server correctly handles when the OperationContext of the DBDirectClient used by
- * the $lookup stage changes as it unwinds the results.
+ * Tests that the server correctly handles when the OperationContext used by the $lookup stage
+ * changes as it unwinds the results.
  *
  * This test was designed to reproduce SERVER-22537.
  */
 (function() {
     'use strict';
 
-    // We use a batch size of 1 to ensure that the mongo shell issues a getMore when unwinding the
-    // results from the 'dest' collection for the same document in the 'source' collection under a
-    // different OperationContext.
-    const batchSize = 1;
-
-    db.source.drop();
-    db.dest.drop();
+    const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'};
+    const conn = MongoRunner.runMongod(options);
+    assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));
 
-    assert.writeOK(db.source.insert({local: 1}));
+    const testDB = conn.getDB('test');
 
-    // We insert documents in the 'dest' collection such that their combined size is greater than
-    // 16MB in order to ensure that the DBDirectClient used by the $lookup stage issues a getMore
-    // under a different OperationContext.
-    const numMatches = 3;
-    const largeStr = new Array(6 * 1024 * 1024 + 1).join('x');
+    // We use a batch size of 2 to ensure that the mongo shell issues a getMore when unwinding the
+    // results from the 'dest' collection for the same document in the 'source' collection under a
+    // different OperationContext.
+    const batchSize = 2;
+    const numMatches = 5;
 
-    for (var i = 0; i < numMatches; ++i) {
-        assert.writeOK(db.dest.insert({foreign: 1, largeStr: largeStr}));
+    assert.writeOK(testDB.source.insert({local: 1}));
+    for (let i = 0; i < numMatches; ++i) {
+        assert.writeOK(testDB.dest.insert({foreign: 1}));
     }
 
-    var res = db.runCommand({
+    const res = assert.commandWorked(testDB.runCommand({
         aggregate: 'source',
         pipeline: [
             {
@@ -47,9 +44,10 @@
         cursor: {
             batchSize: batchSize,
         },
-    });
-    assert.commandWorked(res);
+    }));
 
-    var cursor = new DBCommandCursor(db.getMongo(), res, batchSize);
+    const cursor = new DBCommandCursor(conn, res, batchSize);
     assert.eq(numMatches, cursor.itcount());
+
+    MongoRunner.stopMongod(conn);
 })();
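The internalDocumentSourceCursorBatchSizeBytes knob used by this test is introduced by this change as an ordinary server parameter (see document_source_cursor.cpp below), so in addition to being passed at startup it can be read and adjusted at runtime via getParameter/setParameter; the new FSM workload below relies on exactly that in its setup and teardown. A minimal shell sketch, reusing the 'conn' handle from the test above:

    const adminDB = conn.getDB('admin');
    // Read the current value; the default is FindCommon::kMaxBytesToReturnToClientAtOnce (16MB).
    printjson(assert.commandWorked(adminDB.runCommand(
        {getParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 1})));
    // Lower it so the PlanExecutor underlying a DocumentSourceCursor returns tiny batches.
    assert.commandWorked(adminDB.runCommand(
        {setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 1}));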
diff --git a/jstests/aggregation/bugs/lookup_unwind_killcursor.js b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
new file mode 100644
index 00000000000..939205007b5
--- /dev/null
+++ b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
@@ -0,0 +1,55 @@
+/**
+ * Tests that the cursor underlying the $lookup stage is killed when the cursor returned to the
+ * client for the aggregation pipeline is killed.
+ *
+ * This test was designed to reproduce SERVER-24386.
+ */
+(function() {
+    'use strict';
+
+    const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'};
+    const conn = MongoRunner.runMongod(options);
+    assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));
+
+    const testDB = conn.getDB('test');
+
+    // We use a batch size of 2 to ensure that the mongo shell does not exhaust the cursor on its
+    // first batch.
+    const batchSize = 2;
+    const numMatches = 5;
+
+    assert.writeOK(testDB.source.insert({local: 1}));
+    for (let i = 0; i < numMatches; ++i) {
+        assert.writeOK(testDB.dest.insert({foreign: 1}));
+    }
+
+    const res = assert.commandWorked(testDB.runCommand({
+        aggregate: 'source',
+        pipeline: [
+            {
+                $lookup: {
+                    from: 'dest',
+                    localField: 'local',
+                    foreignField: 'foreign',
+                    as: 'matches',
+                }
+            },
+            {
+                $unwind: {
+                    path: '$matches',
+                },
+            },
+        ],
+        cursor: {
+            batchSize: batchSize,
+        },
+    }));
+
+    const cursor = new DBCommandCursor(conn, res, batchSize);
+    cursor.close();  // Closing the cursor will issue the "killCursors" command.
+
+    const serverStatus = assert.commandWorked(testDB.adminCommand({serverStatus: 1}));
+    assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus));
+
+    MongoRunner.stopMongod(conn);
+})();
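Note that DBCommandCursor.close() in the test above is shorthand for issuing the "killCursors" command that the cursor_manager.cpp change below must handle. A sketch of the explicit form, reusing 'testDB' and the 'res' returned by the aggregate command (valid while the cursor is still open, i.e. before itcount() or close() is called):

    // The cursor id to kill comes back in the aggregate response.
    const cursorId = res.cursor.id;
    const killRes = assert.commandWorked(
        testDB.runCommand({killCursors: 'source', cursors: [cursorId]}));
    printjson(killRes.cursorsKilled);  // reports which ids were actually killed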
diff --git a/jstests/concurrency/fsm_all_sharded_replication.js b/jstests/concurrency/fsm_all_sharded_replication.js
index 6c14334b33f..39cda5ce3f8 100644
--- a/jstests/concurrency/fsm_all_sharded_replication.js
+++ b/jstests/concurrency/fsm_all_sharded_replication.js
@@ -59,6 +59,13 @@ var blacklist = [
     'group_cond.js',                  // the group command cannot be issued against a sharded cluster
     'indexed_insert_eval.js',         // eval doesn't work with sharded collections
     'indexed_insert_eval_nolock.js',  // eval doesn't work with sharded collections
+
+    // This workload sometimes triggers an 'unable to target write op for collection ... caused by
+    // ... database not found' error. Further investigation still needs to be done, but this
+    // workload may be failing due to SERVER-17397 'drops in a sharded cluster may not fully
+    // succeed' because it drops and reuses the same namespaces.
+    'kill_multicollection_aggregation.js',
+
     'plan_cache_drop_database.js',  // cannot ensureIndex after dropDatabase without sharding first
     'remove_single_document.js',    // our .remove(query, {justOne: true}) calls lack shard keys
     'remove_single_document_eval.js',  // eval doesn't work with sharded collections
diff --git a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
index c20464430bb..04a12b83eb3 100644
--- a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
+++ b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
@@ -64,6 +64,13 @@ var blacklist = [
     'group_cond.js',                  // the group command cannot be issued against a sharded cluster
     'indexed_insert_eval.js',         // eval doesn't work with sharded collections
     'indexed_insert_eval_nolock.js',  // eval doesn't work with sharded collections
+
+    // This workload sometimes triggers an 'unable to target write op for collection ... caused by
+    // ... database not found' error. Further investigation still needs to be done, but this
+    // workload may be failing due to SERVER-17397 'drops in a sharded cluster may not fully
+    // succeed' because it drops and reuses the same namespaces.
+    'kill_multicollection_aggregation.js',
+
     'plan_cache_drop_database.js',  // cannot ensureIndex after dropDatabase without sharding first
     'remove_single_document.js',    // our .remove(query, {justOne: true}) calls lack shard keys
     'remove_single_document_eval.js',  // eval doesn't work with sharded collections
diff --git a/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js b/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
new file mode 100644
index 00000000000..ba38bdfe94a
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/kill_multicollection_aggregation.js
@@ -0,0 +1,301 @@
+'use strict';
+
+/**
+ * kill_multicollection_aggregation.js
+ *
+ * This workload was designed to stress running and invalidating aggregation pipelines involving
+ * multiple collections and to reproduce issues like those described in SERVER-22537 and
+ * SERVER-24386. Threads perform an aggregation pipeline on one of a few collections, optionally
+ * specifying a $lookup stage, a $graphLookup stage, or a $facet stage, while the database, a
+ * collection, or an index is dropped concurrently.
+ */
+var $config = (function() {
+
+    var data = {
+        chooseRandomlyFrom: function chooseRandomlyFrom(arr) {
+            if (!Array.isArray(arr)) {
+                throw new Error('Expected array for first argument, but got: ' + tojson(arr));
+            }
+            return arr[Random.randInt(arr.length)];
+        },
+
+        involvedCollections: ['coll0', 'coll1', 'coll2'],
+        indexSpecs: [{a: 1, b: 1}, {c: 1}],
+
+        numDocs: 10,
+        batchSize: 2,
+
+        /**
+         * Inserts 'this.numDocs' new documents into the specified collection and ensures that the
+         * indexes 'this.indexSpecs' exist on the collection. Note that this means it is safe for
+         * multiple threads to perform this function simultaneously.
+         */
+        populateDataAndIndexes: function populateDataAndIndexes(db, collName) {
+            var bulk = db[collName].initializeUnorderedBulkOp();
+            for (var i = 0; i < this.numDocs; ++i) {
+                bulk.insert({});
+            }
+            var res = bulk.execute();
+            assertAlways.writeOK(res);
+            assertAlways.eq(this.numDocs, res.nInserted, tojson(res));
+
+            this.indexSpecs.forEach(indexSpec => {
+                assertAlways.commandWorked(db[collName].createIndex(indexSpec));
+            });
+        },
+
+        /**
+         * Runs the specified aggregation pipeline and stores the resulting cursor (if the command
+         * is successful) in 'this.cursor'.
+         */
+        runAggregation: function runAggregation(db, collName, pipeline) {
+            var res = db.runCommand(
+                {aggregate: collName, pipeline: pipeline, cursor: {batchSize: this.batchSize}});
+
+            if (res.ok) {
+                this.cursor = new DBCommandCursor(db.getMongo(), res, this.batchSize);
+            }
+        },
+
+        makeLookupPipeline: function makeLookupPipeline(foreignColl) {
+            var pipeline = [
+                {
+                    $lookup: {
+                        from: foreignColl,
+                        // We perform the $lookup on a field that doesn't exist in either the
+                        // document on the source collection or the document on the foreign
+                        // collection. This ensures that every document in the source collection
+                        // will match every document in the foreign collection and cause the
+                        // cursor underlying the $lookup stage to need to request another batch.
+                        localField: 'fieldThatDoesNotExistInDoc',
+                        foreignField: 'fieldThatDoesNotExistInDoc',
+                        as: 'results'
+                    }
+                },
+                {$unwind: '$results'}
+            ];
+
+            return pipeline;
+        },
+
+        makeGraphLookupPipeline: function makeGraphLookupPipeline(foreignName) {
+            var pipeline = [
+                {
+                    $graphLookup: {
+                        from: foreignName,
+                        startWith: '$fieldThatDoesNotExistInDoc',
+                        connectToField: 'fieldThatDoesNotExistInDoc',
+                        connectFromField: 'fieldThatDoesNotExistInDoc',
+                        maxDepth: Random.randInt(5),
+                        as: 'results'
+                    }
+                },
+                {$unwind: '$results'}
+            ];
+
+            return pipeline;
+        }
+    };
+
+    var states = {
+        /**
+         * This is a no-op, used only as a transition state.
+         */
+        init: function init(db, collName) {},
+
+        /**
+         * Runs an aggregation involving only one collection and saves the resulting cursor to
+         * 'this.cursor'.
+         */
+        normalAggregation: function normalAggregation(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+            var pipeline = [{$sort: this.chooseRandomlyFrom(this.indexSpecs)}];
+            this.runAggregation(myDB, targetCollName, pipeline);
+        },
+
+        /**
+         * Runs an aggregation that uses the $lookup stage and saves the resulting cursor to
+         * 'this.cursor'.
+         */
+        lookupAggregation: function lookupAggregation(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+            var foreignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+            var pipeline = this.makeLookupPipeline(foreignCollName);
+            this.runAggregation(myDB, targetCollName, pipeline);
+        },
+
+        /**
+         * Runs an aggregation that uses the $graphLookup stage and saves the resulting cursor to
+         * 'this.cursor'.
+         */
+        graphLookupAggregation: function graphLookupAggregation(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+            var foreignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+            var pipeline = this.makeGraphLookupPipeline(foreignCollName);
+            this.runAggregation(myDB, targetCollName, pipeline);
+        },
+
+        /**
+         * Runs an aggregation that uses the $lookup and $graphLookup stages within a $facet stage
+         * and saves the resulting cursor to 'this.cursor'.
+         */
+        facetAggregation: function facetAggregation(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetCollName = this.chooseRandomlyFrom(this.involvedCollections);
+            var lookupForeignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+            var graphLookupForeignCollName = this.chooseRandomlyFrom(this.involvedCollections);
+
+            var pipeline = [{
+                $facet: {
+                    lookup: this.makeLookupPipeline(lookupForeignCollName),
+                    graphLookup: this.makeGraphLookupPipeline(graphLookupForeignCollName)
+                }
+            }];
+            this.runAggregation(myDB, targetCollName, pipeline);
+        },
+
+        killCursor: function killCursor(db, collName) {
+            if (this.hasOwnProperty('cursor')) {
+                this.cursor.close();
+            }
+        },
+
+        /**
+         * Requests enough results from 'this.cursor' to ensure that another batch is needed, and
+         * thus ensures that a getMore request is sent for 'this.cursor'.
+         */
+        getMore: function getMore(db, collName) {
+            if (!this.hasOwnProperty('cursor')) {
+                return;
+            }
+
+            for (var i = 0; i <= this.batchSize; ++i) {
+                try {
+                    if (!this.cursor.hasNext()) {
+                        break;
+                    }
+                    this.cursor.next();
+                } catch (e) {
+                    // The getMore request can fail if the database, a collection, or an index was
+                    // dropped.
+                }
+            }
+        },
+
+        /**
+         * Drops the database being used by this workload and then re-creates each of
+         * 'this.involvedCollections' by repopulating them with data and indexes.
+         */
+        dropDatabase: function dropDatabase(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            myDB.dropDatabase();
+
+            // Re-create all of the collections and indexes that were dropped.
+            this.involvedCollections.forEach(collName => {
+                this.populateDataAndIndexes(myDB, collName);
+            });
+        },
+
+        /**
+         * Randomly selects a collection from 'this.involvedCollections' and drops it. The
+         * collection is then re-created with data and indexes.
+         */
+        dropCollection: function dropCollection(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetColl = this.chooseRandomlyFrom(this.involvedCollections);
+
+            myDB[targetColl].drop();
+
+            // Re-create the collection that was dropped.
+            this.populateDataAndIndexes(myDB, targetColl);
+        },
+
+        /**
+         * Randomly selects a collection from 'this.involvedCollections' and an index from
+         * 'this.indexSpecs' and drops that particular index from the collection. The index is
+         * then re-created.
+         */
+        dropIndex: function dropIndex(db, collName) {
+            var myDB = db.getSiblingDB(this.uniqueDBName);
+            var targetColl = this.chooseRandomlyFrom(this.involvedCollections);
+            var indexSpec = this.chooseRandomlyFrom(this.indexSpecs);
+
+            // We don't assert that the command succeeded when dropping an index because it's
+            // possible another thread has already dropped this index.
+            myDB[targetColl].dropIndex(indexSpec);
+
+            // Re-create the index that was dropped.
+            assertAlways.commandWorked(myDB[targetColl].createIndex(indexSpec));
+        }
+    };
+
+    var transitions = {
+        init: {
+            normalAggregation: 0.1,
+            lookupAggregation: 0.25,
+            graphLookupAggregation: 0.25,
+            facetAggregation: 0.25,
+            dropDatabase: 0.05,
+            dropCollection: 0.05,
+            dropIndex: 0.05,
+        },
+
+        normalAggregation: {killCursor: 0.1, getMore: 0.9},
+        lookupAggregation: {killCursor: 0.1, getMore: 0.9},
+        graphLookupAggregation: {killCursor: 0.1, getMore: 0.9},
+        facetAggregation: {killCursor: 0.1, getMore: 0.9},
+        killCursor: {init: 1},
+        getMore: {killCursor: 0.2, getMore: 0.6, init: 0.2},
+        dropDatabase: {init: 1},
+        dropCollection: {init: 1},
+        dropIndex: {init: 1}
+    };
+
+    function setup(db, collName, cluster) {
+        // We decrease the batch size of the DocumentSourceCursor so that the PlanExecutor
+        // underlying it isn't exhausted when the "aggregate" command is sent. This makes it more
+        // likely for the "killCursors" command to need to handle destroying the underlying
+        // PlanExecutor.
+        cluster.executeOnMongodNodes(function lowerDocumentSourceCursorBatchSize(db) {
+            assertAlways.commandWorked(
+                db.adminCommand({setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 1}));
+        });
+
+        // Use the workload name as part of the database name, since the workload name is assumed
+        // to be unique.
+        this.uniqueDBName = db.getName() + 'kill_multicollection_aggregation';
+
+        var myDB = db.getSiblingDB(this.uniqueDBName);
+        this.involvedCollections.forEach(collName => {
+            this.populateDataAndIndexes(myDB, collName);
+            assertAlways.eq(this.numDocs, myDB[collName].find({}).itcount());
+        });
+    }
+
+    function teardown(db, collName, cluster) {
+        cluster.executeOnMongodNodes(function restoreDocumentSourceCursorBatchSize(db) {
+            assertAlways.commandWorked(db.adminCommand(
+                {setParameter: 1, internalDocumentSourceCursorBatchSizeBytes: 16 * 1024 * 1024}));
+        });
+
+        var myDB = db.getSiblingDB(this.uniqueDBName);
+        myDB.dropDatabase();
+    }
+
+    return {
+        threadCount: 10,
+        iterations: 200,
+        states: states,
+        startState: 'init',
+        transitions: transitions,
+        data: data,
+        setup: setup,
+        teardown: teardown
+    };
+})();
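A note on why the workload's $lookup and $graphLookup pipelines join on 'fieldThatDoesNotExistInDoc': as document_source_lookup.cpp below states, a missing localField value is treated as null, and foreign documents lacking the foreignField also match null, so every source document pairs with every foreign document and the cursor underlying the stage is guaranteed to need additional batches. A small shell illustration of those semantics, using hypothetical collections 'src' and 'dst':

    assert.writeOK(db.src.insert({_id: 0}));
    assert.writeOK(db.dst.insert({_id: 0}));
    assert.writeOK(db.dst.insert({_id: 1}));
    const out = db.src.aggregate([{
        $lookup: {from: 'dst', localField: 'missing', foreignField: 'missing', as: 'results'}
    }]).toArray();
    // Both foreign documents match, since the nonexistent field compares as null on both sides.
    assert.eq(2, out[0].results.length);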
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp
index da521a6ab6a..876d8f7519d 100644
--- a/src/mongo/db/catalog/cursor_manager.cpp
+++ b/src/mongo/db/catalog/cursor_manager.cpp
@@ -506,32 +506,44 @@ void CursorManager::deregisterCursor(ClientCursor* cc) {
 }
 
 Status CursorManager::eraseCursor(OperationContext* txn, CursorId id, bool shouldAudit) {
-    stdx::lock_guard<SimpleMutex> lk(_mutex);
+    ClientCursor* cursor;
 
-    CursorMap::iterator it = _cursors.find(id);
-    if (it == _cursors.end()) {
-        if (shouldAudit) {
-            audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::CursorNotFound);
+    {
+        stdx::lock_guard<SimpleMutex> lk(_mutex);
+
+        CursorMap::iterator it = _cursors.find(id);
+        if (it == _cursors.end()) {
+            if (shouldAudit) {
+                audit::logKillCursorsAuthzCheck(
+                    txn->getClient(), _nss, id, ErrorCodes::CursorNotFound);
+            }
+            return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
         }
-        return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
-    }
 
-    ClientCursor* cursor = it->second;
+        cursor = it->second;
+
+        if (cursor->isPinned()) {
+            if (shouldAudit) {
+                audit::logKillCursorsAuthzCheck(
+                    txn->getClient(), _nss, id, ErrorCodes::OperationFailed);
+            }
+            return {ErrorCodes::OperationFailed,
+                    str::stream() << "Cannot kill pinned cursor: " << id};
+        }
 
-    if (cursor->isPinned()) {
         if (shouldAudit) {
-            audit::logKillCursorsAuthzCheck(
-                txn->getClient(), _nss, id, ErrorCodes::OperationFailed);
+            audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::OK);
         }
-        return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
-    }
 
-    if (shouldAudit) {
-        audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::OK);
+        cursor->kill();
+        _deregisterCursor_inlock(cursor);
     }
 
-    cursor->kill();
-    _deregisterCursor_inlock(cursor);
+    // If 'cursor' represents an aggregation cursor, then the destructor of the ClientCursor will
+    // eventually cause the destructor of the underlying PlanExecutor to be called. Since the
+    // underlying PlanExecutor is also registered on this CursorManager, we must destruct 'cursor'
+    // without holding '_mutex' so that it's possible to call CursorManager::deregisterCursor()
+    // without deadlocking ourselves.
     delete cursor;
 
     return Status::OK();
 }
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 853dec82dd9..372cf203166 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -358,14 +358,13 @@ public:
 
         // This does mongod-specific stuff like creating the input PlanExecutor and adding
         // it to the front of the pipeline if needed.
-        std::shared_ptr<PlanExecutor> input =
-            PipelineD::prepareCursorSource(txn, collection, nss, pipeline, expCtx);
+        PipelineD::prepareCursorSource(collection, pipeline);
 
         // Create the PlanExecutor which returns results from the pipeline. The WorkingSet
         // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
         // PlanExecutor.
         auto ws = make_unique<WorkingSet>();
-        auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, input, ws.get());
+        auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, ws.get());
 
         auto statusWithPlanExecutor = (NULL == collection)
             ? PlanExecutor::make(
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index fd2d1a3a09e..50fa14e0d2a 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -48,12 +48,10 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
 
 PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
                                        intrusive_ptr<Pipeline> pipeline,
-                                       const std::shared_ptr<PlanExecutor>& child,
                                        WorkingSet* ws)
     : PlanStage(kStageType, opCtx),
       _pipeline(pipeline),
      _includeMetaData(_pipeline->getContext()->inShard),  // send metadata to merger
-      _childExec(child),
      _ws(ws) {}
 
 PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) {
@@ -93,27 +91,12 @@ bool PipelineProxyStage::isEOF() {
     return true;
 }
 
-void PipelineProxyStage::doInvalidate(OperationContext* txn,
-                                      const RecordId& dl,
-                                      InvalidationType type) {
-    // Propagate the invalidation to the child executor of the pipeline if it is still in use.
-    if (auto child = getChildExecutor()) {
-        child->invalidate(txn, dl, type);
-    }
-}
-
 void PipelineProxyStage::doDetachFromOperationContext() {
     _pipeline->detachFromOperationContext();
-    if (auto child = getChildExecutor()) {
-        child->detachFromOperationContext();
-    }
 }
 
 void PipelineProxyStage::doReattachToOperationContext() {
     _pipeline->reattachToOperationContext(getOpCtx());
-    if (auto child = getChildExecutor()) {
-        child->reattachToOperationContext(getOpCtx());
-    }
 }
 
 unique_ptr<PlanStageStats> PipelineProxyStage::getStats() {
@@ -135,10 +118,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
     return boost::none;
 }
 
-shared_ptr<PlanExecutor> PipelineProxyStage::getChildExecutor() {
-    return _childExec.lock();
-}
-
 std::string PipelineProxyStage::getPlanSummaryStr() const {
     return PipelineD::getPlanSummaryStr(_pipeline);
 }
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index a638bcda547..9482f9ba565 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -47,27 +47,18 @@ class PipelineProxyStage final : public PlanStage {
 public:
     PipelineProxyStage(OperationContext* opCtx,
                        boost::intrusive_ptr<Pipeline> pipeline,
-                       const std::shared_ptr<PlanExecutor>& child,
                        WorkingSet* ws);
 
     PlanStage::StageState doWork(WorkingSetID* out) final;
 
     bool isEOF() final;
 
-    void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
-
     //
     // Manage our OperationContext.
     //
     void doDetachFromOperationContext() final;
     void doReattachToOperationContext() final;
 
-    /**
-     * Return a shared pointer to the PlanExecutor that feeds the pipeline. The returned
-     * pointer may be NULL.
-     */
-    std::shared_ptr<PlanExecutor> getChildExecutor();
-
     // Returns empty PlanStageStats object
     std::unique_ptr<PlanStageStats> getStats() final;
 
@@ -93,7 +84,6 @@ private:
     const boost::intrusive_ptr<Pipeline> _pipeline;
     std::vector<BSONObj> _stash;
     const bool _includeMetaData;
-    std::weak_ptr<PlanExecutor> _childExec;
 
     // Not owned by us.
     WorkingSet* _ws;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 85fcf5c2da1..d8026f7ee04 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -10,6 +10,7 @@ env.Library(
         'aggregation_request',
         'document_source',
         'document_source_facet',
+        'document_source_lookup',
         'expression_context',
         'pipeline',
     ]
@@ -118,7 +119,9 @@ env.CppUnitTest(
     source='document_source_test.cpp',
     LIBDEPS=[
         'document_source',
+        'document_source_lookup',
         'document_value_test_util',
+        '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/util/clock_source_mock',
         '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
@@ -183,11 +186,9 @@ docSourceEnv.Library(
         'document_source_coll_stats.cpp',
         'document_source_count.cpp',
         'document_source_geo_near.cpp',
-        'document_source_graph_lookup.cpp',
         'document_source_group.cpp',
         'document_source_index_stats.cpp',
         'document_source_limit.cpp',
-        'document_source_lookup.cpp',
         'document_source_match.cpp',
         'document_source_merge_cursors.cpp',
         'document_source_mock.cpp',
@@ -246,6 +247,18 @@ env.Library(
 )
 
 env.Library(
+    target='document_source_lookup',
+    source=[
+        'document_source_graph_lookup.cpp',
+        'document_source_lookup.cpp',
+    ],
+    LIBDEPS=[
+        'document_source',
+        'pipeline',
+    ],
+)
+
+env.Library(
     target='document_source_facet',
     source=[
         'document_source_facet.cpp',
@@ -311,6 +324,7 @@ env.CppUnitTest(
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/db/service_context_noop_init',
         'document_value_test_util',
+        'document_source_lookup',
         'pipeline',
     ],
 )
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index e11d7d8d085..77a539d1b89 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -392,6 +392,16 @@ public:
         const BSONObj& originalCollectionOptions,
         const std::list<BSONObj>& originalIndexes) = 0;
 
+    /**
+     * Parses a Pipeline from a vector of BSONObjs representing DocumentSources and readies it
+     * for execution. The returned pipeline is optimized and has a cursor source prepared.
+     *
+     * This function returns a non-OK status if parsing the pipeline failed.
+     */
+    virtual StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+        const std::vector<BSONObj>& rawPipeline,
+        const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+
     // Add new methods as needed.
 };
 
@@ -447,7 +457,6 @@ protected:
 class DocumentSourceCursor final : public DocumentSource {
 public:
     // virtuals from DocumentSource
-    ~DocumentSourceCursor() final;
     boost::optional<Document> getNext() final;
     const char* getSourceName() const final;
     BSONObjSet getOutputSorts() final {
@@ -464,6 +473,10 @@ public:
     }
     void dispose() final;
 
+    void detachFromOperationContext() final;
+
+    void reattachToOperationContext(OperationContext* opCtx) final;
+
     /**
      * Create a document source based on a passed-in PlanExecutor.
      *
@@ -472,7 +485,7 @@
      */
     static boost::intrusive_ptr<DocumentSourceCursor> create(
         const std::string& ns,
-        const std::shared_ptr<PlanExecutor>& exec,
+        std::unique_ptr<PlanExecutor> exec,
         const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
     /*
@@ -533,7 +546,7 @@ protected:
 
 private:
     DocumentSourceCursor(const std::string& ns,
-                         const std::shared_ptr<PlanExecutor>& exec,
+                         std::unique_ptr<PlanExecutor> exec,
                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
     void loadBatch();
@@ -554,7 +567,7 @@ private:
     long long _docsAddedToBatches;  // for _limit enforcement
 
     const std::string _ns;
-    std::shared_ptr<PlanExecutor> _exec;  // PipelineProxyStage holds a weak_ptr to this.
+    std::unique_ptr<PlanExecutor> _exec;
     BSONObjSet _outputSorts;
     std::string _planSummary;
     PlanSummaryStats _planSummaryStats;
@@ -1689,16 +1702,23 @@ public:
         collections->push_back(_fromNs);
     }
 
+    void doDetachFromOperationContext() final;
+
+    void doReattachToOperationContext(OperationContext* opCtx) final;
+
     static boost::intrusive_ptr<DocumentSource> createFromBson(
         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
     /**
-     * Build the BSONObj used to query the foreign collection.
+     * Builds the BSONObj used to query the foreign collection and wraps it in a $match.
      */
-    static BSONObj queryForInput(const Document& input,
-                                 const FieldPath& localFieldName,
-                                 const std::string& foreignFieldName,
-                                 const BSONObj& additionalFilter);
+    static BSONObj makeMatchStageFromInput(const Document& input,
+                                           const FieldPath& localFieldName,
+                                           const std::string& foreignFieldName,
+                                           const BSONObj& additionalFilter);
+
+protected:
+    void doInjectExpressionContext() final;
 
 private:
     DocumentSourceLookUp(NamespaceString fromNs,
@@ -1708,7 +1728,8 @@ private:
                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
     Value serialize(bool explain = false) const final {
-        invariant(false);
+        // Should not be called; use serializeToArray instead.
+        MONGO_UNREACHABLE;
     }
 
     boost::optional<Document> unwindResult();
@@ -1720,14 +1741,22 @@ private:
     std::string _foreignFieldFieldName;
     boost::optional<BSONObj> _additionalFilter;
 
+    // The ExpressionContext used when performing aggregation pipelines against the '_fromNs'
+    // namespace.
+    boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
     boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
     boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
 
     bool _handlingUnwind = false;
     bool _handlingMatch = false;
-    std::unique_ptr<DBClientCursor> _cursor;
+
+    // The following members are used to hold onto state across getNext() calls when
+    // '_handlingUnwind' is true.
     long long _cursorIndex = 0;
+    boost::intrusive_ptr<Pipeline> _pipeline;
     boost::optional<Document> _input;
+    boost::optional<Document> _nextValue;
 };
 
 // TODO SERVER-25139: Make $graphLookup respect the collation.
@@ -1758,11 +1787,16 @@ public:
         collections->push_back(_from);
     }
 
-    void doInjectExpressionContext() final;
+    void doDetachFromOperationContext() final;
+
+    void doReattachToOperationContext(OperationContext* opCtx) final;
 
     static boost::intrusive_ptr<DocumentSource> createFromBson(
         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
+protected:
+    void doInjectExpressionContext() final;
+
 private:
     DocumentSourceGraphLookUp(NamespaceString from,
                               std::string as,
@@ -1780,14 +1814,15 @@ private:
     }
 
     /**
-     * Prepare the query to execute on the 'from' collection, using the contents of '_frontier'.
+     * Prepares the query to execute on the 'from' collection wrapped in a $match by using the
+     * contents of '_frontier'.
      *
      * Fills 'cached' with any values that were retrieved from the cache.
      *
      * Returns boost::none if no query is necessary, i.e., all values were retrieved from the
      * cache. Otherwise, returns a query object.
      */
-    boost::optional<BSONObj> constructQuery(BSONObjSet* cached);
+    boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached);
 
     /**
      * If we have internalized a $unwind, getNext() dispatches to this function.
@@ -1837,6 +1872,10 @@ private:
     boost::optional<FieldPath> _depthField;
     boost::optional<long long> _maxDepth;
 
+    // The ExpressionContext used when performing aggregation pipelines against the '_from'
+    // namespace.
+    boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
     size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
 
     // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index aef76129244..14911203482 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -37,6 +37,7 @@
 #include "mongo/db/pipeline/document.h"
 #include "mongo/db/query/explain.h"
 #include "mongo/db/query/find_common.h"
+#include "mongo/db/server_parameters.h"
 #include "mongo/db/storage/storage_options.h"
 #include "mongo/util/scopeguard.h"
 
@@ -46,9 +47,13 @@ using boost::intrusive_ptr;
 using std::shared_ptr;
 using std::string;
 
-DocumentSourceCursor::~DocumentSourceCursor() {
-    dispose();
-}
+namespace {
+
+MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceCursorBatchSizeBytes,
+                              int,
+                              FindCommon::kMaxBytesToReturnToClientAtOnce);
+
+}  // namespace
 
 const char* DocumentSourceCursor::getSourceName() const {
     return "$cursor";
@@ -70,8 +75,6 @@ boost::optional<Document> DocumentSourceCursor::getNext() {
 }
 
 void DocumentSourceCursor::dispose() {
-    // Can't call in to PlanExecutor or ClientCursor registries from this function since it
-    // will be called when an agg cursor is killed which would cause a deadlock.
     _exec.reset();
     _currentBatch.clear();
 }
@@ -113,7 +116,7 @@ void DocumentSourceCursor::loadBatch() {
 
             memUsageBytes += _currentBatch.back().getApproximateSize();
 
-            if (memUsageBytes > FindCommon::kMaxBytesToReturnToClientAtOnce) {
+            if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes) {
                 // End this batch and prepare PlanExecutor for yielding.
                 _exec->saveState();
                 return;
@@ -227,14 +230,26 @@ void DocumentSourceCursor::doInjectExpressionContext() {
     }
 }
 
+void DocumentSourceCursor::detachFromOperationContext() {
+    if (_exec) {
+        _exec->detachFromOperationContext();
+    }
+}
+
+void DocumentSourceCursor::reattachToOperationContext(OperationContext* opCtx) {
+    if (_exec) {
+        _exec->reattachToOperationContext(opCtx);
+    }
+}
+
 DocumentSourceCursor::DocumentSourceCursor(const string& ns,
-                                           const std::shared_ptr<PlanExecutor>& exec,
+                                           std::unique_ptr<PlanExecutor> exec,
                                            const intrusive_ptr<ExpressionContext>& pCtx)
     : DocumentSource(pCtx),
       _docsAddedToBatches(0),
       _ns(ns),
-      _exec(exec),
-      _outputSorts(exec->getOutputSorts()) {
+      _exec(std::move(exec)),
+      _outputSorts(_exec->getOutputSorts()) {
     recordPlanSummaryStr();
 
     // We record execution metrics here to allow for capture of indexes used prior to execution.
@@ -243,9 +258,10 @@ DocumentSourceCursor::DocumentSourceCursor(const string& ns,
 
 intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
     const string& ns,
-    const std::shared_ptr<PlanExecutor>& exec,
+    std::unique_ptr<PlanExecutor> exec,
     const intrusive_ptr<ExpressionContext>& pExpCtx) {
-    intrusive_ptr<DocumentSourceCursor> source(new DocumentSourceCursor(ns, exec, pExpCtx));
+    intrusive_ptr<DocumentSourceCursor> source(
+        new DocumentSourceCursor(ns, std::move(exec), pExpCtx));
     source->injectExpressionContext(pExpCtx);
     return source;
 }
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 01887184512..9bba9e68317 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -152,7 +152,7 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
 
         // Check whether each key in the frontier exists in the cache or needs to be queried.
         BSONObjSet cached;
-        auto query = constructQuery(&cached);
+        auto matchStage = makeMatchStageFromFrontier(&cached);
 
         ValueUnorderedSet queried = pExpCtx->getValueComparator().makeUnorderedValueSet();
         _frontier->swap(queried);
@@ -167,14 +167,13 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
             checkMemoryUsage();
         }
 
-        if (query) {
+        if (matchStage) {
             // Query for all keys that were in the frontier and not in the cache, populating
             // '_frontier' for the next iteration of search.
-            unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query);
 
-            // Iterate the cursor.
-            while (cursor->more()) {
-                BSONObj result = cursor->nextSafe();
+            auto pipeline = uassertStatusOK(_mongod->makePipeline({*matchStage}, _fromExpCtx));
+            while (auto next = pipeline->output()->getNext()) {
+                BSONObj result = next->toBson();
                 shouldPerformAnotherQuery =
                     addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery;
                 addToCache(result, queried);
@@ -269,7 +268,7 @@ void DocumentSourceGraphLookUp::addToCache(const BSONObj& result,
     }
 }
 
-boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) {
+boost::optional<BSONObj> DocumentSourceGraphLookUp::makeMatchStageFromFrontier(BSONObjSet* cached) {
     // Add any cached values to 'cached' and remove them from '_frontier'.
     for (auto it = _frontier->begin(); it != _frontier->end();) {
         if (auto entry = _cache[*it]) {
@@ -289,28 +288,34 @@
     }
 
     // Create a query of the form {$and: [_additionalFilter, {_connectToField: {$in: [...]}}]}.
-    BSONObjBuilder query;
+    //
+    // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when
+    // constructing a pipeline to execute.
+    BSONObjBuilder match;
     {
-        BSONArrayBuilder andObj(query.subarrayStart("$and"));
-        if (_additionalFilter) {
-            andObj << *_additionalFilter;
-        }
-
+        BSONObjBuilder query(match.subobjStart("$match"));
         {
-            BSONObjBuilder connectToObj(andObj.subobjStart());
+            BSONArrayBuilder andObj(query.subarrayStart("$and"));
+            if (_additionalFilter) {
+                andObj << *_additionalFilter;
+            }
+
             {
-                BSONObjBuilder subObj(connectToObj.subobjStart(_connectToField.fullPath()));
+                BSONObjBuilder connectToObj(andObj.subobjStart());
                 {
-                    BSONArrayBuilder in(subObj.subarrayStart("$in"));
-                    for (auto&& value : *_frontier) {
-                        in << value;
+                    BSONObjBuilder subObj(connectToObj.subobjStart(_connectToField.fullPath()));
+                    {
+                        BSONArrayBuilder in(subObj.subarrayStart("$in"));
+                        for (auto&& value : *_frontier) {
+                            in << value;
+                        }
                     }
                 }
             }
         }
     }
 
-    return _frontier->empty() ? boost::none : boost::optional<BSONObj>(query.obj());
+    return _frontier->empty() ? boost::none : boost::optional<BSONObj>(match.obj());
 }
 
 void DocumentSourceGraphLookUp::performSearch() {
@@ -411,10 +416,19 @@ void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool
 }
 
 void DocumentSourceGraphLookUp::doInjectExpressionContext() {
+    _fromExpCtx = pExpCtx->copyWith(_from);
     _frontier = pExpCtx->getValueComparator().makeUnorderedValueSet();
     _visited = pExpCtx->getValueComparator().makeUnorderedValueMap<BSONObj>();
 }
 
+void DocumentSourceGraphLookUp::doDetachFromOperationContext() {
+    _fromExpCtx->opCtx = nullptr;
+}
+
+void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+    _fromExpCtx->opCtx = opCtx;
+}
+
 DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
     NamespaceString from,
     std::string as,
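To make the effect of makeMatchStageFromFrontier() above concrete: for a frontier containing the values 1 and 2, a connectToField of 'ancestors' (a hypothetical field name), and an additional filter of {x: 1}, the nested builders produce a single $match stage that could be run as its own pipeline against the 'from' collection. A sketch of that shape in the shell:

    const matchStage = {$match: {$and: [{x: 1}, {ancestors: {$in: [1, 2]}}]}};
    db.from.aggregate([matchStage]);  // what _mongod->makePipeline({*matchStage}, _fromExpCtx) runs

When there is no additional filter, the $and array holds only the $in predicate.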
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 5be6239cb45..dc413d8fff8 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -110,20 +110,20 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
     // '_handlingUnwind' would be set to true, and we would not have made it here.
     invariant(!_matchSrc);
 
-    BSONObj query = queryForInput(*input, _localField, _foreignFieldFieldName, BSONObj());
-    std::unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_fromNs.ns(), query);
+    auto matchStage =
+        makeMatchStageFromInput(*input, _localField, _foreignFieldFieldName, BSONObj());
+    auto pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
 
     std::vector<Value> results;
     int objsize = 0;
-    while (cursor->more()) {
-        BSONObj result = cursor->nextSafe();
-        objsize += result.objsize();
+    while (auto result = pipeline->output()->getNext()) {
+        objsize += result->getApproximateSize();
         uassert(4568,
                 str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
-                              << query
+                              << matchStage
                               << " exceeds maximum document size",
                 objsize <= BSONObjMaxInternalSize);
-        results.push_back(Value(result));
+        results.emplace_back(std::move(*result));
     }
 
     MutableDocument output(std::move(*input));
@@ -264,14 +264,14 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::optimizeAt(
 }
 
 void DocumentSourceLookUp::dispose() {
-    _cursor.reset();
+    _pipeline.reset();
     pSource->dispose();
 }
 
-BSONObj DocumentSourceLookUp::queryForInput(const Document& input,
-                                            const FieldPath& localFieldPath,
-                                            const std::string& foreignFieldName,
-                                            const BSONObj& additionalFilter) {
+BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
+                                                      const FieldPath& localFieldPath,
+                                                      const std::string& foreignFieldName,
+                                                      const BSONObj& additionalFilter) {
     Value localFieldVal = input.getNestedField(localFieldPath);
 
     // Missing values are treated as null.
@@ -279,38 +279,53 @@
         localFieldVal = Value(BSONNULL);
     }
 
-    // We are constructing a query of one of the following forms:
-    //   {$and: [{<foreignFieldName>: {$eq: <localFieldVal>}}, <additionalFilter>]}
-    //   {$and: [{<foreignFieldName>: {$in: [<value>, <value>, ...]}}, <additionalFilter>]}
-    //   {$and: [{$or: [{<foreignFieldName>: {$eq: <value>}},
-    //                  {<foreignFieldName>: {$eq: <value>}}, ...]},
-    //           <additionalFilter>]}
-
-    BSONObjBuilder query;
+    // We construct a query of one of the following forms, depending on the contents of
+    // 'localFieldVal'.
+    //
+    //   {$and: [{<foreignFieldName>: {$eq: <localFieldVal>}}, <additionalFilter>]}
+    //     if 'localFieldVal' isn't an array value.
+    //
+    //   {$and: [{<foreignFieldName>: {$in: [<value>, <value>, ...]}}, <additionalFilter>]}
+    //     if 'localFieldVal' is an array value but doesn't contain any elements that are regular
+    //     expressions.
+    //
+    //   {$and: [{$or: [{<foreignFieldName>: {$eq: <value>}},
+    //                  {<foreignFieldName>: {$eq: <value>}}, ...]},
+    //           <additionalFilter>]}
+    //     if 'localFieldVal' is an array value and it contains at least one element that is a
+    //     regular expression.
+
+    // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when
+    // constructing a pipeline to execute.
+    BSONObjBuilder match;
+    BSONObjBuilder query(match.subobjStart("$match"));
 
     BSONArrayBuilder andObj(query.subarrayStart("$and"));
     BSONObjBuilder joiningObj(andObj.subobjStart());
 
     if (localFieldVal.isArray()) {
-        // Assume an array value logically corresponds to many documents, rather than logically
-        // corresponding to one document with an array value.
+        // A $lookup on an array value corresponds to finding documents in the foreign collection
+        // that have a value of any of the elements in the array value, rather than finding
+        // documents that have a value equal to the entire array value. These semantics are
+        // automatically provided to us by using the $in query operator.
         const vector<Value>& localArray = localFieldVal.getArray();
         const bool containsRegex = std::any_of(
             localArray.begin(), localArray.end(), [](Value val) { return val.getType() == RegEx; });
 
         if (containsRegex) {
-            // A regex inside of an $in will not be treated as an equality comparison, so use an
-            // $or.
+            // A regular expression inside the $in query operator will perform pattern matching on
+            // any string values. Since we want regular expressions to only match other RegEx
+            // types, we write the query as a $or of equality comparisons instead.
             BSONObj orQuery = buildEqualityOrQuery(foreignFieldName, localFieldVal.getArray());
             joiningObj.appendElements(orQuery);
         } else {
-            // { _foreignFieldFieldName : { "$in" : localFieldValue } }
+            // { <foreignFieldName> : { "$in" : <localFieldVal> } }
             BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
             subObj << "$in" << localFieldVal;
             subObj.doneFast();
         }
     } else {
-        // { _foreignFieldFieldName : { "$eq" : localFieldValue } }
+        // { <foreignFieldName> : { "$eq" : <localFieldVal> } }
         BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
         subObj << "$eq" << localFieldVal;
         subObj.doneFast();
@@ -324,7 +339,8 @@
 
     andObj.doneFast();
 
-    return query.obj();
+    query.doneFast();
+    return match.obj();
 }
 
 boost::optional<Document> DocumentSourceLookUp::unwindResult() {
@@ -333,24 +349,25 @@
     // Loop until we get a document that has at least one match.
     // Note we may return early from this loop if our source stage is exhausted or if the unwind
     // source was asked to return empty arrays and we get a document without a match.
-    while (!_cursor || !_cursor->more()) {
+    while (!_pipeline || !_nextValue) {
         _input = pSource->getNext();
         if (!_input)
             return {};
 
         BSONObj filter = _additionalFilter.value_or(BSONObj());
-        _cursor = _mongod->directClient()->query(
-            _fromNs.ns(),
-            DocumentSourceLookUp::queryForInput(
-                *_input, _localField, _foreignFieldFieldName, filter));
+        auto matchStage =
+            makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter);
+        _pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
+
         _cursorIndex = 0;
+        _nextValue = _pipeline->output()->getNext();
 
-        if (_unwindSrc->preserveNullAndEmptyArrays() && !_cursor->more()) {
+        if (_unwindSrc->preserveNullAndEmptyArrays() && !_nextValue) {
             // There were no results for this cursor, but the $unwind was asked to preserve empty
             // arrays, so we should return a document without the array.
             MutableDocument output(std::move(*_input));
-            // Note this will correctly objects in the prefix of '_as', to act as if we had created
-            // an empty array and then removed it.
+            // Note this will correctly create objects in the prefix of '_as', to act as if we had
+            // created an empty array and then removed it.
             output.setNestedField(_as, Value());
             if (indexPath) {
                 output.setNestedField(*indexPath, Value(BSONNULL));
@@ -358,18 +375,20 @@
             return output.freeze();
         }
     }
-    invariant(_cursor->more() && bool(_input));
-    auto nextVal = Value(_cursor->nextSafe());
+
+    invariant(bool(_input) && bool(_nextValue));
+    auto currentValue = *_nextValue;
+    _nextValue = _pipeline->output()->getNext();
 
     // Move input document into output if this is the last or only result, otherwise perform a copy.
-    MutableDocument output(_cursor->more() ? *_input : std::move(*_input));
-    output.setNestedField(_as, nextVal);
+    MutableDocument output(_nextValue ? *_input : std::move(*_input));
+    output.setNestedField(_as, Value(currentValue));
 
     if (indexPath) {
         output.setNestedField(*indexPath, Value(_cursorIndex));
     }
 
-    _cursorIndex++;
+    ++_cursorIndex;
     return output.freeze();
 }
 
@@ -420,6 +439,32 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker*
     return SEE_NEXT;
 }
 
+void DocumentSourceLookUp::doInjectExpressionContext() {
+    _fromExpCtx = pExpCtx->copyWith(_fromNs);
+}
+
+void DocumentSourceLookUp::doDetachFromOperationContext() {
+    if (_pipeline) {
+        // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+        // use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
+        _pipeline->detachFromOperationContext();
+        invariant(_fromExpCtx->opCtx == nullptr);
+    } else {
+        _fromExpCtx->opCtx = nullptr;
+    }
+}
+
+void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+    if (_pipeline) {
+        // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+        // use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
+        _pipeline->reattachToOperationContext(opCtx);
+        invariant(_fromExpCtx->opCtx == opCtx);
+    } else {
+        _fromExpCtx->opCtx = opCtx;
+    }
+}
+
 intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
     BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
     uassert(4569, "the $lookup specification must be an Object", elem.type() == Object);
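The regex handling in makeMatchStageFromInput() above is worth a concrete illustration, and it is also what the ArrayValueWithRegexUsesOrQuery unit test below pins down: inside $in, a regular expression element pattern-matches string values, whereas $eq against a regular expression matches only values that are themselves that RegEx, hence the rewrite to a $or of $eq comparisons. A shell sketch using a hypothetical collection 'c':

    assert.writeOK(db.c.insert({foreign: 'abc'}));
    assert.writeOK(db.c.insert({foreign: /^a/}));
    // The string 'abc' satisfies the $in because the regex pattern-matches it:
    assert.neq(null,
               db.c.findOne({$and: [{foreign: {$type: 'string'}}, {foreign: {$in: [/^a/]}}]}));
    // $eq never pattern-matches; it only matches the stored RegEx value itself:
    assert.eq(null,
              db.c.findOne({$and: [{foreign: {$type: 'string'}}, {foreign: {$eq: /^a/}}]}));
    assert.eq(1, db.c.find({foreign: {$eq: /^a/}}).itcount());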
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index 7cec430b264..3076eed189d 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -398,42 +398,44 @@ public:
 
 namespace DocumentSourceLookup {
 
-TEST(QueryForInput, NonArrayValueUsesEqQuery) {
+TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) {
     Document input = DOC("local" << 1);
-    BSONObj query =
-        DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
-    ASSERT_EQ(query, fromjson("{$and: [{foreign: {$eq: 1}}, {}]}"));
+    BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+        input, FieldPath("local"), "foreign", BSONObj());
+    ASSERT_EQ(matchStage, fromjson("{$match: {$and: [{foreign: {$eq: 1}}, {}]}}"));
 }
 
-TEST(QueryForInput, RegexValueUsesEqQuery) {
+TEST(MakeMatchStageFromInput, RegexValueUsesEqQuery) {
     BSONRegEx regex("^a");
     Document input = DOC("local" << Value(regex));
-    BSONObj query =
-        DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
-    ASSERT_EQ(query,
-              BSON("$and" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << regex)) << BSONObj())));
+    BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+        input, FieldPath("local"), "foreign", BSONObj());
+    ASSERT_EQ(matchStage,
+              BSON("$match" << BSON("$and" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << regex))
+                                                         << BSONObj()))));
 }
 
-TEST(QueryForInput, ArrayValueUsesInQuery) {
+TEST(MakeMatchStageFromInput, ArrayValueUsesInQuery) {
     vector<Value> inputArray = {Value(1), Value(2)};
     Document input = DOC("local" << Value(inputArray));
-    BSONObj query =
-        DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
-    ASSERT_EQ(query, fromjson("{$and: [{foreign: {$in: [1, 2]}}, {}]}"));
+    BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+        input, FieldPath("local"), "foreign", BSONObj());
+    ASSERT_EQ(matchStage, fromjson("{$match: {$and: [{foreign: {$in: [1, 2]}}, {}]}}"));
 }
 
-TEST(QueryForInput, ArrayValueWithRegexUsesOrQuery) {
+TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) {
     BSONRegEx regex("^a");
     vector<Value> inputArray = {Value(1), Value(regex), Value(2)};
     Document input = DOC("local" << Value(inputArray));
-    BSONObj query =
-        DocumentSourceLookUp::queryForInput(input, FieldPath("local"), "foreign", BSONObj());
-    ASSERT_EQ(query,
-              BSON("$and" << BSON_ARRAY(
-                  BSON("$or" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << Value(1)))
-                                           << BSON("foreign" << BSON("$eq" << regex))
-                                           << BSON("foreign" << BSON("$eq" << Value(2)))))
-                  << BSONObj())));
+    BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
+        input, FieldPath("local"), "foreign", BSONObj());
+    ASSERT_EQ(matchStage,
+              BSON("$match" << BSON(
+                       "$and" << BSON_ARRAY(
+                           BSON("$or" << BSON_ARRAY(BSON("foreign" << BSON("$eq" << Value(1)))
+                                                    << BSON("foreign" << BSON("$eq" << regex))
+                                                    << BSON("foreign" << BSON("$eq" << Value(2)))))
+                           << BSONObj()))));
 }
 
 }  // namespace DocumentSourceLookup
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 5992b8aa708..494dbdb31ae 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -33,6 +33,8 @@
 
 namespace mongo {
 
+using boost::intrusive_ptr;
+
 ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request)
     : isExplain(request.isExplain()),
       inShard(request.isFromRouter()),
@@ -65,4 +67,29 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) {
     _valueComparator = ValueComparator(_collator.get());
 }
 
+intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const {
+    intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext();
+
+    expCtx->isExplain = isExplain;
+    expCtx->inShard = inShard;
+    expCtx->inRouter = inRouter;
+    expCtx->extSortAllowed = extSortAllowed;
+    expCtx->bypassDocumentValidation = bypassDocumentValidation;
+
+    expCtx->ns = std::move(ns);
+    expCtx->tempDir = tempDir;
+
+    expCtx->opCtx = opCtx;
+
+    expCtx->collation = collation;
+    if (_collator) {
+        expCtx->setCollator(_collator->clone());
+    }
+
+    // Note that we intentionally skip copying the value of 'interruptCounter' because 'expCtx' is
+    // intended to be used for executing a separate aggregation pipeline.
+
+    return expCtx;
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 285318f7f7a..27708590b18 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -68,6 +68,12 @@ public:
         return _valueComparator;
     }
 
+    /**
+     * Returns an ExpressionContext that is identical to 'this' that can be used to execute a
+     * separate aggregation pipeline on 'ns'.
+     */
+    boost::intrusive_ptr<ExpressionContext> copyWith(NamespaceString ns) const;
+
     bool isExplain = false;
     bool inShard = false;
     bool inRouter = false;
@@ -81,7 +87,7 @@ public:
 
     // Collation requested by the user for this pipeline. Empty if the user did not request a
     // collation.
-    const BSONObj collation;
+    BSONObj collation;
 
     static const int kInterruptCheckPeriod = 128;
     int interruptCounter = kInterruptCheckPeriod;  // when 0, check interruptStatus
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index a522d9cfefd..a1e77179f2f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -163,6 +163,28 @@ public:
                 str::stream() << "renameCollection failed: " << info};
     }
 
+    StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+        const std::vector<BSONObj>& rawPipeline,
+        const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+        // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
+        // than the DocumentSource this MongodImplementation is injected into, but both
+        // ExpressionContext instances should still have the same OperationContext.
+        invariant(_ctx->opCtx == expCtx->opCtx);
+
+        auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+        if (!pipeline.isOK()) {
+            return pipeline.getStatus();
+        }
+
+        pipeline.getValue()->injectExpressionContext(expCtx);
+        pipeline.getValue()->optimizePipeline();
+
+        AutoGetCollectionForRead autoColl(expCtx->opCtx, expCtx->ns);
+        PipelineD::prepareCursorSource(autoColl.getCollection(), pipeline.getValue());
+
+        return pipeline;
+    }
+
 private:
     intrusive_ptr<ExpressionContext> _ctx;
     DBDirectClient _client;
@@ -282,21 +304,20 @@ StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor(
     }
 }  // namespace
 
-shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
-    OperationContext* txn,
-    Collection* collection,
-    const NamespaceString& nss,
-    const intrusive_ptr<Pipeline>& pPipeline,
-    const intrusive_ptr<ExpressionContext>& pExpCtx) {
+void PipelineD::prepareCursorSource(Collection* collection,
+                                    const intrusive_ptr<Pipeline>& pipeline) {
+    auto expCtx = pipeline->getContext();
+    dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(expCtx->ns.ns(), MODE_IS));
+
     // We will be modifying the source vector as we go.
-    Pipeline::SourceContainer& sources = pPipeline->_sources;
+    Pipeline::SourceContainer& sources = pipeline->_sources;
 
     // Inject a MongodImplementation to sources that need them.
     for (auto&& source : sources) {
         DocumentSourceNeedsMongod* needsMongod =
             dynamic_cast<DocumentSourceNeedsMongod*>(source.get());
         if (needsMongod) {
-            needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx));
+            needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(expCtx));
         }
     }
 
@@ -309,35 +330,36 @@
             // on secondaries, this is needed.
             ShardedConnectionInfo::addHook();
         }
-        return std::shared_ptr<PlanExecutor>();  // don't need a cursor
+        return;  // don't need a cursor
     }
 
     auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get());
     // Optimize an initial $sample stage if possible.
     if (collection && sampleStage) {
         const long long sampleSize = sampleStage->getSampleSize();
-        const long long numRecords = collection->getRecordStore()->numRecords(txn);
+        const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
         auto exec = uassertStatusOK(
-            createRandomCursorExecutor(collection, txn, sampleSize, numRecords));
+            createRandomCursorExecutor(collection, expCtx->opCtx, sampleSize, numRecords));
         if (exec) {
             // Replace $sample stage with $sampleFromRandomCursor stage.
             sources.pop_front();
             std::string idString = collection->ns().isOplog() ? "ts" : "_id";
             sources.emplace_front(DocumentSourceSampleFromRandomCursor::create(
-                pExpCtx, sampleSize, idString, numRecords));
+                expCtx, sampleSize, idString, numRecords));
 
-            return addCursorSource(
-                pPipeline,
-                pExpCtx,
+            addCursorSource(
+                pipeline,
+                expCtx,
                 std::move(exec),
-                pPipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata));
+                pipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata));
+            return;
         }
     }
 
     // Look for an initial match. This works whether we got an initial query or not. If not, it
     // results in a "{}" query, which will be what we want in that case.
-    const BSONObj queryObj = pPipeline->getInitialQuery();
+    const BSONObj queryObj = pipeline->getInitialQuery();
     if (!queryObj.isEmpty()) {
         if (dynamic_cast<DocumentSourceMatch*>(sources.front().get())) {
             // If a $match query is pulled into the cursor, the $match is redundant, and can be
@@ -351,9 +373,9 @@
     }
 
     // Find the set of fields in the source documents depended on by this pipeline.
-    DepsTracker deps = pPipeline->getDependencies(
-        DocumentSourceMatch::isTextQuery(queryObj) ? DepsTracker::MetadataAvailable::kTextScore
-                                                   : DepsTracker::MetadataAvailable::kNoMetadata);
+    DepsTracker deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj)
+                                                     ? DepsTracker::MetadataAvailable::kTextScore
+                                                     : DepsTracker::MetadataAvailable::kNoMetadata);
 
     BSONObj projForQuery = deps.toProjection();
 
@@ -375,19 +397,18 @@
     }
 
     // Create the PlanExecutor.
-    auto exec = uassertStatusOK(prepareExecutor(txn,
+    auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx,
                                                 collection,
-                                                nss,
-                                                pPipeline,
-                                                pExpCtx,
+                                                expCtx->ns,
+                                                pipeline,
+                                                expCtx,
                                                 sortStage,
                                                 deps,
                                                 queryObj,
                                                 &sortObj,
                                                 &projForQuery));
 
-    return addCursorSource(
-        pPipeline, pExpCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
+    addCursorSource(pipeline, expCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
 }
 
 StatusWith<std::unique_ptr<PlanExecutor>> PipelineD::prepareExecutor(
@@ -509,23 +530,22 @@
         txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);
 }
 
-shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
-                                                    const intrusive_ptr<ExpressionContext>& expCtx,
-                                                    unique_ptr<PlanExecutor> exec,
-                                                    DepsTracker deps,
-                                                    const BSONObj& queryObj,
-                                                    const BSONObj& sortObj,
-                                                    const BSONObj& projectionObj) {
+void PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
+                                const intrusive_ptr<ExpressionContext>& expCtx,
+                                unique_ptr<PlanExecutor> exec,
+                                DepsTracker deps,
+                                const BSONObj& queryObj,
+                                const BSONObj& sortObj,
+                                const BSONObj& projectionObj) {
     // Get the full "namespace" name.
     const string& fullName = expCtx->ns.ns();
 
-    // We convert the unique_ptr to a shared_ptr because both the PipelineProxyStage and the
-    // DocumentSourceCursor need to reference the PlanExecutor.
-    std::shared_ptr<PlanExecutor> sharedExec(std::move(exec));
+    // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
+    exec->saveState();
 
     // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
     intrusive_ptr<DocumentSourceCursor> pSource =
-        DocumentSourceCursor::create(fullName, sharedExec, expCtx);
+        DocumentSourceCursor::create(fullName, std::move(exec), expCtx);
 
     // Note the query, sort, and projection for explain.
     pSource->setQuery(queryObj);
@@ -552,13 +572,6 @@
     // case the new stage can be absorbed with the first stages of the pipeline.
     pipeline->addInitialSource(pSource);
     pipeline->optimizePipeline();
-
-    // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
-    // deregister the PlanExecutor so that it can be registered with ClientCursor.
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 76214db4925..35d93a4cfd2 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -70,20 +70,10 @@ public:
      *
      * The cursor is added to the front of the pipeline's sources.
      *
-     * Must have a AutoGetCollectionForRead before entering.
-     *
-     * If the returned PlanExecutor is non-null, you are responsible for ensuring
-     * it receives appropriate invalidate and kill messages.
-     *
-     * @param pPipeline the logical "this" for this operation
-     * @param pExpCtx the expression context for this pipeline
+     * Callers must take care to ensure that 'collection' is locked in at least IS-mode.
      */
-    static std::shared_ptr<PlanExecutor> prepareCursorSource(
-        OperationContext* txn,
-        Collection* collection,
-        const NamespaceString& nss,
-        const boost::intrusive_ptr<Pipeline>& pPipeline,
-        const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+    static void prepareCursorSource(Collection* collection,
+                                    const boost::intrusive_ptr<Pipeline>& pipeline);

     static std::string getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline);

@@ -118,14 +108,13 @@ private:
      * Creates a DocumentSourceCursor from the given PlanExecutor and adds it to the front of the
      * Pipeline.
      */
-    static std::shared_ptr<PlanExecutor> addCursorSource(
-        const boost::intrusive_ptr<Pipeline>& pipeline,
-        const boost::intrusive_ptr<ExpressionContext>& expCtx,
-        std::unique_ptr<PlanExecutor> exec,
-        DepsTracker deps,
-        const BSONObj& queryObj = BSONObj(),
-        const BSONObj& sortObj = BSONObj(),
-        const BSONObj& projectionObj = BSONObj());
+    static void addCursorSource(const boost::intrusive_ptr<Pipeline>& pipeline,
+                                const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                std::unique_ptr<PlanExecutor> exec,
+                                DepsTracker deps,
+                                const BSONObj& queryObj = BSONObj(),
+                                const BSONObj& sortObj = BSONObj(),
+                                const BSONObj& projectionObj = BSONObj());
 };

 }  // namespace mongo
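The header change shrinks prepareCursorSource from five parameters to two by deriving the OperationContext and namespace from the ExpressionContext the pipeline already carries. A sketch of that API-narrowing idea with hypothetical types (OpCtx, ExprCtx, Pipeline, and prepare here are illustrative, not the MongoDB classes):

    #include <iostream>
    #include <memory>
    #include <string>

    struct OpCtx {
        int id = 42;
    };

    struct ExprCtx {
        OpCtx* opCtx;
        std::string ns;
    };

    struct Pipeline {
        std::shared_ptr<ExprCtx> ctx;
        const std::shared_ptr<ExprCtx>& getContext() const { return ctx; }
    };

    // Before: prepare(opCtx, collection, nss, pipeline, exprCtx) -- three of
    // the five arguments were recoverable from the pipeline itself.
    void prepare(const Pipeline& pipeline) {
        auto ctx = pipeline.getContext();
        std::cout << "op " << ctx->opCtx->id << " on " << ctx->ns << "\n";
    }

    int main() {
        OpCtx op;
        Pipeline p{std::make_shared<ExprCtx>(ExprCtx{&op, "test.source"})};
        prepare(p);  // single argument; no way to pass a mismatched opCtx/ns pair
    }

Besides being shorter, the narrow signature removes a class of caller bugs: it is no longer possible to hand the function an OperationContext or namespace that disagrees with the pipeline's own ExpressionContext.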
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 395e84ce1a7..3b4159facbc 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -517,29 +517,6 @@ void PlanExecutor::deregisterExec() {

 void PlanExecutor::kill(string reason) {
     _killReason = std::move(reason);
-
-    // XXX: PlanExecutor is designed to wrap a single execution tree. In the case of
-    // aggregation queries, PlanExecutor wraps a proxy stage responsible for pulling results
-    // from an aggregation pipeline. The aggregation pipeline pulls results from yet another
-    // PlanExecutor. Such nested PlanExecutors require us to manually propagate kill() to
-    // the "inner" executor. This is bad, and hopefully can be fixed down the line with the
-    // unification of agg and query.
-    //
-    // The CachedPlanStage is another special case. It needs to update the plan cache from
-    // its destructor. It needs to know whether it has been killed so that it can avoid
-    // touching a potentially invalid plan cache in this case.
-    //
-    // TODO: get rid of this code block.
-    {
-        PlanStage* foundStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY);
-        if (foundStage) {
-            PipelineProxyStage* proxyStage = static_cast<PipelineProxyStage*>(foundStage);
-            shared_ptr<PlanExecutor> childExec = proxyStage->getChildExecutor();
-            if (childExec) {
-                childExec->kill(*_killReason);
-            }
-        }
-    }
 }

 Status PlanExecutor::executePlan() {
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index c98dcdd7a53..dab5b9b0f26 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -93,7 +93,6 @@ protected:
     void createSource(boost::optional<BSONObj> hint = boost::none) {
         // clean up first if this was called before
         _source.reset();
-        _exec.reset();

         OldClientWriteContext ctx(&_opCtx, nss.ns());

@@ -104,13 +103,13 @@ protected:
         auto cq = uassertStatusOK(CanonicalQuery::canonicalize(
             &_opCtx, std::move(qr), ExtensionsCallbackDisallowExtensions()));

-        _exec = uassertStatusOK(
+        auto exec = uassertStatusOK(
             getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL));

-        _exec->saveState();
-        _exec->registerExec(ctx.getCollection());
+        exec->saveState();
+        exec->registerExec(ctx.getCollection());

-        _source = DocumentSourceCursor::create(nss.ns(), _exec, _ctx);
+        _source = DocumentSourceCursor::create(nss.ns(), std::move(exec), _ctx);
     }

     intrusive_ptr<ExpressionContext> ctx() {
@@ -123,7 +122,6 @@

 private:
     // It is important that these are ordered to ensure correct destruction order.
-    std::shared_ptr<PlanExecutor> _exec;
     intrusive_ptr<ExpressionContext> _ctx;
     intrusive_ptr<DocumentSourceCursor> _source;
 };
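With DocumentSourceCursor owning its executor and registering it for invalidation events, PlanExecutor::kill() no longer has to walk into a PipelineProxyStage and forward the kill by hand. A sketch of the replacement shape, where every executor registers with a manager that notifies all of them directly (Manager and Exec are simplified stand-ins for CursorManager and PlanExecutor):

    #include <iostream>
    #include <string>
    #include <vector>

    struct Exec {
        std::string name;
        bool killed = false;
        void kill() { killed = true; }
    };

    struct Manager {
        std::vector<Exec*> registered;
        void registerExec(Exec* e) { registered.push_back(e); }
        void notifyDrop() {
            // Every registered executor hears about the drop directly; there
            // is no parent-to-child forwarding to forget.
            for (Exec* e : registered)
                e->kill();
        }
    };

    int main() {
        Manager mgr;
        Exec inner{"inner"}, outer{"outer"};
        mgr.registerExec(&inner);  // previously only the outer exec registered
        mgr.registerExec(&outer);
        mgr.notifyDrop();
        std::cout << inner.killed << outer.killed << "\n";  // prints 11
    }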
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 959a9fed617..eded231bd75 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -44,6 +44,7 @@
 #include "mongo/db/json.h"
 #include "mongo/db/matcher/expression_parser.h"
 #include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
+#include "mongo/db/pipeline/document_source.h"
 #include "mongo/db/pipeline/expression_context.h"
 #include "mongo/db/pipeline/pipeline.h"
 #include "mongo/db/query/plan_executor.h"
@@ -278,36 +279,47 @@ public:
         BSONObj indexSpec = BSON("a" << 1);
         addIndex(indexSpec);

-        // Create the PlanExecutor which feeds the aggregation pipeline.
-        std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+        Collection* collection = ctx.getCollection();

         // Create the aggregation pipeline.
         std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")};
-        boost::intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(
-            &_txn, AggregationRequest(NamespaceString(nss.ns()), rawPipeline));
+        boost::intrusive_ptr<ExpressionContext> expCtx =
+            new ExpressionContext(&_txn, AggregationRequest(nss, rawPipeline));
+
+        // Create an "inner" plan executor and register it with the cursor manager so that it can
+        // get notified when the collection is dropped.
+        unique_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+        registerExec(innerExec.get());

-        auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx);
-        auto pipeline = assertGet(statusWithPipeline);
+        // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source
+        // in the pipeline.
+        innerExec->saveState();
+        auto cursorSource = DocumentSourceCursor::create(nss.ns(), std::move(innerExec), expCtx);
+        auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx));

         // Create the output PlanExecutor that pulls results from the pipeline.
         auto ws = make_unique<WorkingSet>();
-        auto proxy = make_unique<PipelineProxyStage>(&_txn, pipeline, innerExec, ws.get());
-        Collection* collection = ctx.getCollection();
+        auto proxy = make_unique<PipelineProxyStage>(&_txn, pipeline, ws.get());

         auto statusWithPlanExecutor = PlanExecutor::make(
             &_txn, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL);
         ASSERT_OK(statusWithPlanExecutor.getStatus());
         unique_ptr<PlanExecutor> outerExec = std::move(statusWithPlanExecutor.getValue());

-        // Only the outer executor gets registered.
+        // Register the "outer" plan executor with the cursor manager so it can get notified when
+        // the collection is dropped.
         registerExec(outerExec.get());

-        // Verify that both the "inner" and "outer" plan executors have been killed after
-        // dropping the collection.
-        BSONObj objOut;
         dropCollection();
-        ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL));
-        ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
+
+        // Verify that the aggregation pipeline returns an error because its "inner" plan executor
+        // has been killed due to the collection being dropped.
+        ASSERT_THROWS_CODE(pipeline->output()->getNext(), UserException, 16028);
+
+        // Verify that the "outer" plan executor has been killed due to the collection being
+        // dropped.
+        BSONObj objOut;
+        ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, nullptr));

         deregisterExec(outerExec.get());
     }
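The rewritten test asserts on two distinct failure surfaces after the drop: the pipeline reports the death of its inner executor by throwing (error code 16028), while the outer executor reports it through a DEAD return status. A compact model of those two surfaces, using illustrative types rather than the real test fixtures:

    #include <cassert>
    #include <stdexcept>

    struct InnerSource {
        bool killed = false;
        int getNext() {
            // The inner layer surfaces its kill as an exception (cf. code 16028).
            if (killed)
                throw std::runtime_error("collection dropped");
            return 0;
        }
    };

    enum class ExecState { ADVANCED, IS_EOF, DEAD };

    struct OuterExec {
        bool killed = false;
        // The outer layer surfaces the same event as a status value instead.
        ExecState getNext() { return killed ? ExecState::DEAD : ExecState::ADVANCED; }
    };

    int main() {
        InnerSource inner;
        OuterExec outer;
        inner.killed = outer.killed = true;  // simulate the collection drop

        bool threw = false;
        try {
            inner.getNext();
        } catch (const std::runtime_error&) {
            threw = true;
        }
        assert(threw);                               // inner: exception
        assert(outer.getNext() == ExecState::DEAD);  // outer: DEAD status
    }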