author     Charlie Swanson <charlie.swanson@mongodb.com>   2016-10-27 14:48:12 -0400
committer  Charlie Swanson <charlie.swanson@mongodb.com>   2016-11-07 11:51:19 -0500
commit     fcf04452bcfb0b169380743f7041308f397e2196 (patch)
tree       16e3e5a22a9b8c1f72b4a61146b16664051a12c5
parent     f3a4d0914fca1ad2fc9a659f4ece42f9dcf0b2c2 (diff)
SERVER-24386 Use a valid OperationContext when killing $lookup's cursor
 jstests/aggregation/bugs/lookup_unwind_getmore.js    |  31
 jstests/aggregation/bugs/lookup_unwind_killcursor.js | 127
 src/mongo/db/pipeline/SConscript                     |   1
 src/mongo/db/pipeline/document_source.h              |   6
 src/mongo/db/pipeline/document_source_lookup.cpp     |  59
 src/mongo/db/server_options_helpers.cpp              |  12
 6 files changed, 219 insertions(+), 17 deletions(-)
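
The heart of the change is in DocumentSourceLookUp's destructor: destroying the stage's DBClientCursor sends a killCursors command through the DBDirectClient, and that client must have a valid OperationContext at that moment. Before reading the file-by-file diff, the following minimal sketch may help. It models only the pattern; every type here is a hypothetical stand-in, not a real MongoDB class:

#include <memory>

// All types below are simplified stand-ins, not the real MongoDB classes.
struct OperationContext {};

struct DirectClient {
    OperationContext* opCtx = nullptr;
    void setOperationContext(OperationContext* ctx) {
        opCtx = ctx;
    }
};

struct ClientCursor {
    DirectClient* client;
    explicit ClientCursor(DirectClient* c) : client(c) {}
    ~ClientCursor() {
        // Sends a killCursors request through 'client'; this is only safe
        // while client->opCtx points at a live context.
    }
};

struct LookupStage {
    DirectClient* client = nullptr;
    std::unique_ptr<ClientCursor> cursor;
    OperationContext* threadOpCtx = nullptr;  // may be null when we are destroyed

    ~LookupStage() {
        if (!client || !cursor)
            return;
        if (threadOpCtx) {
            // An OperationContext is already attached to this thread; reuse it.
            client->setOperationContext(threadOpCtx);
            cursor.reset();
        } else {
            // No ambient context (e.g. destroyed from a CursorManager): make a
            // short-lived one so the killCursors request can be sent safely.
            OperationContext freshCtx;
            client->setOperationContext(&freshCtx);
            cursor.reset();  // cursor destructor now runs with a valid context
            client->setOperationContext(nullptr);  // freshCtx dies at scope end
        }
    }
};

The real destructor in document_source_lookup.cpp below follows the same branch structure, and additionally wraps the whole body in DESTRUCTOR_GUARD so that no exception can escape during destruction.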
diff --git a/jstests/aggregation/bugs/lookup_unwind_getmore.js b/jstests/aggregation/bugs/lookup_unwind_getmore.js
index 6c8d886b78f..0650b2b940b 100644
--- a/jstests/aggregation/bugs/lookup_unwind_getmore.js
+++ b/jstests/aggregation/bugs/lookup_unwind_getmore.js
@@ -7,27 +7,30 @@
 (function() {
     'use strict';
 
-    // We use a batch size of 1 to ensure that the mongo shell issues a getMore when unwinding the
+    // We use a batch size of 2 to ensure that the mongo shell issues a getMore when unwinding the
     // results from the 'dest' collection for the same document in the 'source' collection under a
     // different OperationContext.
-    const batchSize = 1;
+    const batchSize = 2;
 
-    db.source.drop();
-    db.dest.drop();
+    const conn =
+        MongoRunner.runMongod({setParameter: "internalAggregationLookupBatchSize=" + batchSize});
+    assert.neq(null, conn, "mongod failed to start up");
+    const testDB = conn.getDB("test");
 
-    assert.writeOK(db.source.insert({local: 1}));
+    testDB.source.drop();
+    testDB.dest.drop();
 
-    // We insert documents in the 'dest' collection such that their combined size is greater than
-    // 16MB in order to ensure that the DBDirectClient used by the $lookup stage issues a getMore
-    // under a different OperationContext.
-    const numMatches = 3;
-    const largeStr = new Array(6 * 1024 * 1024 + 1).join('x');
+    assert.writeOK(testDB.source.insert({local: 1}));
 
+    // The cursor batching logic actually requests one more document than it needs to fill the
+    // first batch, so if we want to leave the $lookup stage paused with a cursor open we'll
+    // need two more matching documents than the batch size.
+    const numMatches = batchSize + 2;
     for (var i = 0; i < numMatches; ++i) {
-        assert.writeOK(db.dest.insert({foreign: 1, largeStr: largeStr}));
+        assert.writeOK(testDB.dest.insert({foreign: 1}));
     }
 
-    var res = db.runCommand({
+    var res = testDB.runCommand({
         aggregate: 'source',
         pipeline: [
             {
@@ -50,6 +53,8 @@
     });
     assert.commandWorked(res);
 
-    var cursor = new DBCommandCursor(db.getMongo(), res, batchSize);
+    var cursor = new DBCommandCursor(conn, res, batchSize);
     assert.eq(numMatches, cursor.itcount());
+
+    assert.eq(0, MongoRunner.stopMongod(conn), "expected mongod to shutdown cleanly");
 })();
diff --git a/jstests/aggregation/bugs/lookup_unwind_killcursor.js b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
new file mode 100644
index 00000000000..1668e68c436
--- /dev/null
+++ b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
@@ -0,0 +1,127 @@
+/**
+ * Tests that the server can successfully kill the cursor of an aggregation pipeline which is
+ * using a $lookup stage with its own cursor.
+ *
+ * This test was designed to reproduce SERVER-24386.
+ */
+(function() {
+    'use strict';
+
+    // Use a low batch size for the aggregation commands to ensure that the mongo shell does not
+    // exhaust cursors during the first batch.
+    const batchSize = 2;
+
+    // Use a small batch in the $lookup stage to ensure it needs to issue a getMore. This will help
+    // ensure that the $lookup stage has an open cursor when the first batch is returned.
+    var conn =
+        MongoRunner.runMongod({setParameter: "internalAggregationLookupBatchSize=" + batchSize});
+    assert.neq(null, conn, 'mongod was unable to start up');
+    const testDB = conn.getDB("test");
+
+    function setup() {
+        testDB.source.drop();
+        testDB.dest.drop();
+
+        assert.writeOK(testDB.source.insert({local: 1}));
+
+        // The cursor batching logic actually requests one more document than it needs to fill the
+        // first batch, so if we want to leave the $lookup stage paused with a cursor open we'll
+        // need two more matching documents than the batch size.
+        const numMatches = batchSize + 2;
+        for (var i = 0; i < numMatches; ++i) {
+            assert.writeOK(testDB.dest.insert({foreign: 1}));
+        }
+    }
+
+    setup();
+
+    const cmdObj = {
+        aggregate: 'source',
+        pipeline: [
+            {
+              $lookup: {
+                  from: 'dest',
+                  localField: 'local',
+                  foreignField: 'foreign',
+                  as: 'matches',
+              }
+            },
+            {
+              $unwind: {
+                  path: '$matches',
+              },
+            },
+        ],
+        cursor: {
+            batchSize: batchSize,
+        },
+    };
+
+    var res = testDB.runCommand(cmdObj);
+    assert.commandWorked(res);
+
+    var cursor = new DBCommandCursor(conn, res, batchSize);
+    cursor.close();  // Closing the cursor will issue a killCursors command.
+
+    // Ensure the $lookup stage can be killed by dropping the collection.
+    res = testDB.runCommand(cmdObj);
+    assert.commandWorked(res);
+
+    cursor = new DBCommandCursor(conn, res, batchSize);
+    testDB.source.drop();
+
+    assert.throws(function() {
+        cursor.itcount();
+    }, [], "expected cursor to have been destroyed during collection drop");
+
+    // Ensure the $lookup stage can be killed by dropping the database.
+    setup();
+    res = testDB.runCommand(cmdObj);
+    assert.commandWorked(res);
+
+    cursor = new DBCommandCursor(conn, res, batchSize);
+    assert.commandWorked(testDB.dropDatabase());
+
+    assert.throws(function() {
+        cursor.itcount();
+    }, [], "expected cursor to have been destroyed during database drop");
+
+    // Ensure the $lookup stage can be killed by the ClientCursorMonitor.
+    setup();
+    res = testDB.runCommand(cmdObj);
+    assert.commandWorked(res);
+    cursor = new DBCommandCursor(conn, res, batchSize);
+
+    var serverStatus = assert.commandWorked(testDB.serverStatus());
+    const expectedNumTimedOutCursors = serverStatus.metrics.cursor.timedOut + 1;
+
+    // Wait until the idle cursor background job has killed the aggregation cursor.
+    assert.commandWorked(testDB.adminCommand({setParameter: 1, cursorTimeoutMillis: 1000}));
+    const cursorTimeoutFrequencySeconds = 4;
+    assert.soon(
+        function() {
+            serverStatus = assert.commandWorked(testDB.serverStatus());
+            // Use >= here since we may time out the $lookup stage's cursor as well.
+            return serverStatus.metrics.cursor.timedOut >= expectedNumTimedOutCursors;
+        },
+        function() {
+            return "aggregation cursor failed to time out: " + tojson(serverStatus);
+        },
+        cursorTimeoutFrequencySeconds * 1000 * 5);
+
+    assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus));
+
+    // We attempt to exhaust the aggregation cursor to verify that sending a getMore returns an
+    // error due to the cursor being killed.
+    var err = assert.throws(function() {
+        cursor.itcount();
+    });
+    assert.eq(ErrorCodes.CursorNotFound, err.code, tojson(err));
+
+    // Ensure the $lookup stage can be killed by shutting down the server.
+    setup();
+    res = testDB.runCommand(cmdObj);
+    assert.commandWorked(res);
+
+    assert.eq(0, MongoRunner.stopMongod(conn), "expected mongod to shutdown cleanly");
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index c8d379f88ce..1e70c1fa376 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -122,6 +122,7 @@ docSourceEnv.Library(
         'expression',
         '$BUILD_DIR/mongo/client/clientdriver',
         '$BUILD_DIR/mongo/db/matcher/expressions',
+        '$BUILD_DIR/mongo/db/query/lite_parsed_query',
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_customization_hooks',
         '$BUILD_DIR/third_party/shim_snappy',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 3276a3d3d16..6a2845ecddc 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -1295,11 +1295,17 @@ private:
                          std::string localField,
                          std::string foreignField,
                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
+    ~DocumentSourceLookUp() final;
+
     Value serialize(bool explain = false) const final {
         invariant(false);
     }
 
+    /**
+     * Builds the required query and executes it.
+     */
+    std::unique_ptr<DBClientCursor> doQuery(const Document& docToLookUp) const;
+
     boost::optional<Document> unwindResult();
 
     BSONObj queryForInput(const Document& input) const;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 83708c8756c..ff23fb8eb48 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -31,17 +31,24 @@
 #include "document_source.h"
 
 #include "mongo/base/init.h"
+#include "mongo/db/client.h"
 #include "mongo/db/jsobj.h"
 #include "mongo/db/pipeline/document.h"
 #include "mongo/db/pipeline/expression.h"
 #include "mongo/db/pipeline/expression_context.h"
 #include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/lite_parsed_query.h"
+#include "mongo/db/server_parameters.h"
 #include "mongo/stdx/memory.h"
 
 namespace mongo {
 
 using boost::intrusive_ptr;
 
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(internalAggregationLookupBatchSize,
+                                      int,
+                                      LiteParsedQuery::kDefaultBatchSize);
+
 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
                                            std::string as,
                                            std::string localField,
@@ -54,12 +61,56 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
       _foreignField(foreignField),
       _foreignFieldFieldName(std::move(foreignField)) {}
 
+DocumentSourceLookUp::~DocumentSourceLookUp() {
+    DESTRUCTOR_GUARD(
+        // A DBClientCursor will issue a killCursors command through its parent DBDirectClient when
+        // it goes out of scope. To issue a killCursors command, a DBDirectClient needs a valid
+        // OperationContext. So here we set the OperationContext on the DBDirectClient, then
+        // aggressively destroy the DBClientCursor.
+        // Note that we cannot rely on any sort of callback from above to provide a valid
+        // OperationContext, since we might be destroyed from the destructor of a CursorManager,
+        // which does not have an OperationContext. Thus, we unfortunately have to make a new one or
+        // use the one on our thread's Client.
+        if (_mongod && _cursor) {
+            auto& client = cc();
+            if (auto opCtx = client.getOperationContext()) {
+                pExpCtx->opCtx = opCtx;
+                _mongod->setOperationContext(opCtx);
+                _cursor.reset();
+            } else {
+                auto newOpCtx = client.makeOperationContext();
+                pExpCtx->opCtx = newOpCtx.get();
+                _mongod->setOperationContext(newOpCtx.get());
+                _cursor.reset();
+            }
+        });
+}
+
 REGISTER_DOCUMENT_SOURCE(lookup, DocumentSourceLookUp::createFromBson);
 
 const char* DocumentSourceLookUp::getSourceName() const {
     return "$lookup";
 }
 
+std::unique_ptr<DBClientCursor> DocumentSourceLookUp::doQuery(const Document& docToLookUp) const {
+    auto query = DocumentSourceLookUp::queryForInput(docToLookUp);
+
+    // Defaults for everything except batch size.
+    const int nToReturn = 0;
+    const int nToSkip = 0;
+    const BSONObj* fieldsToReturn = nullptr;
+    const int queryOptions = 0;
+
+    const int batchSize = internalAggregationLookupBatchSize;
+    return _mongod->directClient()->query(_fromNs.ns(),
+                                          std::move(query),
+                                          nToReturn,
+                                          nToSkip,
+                                          fieldsToReturn,
+                                          queryOptions,
+                                          batchSize);
+}
+
 boost::optional<Document> DocumentSourceLookUp::getNext() {
     pExpCtx->checkForInterrupt();
 
@@ -72,8 +123,7 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
     boost::optional<Document> input = pSource->getNext();
     if (!input)
         return {};
 
-    BSONObj query = queryForInput(*input);
-    std::unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_fromNs.ns(), query);
+    auto cursor = doQuery(*input);
     std::vector<Value> results;
     int objsize = 0;
 
@@ -82,7 +132,8 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
         objsize += result.objsize();
         uassert(4568,
                 str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
-                              << query << " exceeds maximum document size",
+                              << DocumentSourceLookUp::queryForInput(*input)
+                              << " exceeds maximum document size",
                 objsize <= BSONObjMaxInternalSize);
         results.push_back(Value(result));
     }
@@ -136,8 +187,8 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
     if (!_input)
         return {};
 
-    _cursor = _mongod->directClient()->query(_fromNs.ns(), queryForInput(*_input));
     _cursorIndex = 0;
+    _cursor = doQuery(*_input);
 
     if (_unwindSrc->preserveNullAndEmptyArrays() && !_cursor->more()) {
         // There were no results for this cursor, but the $unwind was asked to preserve empty
diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp
index 807627ed4b8..331f285ca8c 100644
--- a/src/mongo/db/server_options_helpers.cpp
+++ b/src/mongo/db/server_options_helpers.cpp
@@ -541,6 +541,18 @@ Status validateServerOptions(const moe::Environment& params) {
         if (authMechParameter != parameters.end() && authMechParameter->second.empty()) {
             haveAuthenticationMechanisms = false;
         }
+
+        // 'internalAggregationLookupBatchSize' can only be set if test commands are enabled.
+        auto lookupBatchSizeParameter = parameters.find("internalAggregationLookupBatchSize");
+        auto enableTestCommandsParameter = parameters.find("enableTestCommands");
+        if (lookupBatchSizeParameter != parameters.end()) {
+            if (enableTestCommandsParameter == parameters.end() ||
+                enableTestCommandsParameter->second != "1") {
+                return Status(ErrorCodes::BadValue,
+                              "internalAggregationLookupBatchSize can only be set if test "
+                              "commands are enabled.");
+            }
+        }
     }
     if ((params.count("security.authorization") &&
          params["security.authorization"].as<std::string>() == "enabled") ||
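
A note on the arithmetic both tests lean on: their comments state that the cursor batching logic fetches one document more than the requested batch size when filling the first batch, which is why setup() inserts batchSize + 2 matching documents. The following standalone model illustrates that reasoning; docsLeftAfterFirstBatch is a hypothetical helper written for this page, not MongoDB code:

#include <algorithm>
#include <cassert>

// Models the batching rule described in the test comments: the first batch
// fetches batchSize + 1 documents. Returns how many matching documents remain
// unfetched, i.e. whether the $lookup stage is left holding an open cursor.
int docsLeftAfterFirstBatch(int numMatches, int batchSize) {
    int firstFetch = std::min(numMatches, batchSize + 1);
    return numMatches - firstFetch;
}

int main() {
    const int batchSize = 2;
    // With only batchSize + 1 documents, the first fetch drains everything,
    // the cursor closes immediately, and the tests would exercise nothing.
    assert(docsLeftAfterFirstBatch(batchSize + 1, batchSize) == 0);
    // With batchSize + 2 documents (what setup() inserts), one document is
    // left over, so the $lookup stage is paused on an open cursor, which is
    // exactly the state the kill paths above need to exercise.
    assert(docsLeftAfterFirstBatch(batchSize + 2, batchSize) == 1);
    return 0;
}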