author     Matthew Russotto <matthew.russotto@10gen.com>  2017-07-14 17:15:52 -0400
committer  Matthew Russotto <matthew.russotto@10gen.com>  2017-07-17 08:52:57 -0400
commit     3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch)
tree       8f318b2b52852a1511ed6da6ede9ac62cbe67d4d
parent     a1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff)
download   mongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"" This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
-rw-r--r--  jstests/aggregation/sources/changeNotification/change_notification.js  158
-rw-r--r--  jstests/noPassthrough/awaitdata_getmore_cmd.js                           50
-rw-r--r--  src/mongo/db/clientcursor.h                                              27
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp                                    72
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp                                  31
-rw-r--r--  src/mongo/db/pipeline/document_source_change_notification.cpp            4
-rw-r--r--  src/mongo/db/pipeline/document_source_change_notification_test.cpp       5
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp                         12
-rw-r--r--  src/mongo/db/pipeline/expression_context.h                               11
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                      8
-rw-r--r--  src/mongo/db/query/SConscript                                             1
-rw-r--r--  src/mongo/db/query/find.cpp                                               5
-rw-r--r--  src/mongo/db/query/plan_executor.cpp                                     71
-rw-r--r--  src/mongo/db/query/plan_executor.h                                       27
-rw-r--r--  src/mongo/db/query/plan_yield_policy.cpp                                 18
-rw-r--r--  src/mongo/db/query/plan_yield_policy.h                                    7
-rw-r--r--  src/mongo/db/query/query_yield.cpp                                       20
-rw-r--r--  src/mongo/db/query/query_yield.h                                          5
18 files changed, 434 insertions, 98 deletions
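
The heart of the change is in the getmore_cmd and plan_executor diffs below: instead of getMore blocking on the capped-insert notifier itself, it now marks the operation via an OperationContext decoration (shouldWaitForInserts), and PlanExecutor::getNextImpl does the waiting when the plan hits EOF, consulting clientsLastKnownCommittedOpTime so a secondary's getMore still returns as soon as the commit level advances. The following is a simplified, self-contained analogue of that wait-and-retry loop in plain C++17 -- illustrative names only, not MongoDB source:

// Simplified analogue of the awaitData wait-and-retry loop -- NOT MongoDB source.
// A writer bumps a versioned notifier on every insert (cf. CappedInsertNotifier);
// the reader, instead of returning EOF, waits on the notifier with a deadline and
// then retries the plan.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

class InsertNotifier {
public:
    void notifyAll() {
        std::lock_guard<std::mutex> lk(_m);
        ++_version;  // every insert advances the version
        _cv.notify_all();
    }
    std::uint64_t version() {
        std::lock_guard<std::mutex> lk(_m);
        return _version;
    }
    // Returns when the version moves past 'prev' or the timeout expires.
    void wait(std::uint64_t prev, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait_for(lk, timeout, [&] { return _version != prev; });
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    std::uint64_t _version = 0;
};

struct Collection {
    InsertNotifier notifier;
    std::mutex m;
    std::queue<int> docs;

    void insert(int doc) {
        {
            std::lock_guard<std::mutex> lk(m);
            docs.push(doc);
        }
        notifier.notifyAll();
    }
    std::optional<int> tryNext() {
        std::lock_guard<std::mutex> lk(m);
        if (docs.empty())
            return std::nullopt;
        int d = docs.front();
        docs.pop();
        return d;
    }
};

// Analogue of getNextImpl() for an awaitData cursor: at EOF, wait, then retry.
std::optional<int> getNextAwaitData(Collection& coll, std::chrono::milliseconds remaining) {
    while (true) {
        if (auto doc = coll.tryNext())
            return doc;
        // Read the version before waiting so a racing insert cannot be missed
        // (the patch reads getVersion() early for the same reason).
        const auto v = coll.notifier.version();
        if (auto doc = coll.tryNext())
            return doc;
        if (remaining <= std::chrono::milliseconds::zero())
            return std::nullopt;  // deadline exhausted: genuine EOF for this getMore
        coll.notifier.wait(v, remaining);
        remaining = std::chrono::milliseconds::zero();  // at most one wait, for brevity
    }
}

int main() {
    Collection coll;
    std::thread writer([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        coll.insert(42);  // wakes the blocked "getMore"
    });
    if (auto doc = getNextAwaitData(coll, std::chrono::seconds(5)))
        std::cout << "got: " << *doc << "\n";
    writer.join();
}

The real implementation additionally yields lock-manager locks around the wait and aborts early on interrupt or a commit-level change; see shouldWaitForInserts() and waitForInserts() in the plan_executor.cpp diff.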
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js
index 7636b02fd53..e0df38950d6 100644
--- a/jstests/aggregation/sources/changeNotification/change_notification.js
+++ b/jstests/aggregation/sources/changeNotification/change_notification.js
@@ -4,20 +4,28 @@
(function() {
"use strict";
+ var oplogProjection = {$project: {"_id.ts": 0}};
+
// Helper for testing that pipeline returns correct set of results.
function testPipeline(pipeline, expectedResult, collection) {
+ // Limit to the last N documents from the end of the oplog, because currently
+ // $changeNotification always comes from the start of the oplog.
+ pipeline.push({$sort: {"_id.ts": -1}});
+ if (expectedResult.length > 0) {
+ pipeline.push({$limit: expectedResult.length});
+ }
// Strip the oplog fields we aren't testing.
- pipeline.push({$limit: 1});
- pipeline.push({$project: {"_id.ts": 0}});
- assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult);
+ pipeline.push(oplogProjection);
+ assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult);
}
- var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
- var nodes = replTest.startSet();
+ let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
+ let nodes = replTest.startSet();
replTest.initiate();
replTest.awaitReplication();
db = replTest.getPrimary().getDB('test');
+ db.getMongo().forceReadMode('commands');
jsTestLog("Testing single insert");
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
@@ -129,5 +137,145 @@
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
testPipeline([{$changeNotification: {}}], [], db.dne1);
testPipeline([{$changeNotification: {}}], [], db.dne2);
+
+ // Now make sure the cursor behaves like a tailable awaitData cursor.
+ jsTestLog("Testing tailability");
+ let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
+ assert(!tailableCursor.hasNext());
+ assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
+ assert(tailableCursor.hasNext());
+ assert.docEq(tailableCursor.next(), {
+ "_id": {
+ "_id": 101,
+ "ns": "test.tailable1",
+ },
+ "documentKey": {"_id": 101},
+ "newDocument": {"_id": 101, "a": 1},
+ "ns": {"coll": "tailable1", "db": "test"},
+ "operationType": "insert"
+ });
+
+ jsTestLog("Testing awaitdata");
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: "tailable2",
+ pipeline: [{$changeNotification: {}}, oplogProjection],
+ cursor: {}
+ }));
+ let aggcursor = res.cursor;
+
+ // We should get a valid cursor.
+ assert.neq(aggcursor.id, 0);
+
+ // Initial batch size should be zero as there should be no data.
+ assert.eq(aggcursor.firstBatch.length, 0);
+
+ // No data, so should return no results, but cursor should remain valid.
+ res = assert.commandWorked(
+ db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
+ aggcursor = res.cursor;
+ assert.neq(aggcursor.id, 0);
+ assert.eq(aggcursor.nextBatch.length, 0);
+
+ // Now insert something in parallel while waiting for it.
+ let insertshell = startParallelShell(function() {
+ // Wait for the getMore to appear in currentop.
+ assert.soon(function() {
+ return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length ==
+ 1;
+ });
+ assert.writeOK(db.tailable2.insert({_id: 102, a: 2}));
+ });
+ res = assert.commandWorked(
+ db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000}));
+ aggcursor = res.cursor;
+ assert.eq(aggcursor.nextBatch.length, 1);
+ assert.docEq(aggcursor.nextBatch[0], {
+ "_id": {
+ "_id": 102,
+ "ns": "test.tailable2",
+ },
+ "documentKey": {"_id": 102},
+ "newDocument": {"_id": 102, "a": 2},
+ "ns": {"coll": "tailable2", "db": "test"},
+ "operationType": "insert"
+ });
+
+ // Wait for insert shell to terminate.
+ insertshell();
+
+ jsTestLog("Testing awaitdata - no wake on insert to another collection");
+ res = assert.commandWorked(db.runCommand({
+ aggregate: "tailable3",
+ pipeline: [{$changeNotification: {}}, oplogProjection],
+ cursor: {}
+ }));
+ aggcursor = res.cursor;
+ // We should get a valid cursor.
+ assert.neq(aggcursor.id, 0);
+
+ // Initial batch size should be zero as there should be no data.
+ assert.eq(aggcursor.firstBatch.length, 0);
+
+ // Now insert something in a different collection in parallel while waiting.
+ insertshell = startParallelShell(function() {
+ // Wait for the getMore to appear in currentop.
+ assert.soon(function() {
+ return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
+ 1;
+ });
+ assert.writeOK(db.tailable3a.insert({_id: 103, a: 2}));
+ });
+ let start = new Date();
+ res = assert.commandWorked(
+ db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000}));
+ let diff = (new Date()).getTime() - start.getTime();
+ assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection.");
+ aggcursor = res.cursor;
+ // Cursor should be valid with no data.
+ assert.neq(aggcursor.id, 0);
+ assert.eq(aggcursor.nextBatch.length, 0);
+
+ // Wait for insert shell to terminate.
+ insertshell();
+
+ // This time, put something in a different collection, then in the correct collection.
+ // We should wake up with just the correct data.
+ insertshell = startParallelShell(function() {
+ // Wait for the getMore to appear in currentop.
+ assert.soon(function() {
+ return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
+ 1;
+ });
+ assert.writeOK(db.tailable3a.insert({_id: 104, a: 2}));
+ assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1);
+ assert.writeOK(db.tailable3.insert({_id: 105, a: 3}));
+ });
+ res = assert.commandWorked(
+ db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000}));
+ aggcursor = res.cursor;
+ assert.neq(aggcursor.id, 0);
+ assert.eq(aggcursor.nextBatch.length, 1);
+ assert.docEq(aggcursor.nextBatch[0], {
+ "_id": {
+ "_id": 105,
+ "ns": "test.tailable3",
+ },
+ "documentKey": {"_id": 105},
+ "newDocument": {"_id": 105, "a": 3},
+ "ns": {"coll": "tailable3", "db": "test"},
+ "operationType": "insert"
+ });
+
+ // Wait for insert shell to terminate.
+ insertshell();
+
+ jsTestLog("Ensuring attempt to read with legacy operations fails.");
+ db.getMongo().forceReadMode('legacy');
+ tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
+ {cursor: {batchSize: 0}});
+ assert.throws(function() {
+ tailableCursor.next();
+ }, [], "Legacy getMore expected to fail on changeNotification cursor.");
+
replTest.stopSet();
}());
diff --git a/jstests/noPassthrough/awaitdata_getmore_cmd.js b/jstests/noPassthrough/awaitdata_getmore_cmd.js
index ef47efb0e67..5de2c8f83c1 100644
--- a/jstests/noPassthrough/awaitdata_getmore_cmd.js
+++ b/jstests/noPassthrough/awaitdata_getmore_cmd.js
@@ -28,8 +28,13 @@
cmdRes = db.runCommand({find: collName, tailable: true, awaitData: true});
assert.commandFailed(cmdRes);
- // Create a capped collection with 10 documents.
+ // With a non-existent collection, should succeed but return no data and a closed cursor.
coll.drop();
+ cmdRes = assert.commandWorked(db.runCommand({find: collName, tailable: true}));
+ assert.eq(cmdRes.cursor.id, NumberLong(0));
+ assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+ // Create a capped collection with 10 documents.
assert.commandWorked(db.createCollection(collName, {capped: true, size: 2048}));
for (var i = 0; i < 10; i++) {
assert.writeOK(coll.insert({a: i}));
@@ -123,4 +128,47 @@
}
assert.gte((new Date()) - now, 2000);
+ // Test filtered inserts while writing to a capped collection.
+ // Find with a filter which doesn't match any documents in the collection.
+ cmdRes = assert.commandWorked(db.runCommand(
+ {find: collName, batchSize: 2, filter: {x: 1}, awaitData: true, tailable: true}));
+ assert.gt(cmdRes.cursor.id, NumberLong(0));
+ assert.eq(cmdRes.cursor.ns, coll.getFullName());
+ assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+ // getMore should time out if we insert a non-matching document.
+ let insertshell = startParallelShell(function() {
+ assert.soon(function() {
+ return db.currentOp({op: "getmore", "command.collection": "await_data"})
+ .inprog.length == 1;
+ });
+ assert.writeOK(db.await_data.insert({x: 0}));
+ }, mongo.port);
+
+ now = new Date();
+ cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 4000});
+ assert.commandWorked(cmdRes);
+ assert.gt(cmdRes.cursor.id, NumberLong(0));
+ assert.eq(cmdRes.cursor.ns, coll.getFullName());
+ assert.eq(cmdRes.cursor.nextBatch.length, 0);
+ assert.gte((new Date()) - now,
+ 4000,
+ "Insert not matching filter caused awaitData getMore to return prematurely.");
+ insertshell();
+
+ // getMore should succeed if we insert a non-matching document followed by a matching one.
+ insertshell = startParallelShell(function() {
+ assert.writeOK(db.await_data.insert({x: 0}));
+ assert.writeOK(db.await_data.insert({_id: "match", x: 1}));
+ jsTestLog("Written");
+ }, mongo.port);
+
+ cmdRes =
+ db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 5 * 60 * 1000});
+ assert.commandWorked(cmdRes);
+ assert.gt(cmdRes.cursor.id, NumberLong(0));
+ assert.eq(cmdRes.cursor.ns, coll.getFullName());
+ assert.eq(cmdRes.cursor.nextBatch.length, 1);
+ assert.docEq(cmdRes.cursor.nextBatch[0], {_id: "match", x: 1});
+ insertshell();
})();
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 32fb018a8e1..f75d41e3089 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -71,6 +71,20 @@ struct ClientCursorParams {
}
}
+ void setTailable(bool tailable) {
+ if (tailable)
+ queryOptions |= QueryOption_CursorTailable;
+ else
+ queryOptions &= ~QueryOption_CursorTailable;
+ }
+
+ void setAwaitData(bool awaitData) {
+ if (awaitData)
+ queryOptions |= QueryOption_AwaitData;
+ else
+ queryOptions &= ~QueryOption_AwaitData;
+ }
+
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
@@ -127,10 +141,23 @@ public:
return _exec.get();
}
+ /**
+ * Returns the query options bitmask. If you'd like to know if the cursor is tailable or
+ * awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this
+ * method.
+ */
int queryOptions() const {
return _queryOptions;
}
+ bool isTailable() const {
+ return _queryOptions & QueryOption_CursorTailable;
+ }
+
+ bool isAwaitData() const {
+ return _queryOptions & QueryOption_AwaitData;
+ }
+
const BSONObj& getOriginatingCommandObj() const {
return _originatingCommand;
}
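
Both the new ClientCursorParams setters and the ClientCursor getters above are thin wrappers over the queryOptions bitmask. A minimal standalone sketch of that set/clear/test pattern (the flag values shown mirror the OP_QUERY wire-protocol bits, included here only so the example compiles):

#include <cassert>

enum QueryOptions : int {
    QueryOption_CursorTailable = 1 << 1,
    QueryOption_AwaitData = 1 << 5,
};

struct Params {
    int queryOptions = 0;

    void setTailable(bool tailable) {
        if (tailable)
            queryOptions |= QueryOption_CursorTailable;  // set the bit
        else
            queryOptions &= ~QueryOption_CursorTailable;  // clear the bit
    }
    bool isTailable() const {
        return queryOptions & QueryOption_CursorTailable;  // test the bit
    }
};

int main() {
    Params p;
    p.setTailable(true);
    assert(p.isTailable());
    p.setTailable(false);
    assert(!p.isTailable());
}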
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e429c268dd6..d41867d16b9 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -269,11 +270,6 @@ public:
// Validation related to awaitData.
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
-
- if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
- Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
- return appendCommandStatus(result, status);
- }
}
if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) {
@@ -322,21 +318,6 @@ public:
}
}
- uint64_t notifierVersion = 0;
- std::shared_ptr<CappedInsertNotifier> notifier;
- if (isCursorAwaitData(cursor)) {
- invariant(readLock->getCollection()->isCapped());
- // Retrieve the notifier which we will wait on until new data arrives. We make sure
- // to do this in the lock because once we drop the lock it is possible for the
- // collection to become invalid. The notifier itself will outlive the collection if
- // the collection is dropped, as we keep a shared_ptr to it.
- notifier = readLock->getCollection()->getCappedInsertNotifier();
-
- // Must get the version before we call generateBatch in case a write comes in after
- // that call and before we call wait on the notifier.
- notifierVersion = notifier->getVersion();
- }
-
CursorId respondWithId = 0;
CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
BSONObj obj;
@@ -352,46 +333,16 @@ public:
PlanSummaryStats preExecutionStats;
Explain::getSummaryStats(*exec, &preExecutionStats);
- Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
+ // Mark this as an AwaitData operation if appropriate.
+ if (isCursorAwaitData(cursor)) {
+ if (request.lastKnownCommittedOpTime)
+ clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
+ shouldWaitForInserts(opCtx) = true;
}
- // If this is an await data cursor, and we hit EOF without generating any results, then
- // we block waiting for new data to arrive.
- if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- // Return immediately if we need to update the commit time.
- if (!request.lastKnownCommittedOpTime ||
- (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) {
- // Retrieve the notifier which we will wait on until new data arrives. We make sure
- // to do this in the lock because once we drop the lock it is possible for the
- // collection to become invalid. The notifier itself will outlive the collection if
- // the collection is dropped, as we keep a shared_ptr to it.
- auto notifier = readLock->getCollection()->getCappedInsertNotifier();
-
- // Save the PlanExecutor and drop our locks.
- exec->saveState();
- readLock.reset();
-
- // Block waiting for data. Time spent blocking is not counted towards the total
- // operation latency.
- curOp->pauseTimer();
- const auto timeout = opCtx->getRemainingMaxTimeMicros();
- notifier->wait(notifierVersion, timeout);
- notifier.reset();
- curOp->resumeTimer();
-
- readLock.emplace(opCtx, request.nss);
- exec->restoreState();
-
- // We woke up because either the timed_wait expired, or there was more data. Either
- // way, attempt to generate another batch of results.
- batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
- }
- }
+ Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
+ if (!batchStatus.isOK()) {
+ return appendCommandStatus(result, batchStatus);
}
PlanSummaryStats postExecutionStats;
@@ -472,7 +423,8 @@ public:
* Returns an OK status if the batch was successfully generated, and a non-OK status if the
* PlanExecutor encounters a failure.
*/
- Status generateBatch(ClientCursor* cursor,
+ Status generateBatch(OperationContext* opCtx,
+ ClientCursor* cursor,
const GetMoreRequest& request,
CursorResponseBuilder* nextBatch,
PlanExecutor::ExecState* state,
@@ -494,6 +446,8 @@ public:
break;
}
+ // As soon as we get a result, this operation no longer waits.
+ shouldWaitForInserts(opCtx) = false;
// Add result to output buffer.
nextBatch->append(obj);
(*numResults)++;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7b4d40f0bdb..1bee5cb5f07 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -99,8 +99,10 @@ bool handleCursorCommand(OperationContext* opCtx,
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
- // make it an obvious error to use cursor or executor after this point
- cursor = nullptr;
+ if (!cursor->isTailable()) {
+ // make it an obvious error to use cursor or executor after this point
+ cursor = nullptr;
+ }
break;
}
@@ -393,6 +395,10 @@ Status runAggregate(OperationContext* opCtx,
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+ if (liteParsedPipeline.startsWithChangeNotification()) {
+ expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ }
+
// Parse the pipeline.
auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
if (!statusWithPipeline.isOK()) {
@@ -451,13 +457,20 @@ Status runAggregate(OperationContext* opCtx,
// cursor manager. The global cursor manager does not deliver invalidations or kill
// notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
// invalidations and kill notifications themselves, not the cursor we create here.
- auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
- opCtx,
- {std::move(exec),
- origNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- cmdObj});
+ ClientCursorParams cursorParams(
+ std::move(exec),
+ origNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj);
+ if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) {
+ cursorParams.setTailable(true);
+ cursorParams.setAwaitData(true);
+ }
+
+ auto pin =
+ CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
// If both explain and cursor are specified, explain wins.
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index faf5bcf97ac..b8d7027fc5f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -94,12 +94,10 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
!expCtx->getCollator());
BSONObj matchObj = buildMatch(elem, expCtx->ns);
- BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
- auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
auto transformSource = createTransformationStage(expCtx);
- return {matchSource, sortSource, transformSource};
+ return {matchSource, transformSource};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0e2b53ea591..0fd14fd9a92 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -94,14 +94,15 @@ public:
}
};
-TEST_F(ChangeNotificationStageTest, Basic) {
+TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
vector<intrusive_ptr<DocumentSource>> result =
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
- ASSERT_EQUALS(result.size(), 3UL);
+ ASSERT_EQUALS(result.size(), 2UL);
ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
+ ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9704d826e53..f15bafd5b8b 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
@@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ // As long as we're waiting for inserts, we shouldn't do any batching at this level;
+ // we need the whole pipeline to see each document, to decide whether we should stop waiting.
+ if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
}
}
+ // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
+ // the PlanExecutor alive.
+ if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+ _exec->saveState();
+ return;
+ }
}
}
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index e17128ec8a6..d04b095aa4b 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -58,6 +58,8 @@ public:
std::vector<BSONObj> pipeline;
};
+ enum class TailableMode { kNormal, kTailableAndAwaitData };
+
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -101,6 +103,13 @@ public:
return it->second;
};
+ /**
+ * Convenience call that returns true if the tailableMode indicates a tailable query.
+ */
+ bool isTailable() const {
+ return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
+ }
+
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
@@ -121,6 +130,8 @@ public:
Variables variables;
VariablesParseState variablesParseState;
+ TailableMode tailableMode = TailableMode::kNormal;
+
protected:
static const int kInterruptCheckPeriod = 128;
ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ec471f41df5..697c639ccf7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -392,6 +392,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique<QueryRequest>(nss);
+ switch (pExpCtx->tailableMode) {
+ case ExpressionContext::TailableMode::kNormal:
+ break;
+ case ExpressionContext::TailableMode::kTailableAndAwaitData:
+ qr->setTailable(true);
+ qr->setAwaitData(true);
+ break;
+ }
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 854f3045506..60eab4d4f79 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -81,6 +81,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_catalog_entry',
"$BUILD_DIR/mongo/db/curop",
"$BUILD_DIR/mongo/db/exec/exec",
+ "$BUILD_DIR/mongo/db/repl/repl_coordinator_interface",
"$BUILD_DIR/mongo/db/s/sharding",
"$BUILD_DIR/mongo/db/storage/oplog_hack",
"$BUILD_DIR/mongo/util/elapsed_tracker",
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index da1d0d5c6ea..da2a71e2068 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -364,6 +364,11 @@ Message getMore(OperationContext* opCtx,
if (cc->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ uassert(40548,
+ "OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
+ "which support the getMore command can be used on tailable aggregations.",
+ readLock || !isCursorAwaitData(cc));
+
// If the operation that spawned this cursor had a time limit set, apply leftover
// time to this getmore.
if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 2789e660b76..17aa8d42f02 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,6 +32,8 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -43,10 +45,12 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -56,6 +60,11 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
+const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
+ OperationContext::declareDecoration<repl::OpTime>();
+
namespace {
@@ -380,6 +389,50 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
return getNextImpl(objOut, dlOut);
}
+
+bool PlanExecutor::shouldWaitForInserts() {
+ // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+ // we should wait for inserts.
+ if (mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ // For operations with a last committed opTime, we should not wait if the replication
+ // coordinator's lastCommittedOpTime has changed.
+ if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
+ return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+ }
+ return true;
+ }
+ return false;
+}
+
+bool PlanExecutor::waitForInserts() {
+ // If we cannot yield, we should retry immediately.
+ if (!_yieldPolicy->canReleaseLocksDuringExecution())
+ return true;
+
+ // We can only wait if we have a collection; otherwise retry immediately.
+ dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
+ auto db = dbHolder().get(_opCtx, _nss.db());
+ if (!db)
+ return true;
+ auto collection = db->getCollection(_opCtx, _nss);
+ if (!collection)
+ return true;
+
+ auto notifier = collection->getCappedInsertNotifier();
+ uint64_t notifierVersion = notifier->getVersion();
+ auto curOp = CurOp::get(_opCtx);
+ curOp->pauseTimer();
+ ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+ auto opCtx = _opCtx;
+ bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifier->wait(notifierVersion, timeout);
+ });
+ return yieldResult;
+}
+
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::OperationFailed,
@@ -508,7 +561,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- return PlanExecutor::IS_EOF;
+ if (shouldWaitForInserts()) {
+ const bool locksReacquiredAfterYield = waitForInserts();
+ if (locksReacquiredAfterYield) {
+ // There may be more results, try to get more data.
+ continue;
+ }
+ invariant(isMarkedAsKilled());
+ if (objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
+ }
+ return PlanExecutor::DEAD;
+ } else {
+ return PlanExecutor::IS_EOF;
+ }
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 48fed0eab0b..ba565e6b49c 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -51,6 +51,23 @@ struct PlanStageStats;
class WorkingSet;
/**
+ * If true, when no results are available from a plan, then instead of returning immediately, the
+ * system should wait up to the length of the operation deadline for data to be inserted that
+ * causes results to become available.
+ */
+extern const OperationContext::Decoration<bool> shouldWaitForInserts;
+
+/**
+ * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop
+ * waiting for new data as soon as the committed op time changes.
+ *
+ * 'clientsLastKnownCommittedOpTime' represents the time passed to the getMore command.
+ * If the replication coordinator ever reports a higher committed op time, we should stop waiting
+ * for inserts and return immediately to speed up the propagation of commit level changes.
+ */
+extern const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime;
+
+/**
* A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution.
* The executor is usually part of a larger abstraction that is interacting with the cache
* and/or the query optimizer.
@@ -425,6 +442,16 @@ public:
}
private:
+ // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
+ // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
+ // should be returned immediately.
+ bool shouldWaitForInserts();
+
+ // Yields locks and waits for inserts to the collection. Returns true if there may be new
+ // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield,
+ // returns true immediately.
+ bool waitForInserts();
+
ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
/**
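
The two extern decorations documented above attach per-operation state to OperationContext without modifying that class, which is what lets getmore_cmd.cpp set the flag and plan_executor.cpp read it with no new plumbing between them. Below is a toy analogue of the decoration pattern -- the real mechanism is mongo's OperationContext::declareDecoration; this standalone version only reproduces the calling shape used in the patch:

#include <any>
#include <cassert>
#include <unordered_map>

// Stand-in for mongo's OperationContext: one type-erased slot per decoration,
// keyed by the decoration object's address.
class OperationContext {
public:
    std::unordered_map<const void*, std::any> slots;
};

template <typename T>
class Decoration {
public:
    // Callable, mirroring `shouldWaitForInserts(opCtx) = true` in the patch.
    T& operator()(OperationContext& ctx) const {
        auto& slot = ctx.slots[this];
        if (!slot.has_value())
            slot = T{};  // value-initialize on first access
        return *std::any_cast<T>(&slot);
    }
};

// Module-level decoration, analogous to the two declared in plan_executor.cpp.
const Decoration<bool> shouldWaitForInserts{};

int main() {
    OperationContext opCtx;
    assert(!shouldWaitForInserts(opCtx));  // defaults to false
    shouldWaitForInserts(opCtx) = true;    // getMore marks the op as awaitData
    assert(shouldWaitForInserts(opCtx));   // PlanExecutor reads it at EOF
}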
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 52633ea0185..c5b54a90afb 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -71,7 +71,19 @@ void PlanYieldPolicy::resetTimer() {
_elapsedTracker.resetLastTime();
}
-bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
+bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) {
+ invariant(_planYielding);
+ if (recordFetcher) {
+ OperationContext* opCtx = _planYielding->getOpCtx();
+ return yield([recordFetcher, opCtx] { recordFetcher->setup(opCtx); },
+ [recordFetcher] { recordFetcher->fetch(); });
+ } else {
+ return yield(nullptr, nullptr);
+ }
+}
+
+bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
+ stdx::function<void()> whileYieldingFn) {
invariant(_planYielding);
invariant(canAutoYield());
@@ -107,7 +119,9 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
opCtx->recoveryUnit()->abandonSnapshot();
} else {
// Release and reacquire locks.
- QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
+ if (beforeYieldingFn)
+ beforeYieldingFn();
+ QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
}
return _planYielding->restoreStateWithoutRetrying();
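
The new two-callback yield() overload generalizes the old RecordFetcher-specific flow: 'beforeYieldingFn' runs while the locks are still held (as RecordFetcher::setup() did), and 'whileYieldingFn' runs after they are released (as RecordFetcher::fetch() did, and as the capped-insert wait in PlanExecutor::waitForInserts() now does). A self-contained sketch of that shape, with a plain mutex standing in for the lock manager -- not MongoDB code:

#include <functional>
#include <iostream>
#include <mutex>

std::mutex lockManager;  // stand-in for the lock-manager state being yielded

bool yieldAllLocks(std::unique_lock<std::mutex>& held,
                   std::function<void()> beforeYieldingFn,
                   std::function<void()> whileYieldingFn) {
    if (beforeYieldingFn)
        beforeYieldingFn();  // still under the locks, like RecordFetcher::setup()
    held.unlock();           // release lock-manager locks
    if (whileYieldingFn)
        whileYieldingFn();   // no locks held: safe to block, e.g. on a notifier
    held.lock();             // reacquire before the plan resumes
    return true;             // real code: whether the plan survived the yield
}

int main() {
    std::unique_lock<std::mutex> lk(lockManager);
    yieldAllLocks(lk,
                  [] { std::cout << "setup under lock\n"; },
                  [] { std::cout << "waiting with locks released\n"; });
    std::cout << "locks reacquired\n";
}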
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index 892fbafe350..cdf93f0219a 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -30,6 +30,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
@@ -74,6 +75,12 @@ public:
bool yield(RecordFetcher* fetcher = NULL);
/**
+ * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately
+ * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored.
+ */
+ bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn);
+
+ /**
* All calls to shouldYield() will return true until the next call to yield.
*/
void forceYield() {
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 6e1de753010..e731383a92d 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -25,7 +25,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#include "mongo/platform/basic.h"
#include "mongo/db/query/query_yield.h"
@@ -46,23 +45,18 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
// static
void QueryYield::yieldAllLocks(OperationContext* opCtx,
- RecordFetcher* fetcher,
+ stdx::function<void()> whileYieldingFn,
const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
- // 1) Tell the RecordFetcher to do any setup which needs to happen inside locks
- // 2) Release lock mgr locks
- // 3) Go to sleep
- // 4) Touch the record we're yielding on, if there is one (RecordFetcher::fetch)
- // 5) Reacquire lock mgr locks
+ // * Release lock mgr locks
+ // * Go to sleep
+ // * Call the whileYieldingFn
+ // * Reacquire lock mgr locks
Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
- if (fetcher) {
- fetcher->setup(opCtx);
- }
-
// Nothing was unlocked, just return, yielding is pointless.
if (!locker->saveLockStateAndUnlock(&snapshot)) {
return;
@@ -85,8 +79,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx,
}
}
- if (fetcher) {
- fetcher->fetch();
+ if (whileYieldingFn) {
+ whileYieldingFn();
}
locker->restoreLockState(snapshot);
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index 7d98b299484..a4fbba72fe7 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/namespace_string.h"
+#include "mongo/stdx/functional.h"
namespace mongo {
@@ -47,9 +48,11 @@ public:
* switch to another thread, and then reacquires all locks.
*
* If in a nested context (eg DBDirectClient), does nothing.
+ *
+ * The whileYieldingFn will be executed after the locks are released and before they are reacquired.
*/
static void yieldAllLocks(OperationContext* opCtx,
- RecordFetcher* fetcher,
+ stdx::function<void()> whileYieldingFn,
const NamespaceString& planExecNS);
};