author      Charlie Swanson <charlie.swanson@mongodb.com>   2016-10-27 14:48:12 -0400
committer   Charlie Swanson <charlie.swanson@mongodb.com>   2016-11-07 11:51:19 -0500
commit      fcf04452bcfb0b169380743f7041308f397e2196 (patch)
tree        16e3e5a22a9b8c1f72b4a61146b16664051a12c5
parent      f3a4d0914fca1ad2fc9a659f4ece42f9dcf0b2c2 (diff)
download    mongo-fcf04452bcfb0b169380743f7041308f397e2196.tar.gz
SERVER-24386 Use a valid OperationContext when killing $lookup's cursor
-rw-r--r--   jstests/aggregation/bugs/lookup_unwind_getmore.js    |  31
-rw-r--r--   jstests/aggregation/bugs/lookup_unwind_killcursor.js | 127
-rw-r--r--   src/mongo/db/pipeline/SConscript                     |   1
-rw-r--r--   src/mongo/db/pipeline/document_source.h              |   6
-rw-r--r--   src/mongo/db/pipeline/document_source_lookup.cpp     |  59
-rw-r--r--   src/mongo/db/server_options_helpers.cpp              |  12
6 files changed, 219 insertions(+), 17 deletions(-)
diff --git a/jstests/aggregation/bugs/lookup_unwind_getmore.js b/jstests/aggregation/bugs/lookup_unwind_getmore.js
index 6c8d886b78f..0650b2b940b 100644
--- a/jstests/aggregation/bugs/lookup_unwind_getmore.js
+++ b/jstests/aggregation/bugs/lookup_unwind_getmore.js
@@ -7,27 +7,30 @@
(function() {
'use strict';
- // We use a batch size of 1 to ensure that the mongo shell issues a getMore when unwinding the
+ // We use a batch size of 2 to ensure that the mongo shell issues a getMore when unwinding the
// results from the 'dest' collection for the same document in the 'source' collection under a
// different OperationContext.
- const batchSize = 1;
+ const batchSize = 2;
- db.source.drop();
- db.dest.drop();
+ const conn =
+ MongoRunner.runMongod({setParameter: "internalAggregationLookupBatchSize=" + batchSize});
+ assert.neq(null, conn, "mongod failed to start up");
+ const testDB = conn.getDB("test");
- assert.writeOK(db.source.insert({local: 1}));
+ testDB.source.drop();
+ testDB.dest.drop();
- // We insert documents in the 'dest' collection such that their combined size is greater than
- // 16MB in order to ensure that the DBDirectClient used by the $lookup stage issues a getMore
- // under a different OperationContext.
- const numMatches = 3;
- const largeStr = new Array(6 * 1024 * 1024 + 1).join('x');
+ assert.writeOK(testDB.source.insert({local: 1}));
+ // The cursor batching logic actually requests one more document than it needs to fill the
+ // first batch, so if we want to leave the $lookup stage paused with a cursor open we'll
+ // need two more matching documents than the batch size.
+ const numMatches = batchSize + 2;
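+ // For example, with a batchSize of 2, filling the first batch consumes three of the four
+ // matching documents (two returned plus one prefetched), leaving one behind an open cursor.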
for (var i = 0; i < numMatches; ++i) {
- assert.writeOK(db.dest.insert({foreign: 1, largeStr: largeStr}));
+ assert.writeOK(testDB.dest.insert({foreign: 1}));
}
- var res = db.runCommand({
+ var res = testDB.runCommand({
aggregate: 'source',
pipeline: [
{
@@ -50,6 +53,8 @@
});
assert.commandWorked(res);
- var cursor = new DBCommandCursor(db.getMongo(), res, batchSize);
+ var cursor = new DBCommandCursor(conn, res, batchSize);
assert.eq(numMatches, cursor.itcount());
+
+ assert.eq(0, MongoRunner.stopMongod(conn), "expected mongod to shut down cleanly");
})();
diff --git a/jstests/aggregation/bugs/lookup_unwind_killcursor.js b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
new file mode 100644
index 00000000000..1668e68c436
--- /dev/null
+++ b/jstests/aggregation/bugs/lookup_unwind_killcursor.js
@@ -0,0 +1,127 @@
+/**
+ * Tests that the server can successfully kill the cursor of an aggregation pipeline which is
+ * using a $lookup stage with its own cursor.
+ *
+ * This test was designed to reproduce SERVER-24386.
+ */
+(function() {
+ 'use strict';
+
+ // Use a low batch size for the aggregation commands to ensure that the mongo shell does not
+ // exhaust cursors during the first batch.
+ const batchSize = 2;
+
+ // Use a small batch size for the $lookup stage's internal cursor to ensure that it needs to
+ // issue a getMore, and therefore still has an open cursor when the first batch is returned.
+ var conn =
+ MongoRunner.runMongod({setParameter: "internalAggregationLookupBatchSize=" + batchSize});
+ assert.neq(null, conn, 'mongod was unable to start up');
+ const testDB = conn.getDB("test");
+
+ function setup() {
+ testDB.source.drop();
+ testDB.dest.drop();
+
+ assert.writeOK(testDB.source.insert({local: 1}));
+
+ // The cursor batching logic actually requests one more document than it needs to fill the
+ // first batch, so if we want to leave the $lookup stage paused with a cursor open we'll
+ // need two more matching documents than the batch size.
+ const numMatches = batchSize + 2;
+ for (var i = 0; i < numMatches; ++i) {
+ assert.writeOK(testDB.dest.insert({foreign: 1}));
+ }
+ }
+
+ setup();
+
+ const cmdObj = {
+ aggregate: 'source',
+ pipeline: [
+ {
+ $lookup: {
+ from: 'dest',
+ localField: 'local',
+ foreignField: 'foreign',
+ as: 'matches',
+ }
+ },
+ {
+ $unwind: {
+ path: '$matches',
+ },
+ },
+ ],
+ cursor: {
+ batchSize: batchSize,
+ },
+ };
+
+ var res = testDB.runCommand(cmdObj);
+ assert.commandWorked(res);
+
+ var cursor = new DBCommandCursor(conn, res, batchSize);
+ cursor.close();  // Closing the cursor will issue a killCursors command.
+
+ // Ensure the $lookup stage can be killed by dropping the collection.
+ res = testDB.runCommand(cmdObj);
+ assert.commandWorked(res);
+
+ cursor = new DBCommandCursor(conn, res, batchSize);
+ testDB.source.drop();
+
+ assert.throws(function() {
+ cursor.itcount();
+ }, [], "expected cursor to have been destroyed during collection drop");
+
+ // Ensure the $lookup stage can be killed by dropping the database.
+ setup();
+ res = testDB.runCommand(cmdObj);
+ assert.commandWorked(res);
+
+ cursor = new DBCommandCursor(conn, res, batchSize);
+ assert.commandWorked(testDB.dropDatabase());
+
+ assert.throws(function() {
+ cursor.itcount();
+ }, [], "expected cursor to have been destroyed during database drop");
+
+ // Ensure the $lookup stage can be killed by the ClientCursorMonitor.
+ setup();
+ res = testDB.runCommand(cmdObj);
+ assert.commandWorked(res);
+ cursor = new DBCommandCursor(conn, res, batchSize);
+
+ var serverStatus = assert.commandWorked(testDB.serverStatus());
+ const expectedNumTimedOutCursors = serverStatus.metrics.cursor.timedOut + 1;
+
+ // Wait until the idle cursor background job has killed the aggregation cursor.
+ assert.commandWorked(testDB.adminCommand({setParameter: 1, cursorTimeoutMillis: 1000}));
+ const cursorTimeoutFrequencySeconds = 4;
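+ // The background job that times out idle cursors is assumed to run roughly every 4 seconds
+ // (hence the constant above); allow up to five of those cycles before giving up.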
+ assert.soon(
+ function() {
+ serverStatus = assert.commandWorked(testDB.serverStatus());
+ // Use >= here since we may time out the $lookup stage's cursor as well.
+ return serverStatus.metrics.cursor.timedOut >= expectedNumTimedOutCursors;
+ },
+ function() {
+ return "aggregation cursor failed to time out: " + tojson(serverStatus);
+ },
+ cursorTimeoutFrequencySeconds * 1000 * 5);
+
+ assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus));
+
+ // We attempt to exhaust the aggregation cursor to verify that sending a getMore returns an
+ // error due to the cursor being killed.
+ var err = assert.throws(function() {
+ cursor.itcount();
+ });
+ assert.eq(ErrorCodes.CursorNotFound, err.code, tojson(err));
+
+ // Ensure the $lookup stage can be killed by shutting down the server.
+ setup();
+ res = testDB.runCommand(cmdObj);
+ assert.commandWorked(res);
+
+ assert.eq(0, MongoRunner.stopMongod(conn), "expected mongod to shut down cleanly");
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index c8d379f88ce..1e70c1fa376 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -122,6 +122,7 @@ docSourceEnv.Library(
'expression',
'$BUILD_DIR/mongo/client/clientdriver',
'$BUILD_DIR/mongo/db/matcher/expressions',
+ '$BUILD_DIR/mongo/db/query/lite_parsed_query',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_customization_hooks',
'$BUILD_DIR/third_party/shim_snappy',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 3276a3d3d16..6a2845ecddc 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -1295,11 +1295,17 @@ private:
std::string localField,
std::string foreignField,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ ~DocumentSourceLookUp() final;
Value serialize(bool explain = false) const final {
invariant(false);
}
+ /**
+ * Builds the required query and executes it.
+ */
+ std::unique_ptr<DBClientCursor> doQuery(const Document& docToLookUp) const;
+
boost::optional<Document> unwindResult();
BSONObj queryForInput(const Document& input) const;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 83708c8756c..ff23fb8eb48 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -31,17 +31,24 @@
#include "document_source.h"
#include "mongo/base/init.h"
+#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/lite_parsed_query.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/stdx/memory.h"
namespace mongo {
using boost::intrusive_ptr;
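+// Startup-only parameter controlling the batch size the $lookup stage requests from its internal
+// cursor. It may only be set when test commands are enabled (see server_options_helpers.cpp).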
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(internalAggregationLookupBatchSize,
+ int,
+ LiteParsedQuery::kDefaultBatchSize);
+
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::string localField,
@@ -54,12 +61,56 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
_foreignField(foreignField),
_foreignFieldFieldName(std::move(foreignField)) {}
+DocumentSourceLookUp::~DocumentSourceLookUp() {
+ DESTRUCTOR_GUARD(
+ // A DBClientCursor will issue a killCursors command through its parent DBDirectClient when
+ // it goes out of scope. To issue a killCursors command, a DBDirectClient needs a valid
+ // OperationContext. So here we set the OperationContext on the DBDirectClient, then
+ // aggressively destroy the DBClientCursor.
+ // Note that we cannot rely on any sort of callback from above to provide a valid
+ // OperationContext, since we might be destroyed from the destructor of a CursorManager,
+ // which does not have an OperationContext. Thus, we unfortunately have to make a new one or
+ // use the one on our thread's Client.
+ if (_mongod && _cursor) {
+ auto& client = cc();
+ if (auto opCtx = client.getOperationContext()) {
+ pExpCtx->opCtx = opCtx;
+ _mongod->setOperationContext(opCtx);
+ _cursor.reset();
+ } else {
+ auto newOpCtx = client.makeOperationContext();
+ pExpCtx->opCtx = newOpCtx.get();
+ _mongod->setOperationContext(newOpCtx.get());
+ _cursor.reset();
+ }
+ });
+}
+
REGISTER_DOCUMENT_SOURCE(lookup, DocumentSourceLookUp::createFromBson);
const char* DocumentSourceLookUp::getSourceName() const {
return "$lookup";
}
+std::unique_ptr<DBClientCursor> DocumentSourceLookUp::doQuery(const Document& docToLookUp) const {
+ auto query = DocumentSourceLookUp::queryForInput(docToLookUp);
+
+ // Defaults for everything except batch size.
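+ // (An nToReturn of 0 means no limit, an nToSkip of 0 starts at the first match, and a null
+ // fieldsToReturn returns entire documents.)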
+ const int nToReturn = 0;
+ const int nToSkip = 0;
+ const BSONObj* fieldsToReturn = nullptr;
+ const int queryOptions = 0;
+
+ const int batchSize = internalAggregationLookupBatchSize;
+ return _mongod->directClient()->query(_fromNs.ns(),
+ std::move(query),
+ nToReturn,
+ nToSkip,
+ fieldsToReturn,
+ queryOptions,
+ batchSize);
+}
+
boost::optional<Document> DocumentSourceLookUp::getNext() {
pExpCtx->checkForInterrupt();
@@ -72,8 +123,7 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
boost::optional<Document> input = pSource->getNext();
if (!input)
return {};
- BSONObj query = queryForInput(*input);
- std::unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_fromNs.ns(), query);
+ auto cursor = doQuery(*input);
std::vector<Value> results;
int objsize = 0;
@@ -82,7 +132,8 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
objsize += result.objsize();
uassert(4568,
str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
- << query << " exceeds maximum document size",
+ << DocumentSourceLookUp::queryForInput(*input)
+ << " exceeds maximum document size",
objsize <= BSONObjMaxInternalSize);
results.push_back(Value(result));
}
@@ -136,8 +187,8 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
if (!_input)
return {};
- _cursor = _mongod->directClient()->query(_fromNs.ns(), queryForInput(*_input));
_cursorIndex = 0;
+ _cursor = doQuery(*_input);
if (_unwindSrc->preserveNullAndEmptyArrays() && !_cursor->more()) {
// There were no results for this cursor, but the $unwind was asked to preserve empty
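
The destructor added above uses a borrow-or-create idiom for obtaining a valid OperationContext.
As a standalone sketch of that idiom (doCleanup() is a hypothetical stand-in for resetting the
DBClientCursor; Client, cc(), and the OperationContext accessors are the real ones from the diff):

    // Borrow the thread's active OperationContext if one exists; otherwise create a
    // short-lived one so the cleanup (e.g. a killCursors round trip) has a valid context.
    void cleanupWithValidOpCtx() {
        auto& client = cc();
        if (auto opCtx = client.getOperationContext()) {
            doCleanup(opCtx);  // Borrow the existing OperationContext.
        } else {
            auto newOpCtx = client.makeOperationContext();
            doCleanup(newOpCtx.get());  // The new OperationContext dies with this scope.
        }
    }
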
diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp
index 807627ed4b8..331f285ca8c 100644
--- a/src/mongo/db/server_options_helpers.cpp
+++ b/src/mongo/db/server_options_helpers.cpp
@@ -541,6 +541,18 @@ Status validateServerOptions(const moe::Environment& params) {
if (authMechParameter != parameters.end() && authMechParameter->second.empty()) {
haveAuthenticationMechanisms = false;
}
+
+ // Make sure 'internalAggregationLookupBatchSize' can only be set if test commands are enabled.
+ auto lookupBatchSizeParameter = parameters.find("internalAggregationLookupBatchSize");
+ auto enableTestCommandsParameter = parameters.find("enableTestCommands");
+ if (lookupBatchSizeParameter != parameters.end()) {
+ if (enableTestCommandsParameter == parameters.end() ||
+ enableTestCommandsParameter->second != "1") {
+ return Status(
+ ErrorCodes::BadValue,
+ "internalLookupStageBatchSize can only be set if test commands are enabled.");
+ }
+ }
}
if ((params.count("security.authorization") &&
params["security.authorization"].as<std::string>() == "enabled") ||