summary refs log tree commit diff
diff options
context:
space:
mode:
author Matthew Russotto <matthew.russotto@10gen.com> 2017-07-14 11:39:03 -0400
committer Matthew Russotto <matthew.russotto@10gen.com> 2017-07-14 15:21:08 -0400
commit 61453cc84594ce4d3bcea551d2a6e9cce070fc1d (patch)
tree f42c29a7313665dab224de341d9c5cab452972f1
parent a9a2772d214b6839ebffaa8c547c1628ffe62e99 (diff)
download mongo-61453cc84594ce4d3bcea551d2a6e9cce070fc1d.tar.gz
Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"
This reverts commit 3bab15739e421e9eed4bf180cbcf5c7392a9a90d.
-rw-r--r-- jstests/aggregation/sources/changeNotification/change_notification.js 158
-rw-r--r-- jstests/noPassthrough/awaitdata_getmore_cmd.js 50
-rw-r--r-- src/mongo/db/clientcursor.h 27
-rw-r--r-- src/mongo/db/commands/getmore_cmd.cpp 73
-rw-r--r-- src/mongo/db/commands/run_aggregate.cpp 31
-rw-r--r-- src/mongo/db/pipeline/document_source_change_notification.cpp 4
-rw-r--r-- src/mongo/db/pipeline/document_source_change_notification_test.cpp 5
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.cpp 12
-rw-r--r-- src/mongo/db/pipeline/expression_context.h 11
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.cpp 8
-rw-r--r-- src/mongo/db/query/find.cpp 5
-rw-r--r-- src/mongo/db/query/plan_executor.cpp 59
-rw-r--r-- src/mongo/db/query/plan_executor.h 17
-rw-r--r-- src/mongo/db/query/plan_yield_policy.cpp 18
-rw-r--r-- src/mongo/db/query/plan_yield_policy.h 7
-rw-r--r-- src/mongo/db/query/query_yield.cpp 20
-rw-r--r-- src/mongo/db/query/query_yield.h 5
17 files changed, 97 insertions, 413 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js
index e0df38950d6..7636b02fd53 100644
--- a/jstests/aggregation/sources/changeNotification/change_notification.js
+++ b/jstests/aggregation/sources/changeNotification/change_notification.js
@@ -4,28 +4,20 @@
(function() {
"use strict";
- var oplogProjection = {$project: {"_id.ts": 0}};
-
// Helper for testing that pipeline returns correct set of results.
function testPipeline(pipeline, expectedResult, collection) {
- // Limit to the last N documents from the end of the oplog, because currently
- // $changeNotification always comes from the start of the oplog.
- pipeline.push({$sort: {"_id.ts": -1}});
- if (expectedResult.length > 0) {
- pipeline.push({$limit: expectedResult.length});
- }
// Strip the oplog fields we aren't testing.
- pipeline.push(oplogProjection);
- assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult);
+ pipeline.push({$limit: 1});
+ pipeline.push({$project: {"_id.ts": 0}});
+ assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult);
}
- let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
- let nodes = replTest.startSet();
+ var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
+ var nodes = replTest.startSet();
replTest.initiate();
replTest.awaitReplication();
db = replTest.getPrimary().getDB('test');
- db.getMongo().forceReadMode('commands');
jsTestLog("Testing single insert");
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
@@ -137,145 +129,5 @@
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
testPipeline([{$changeNotification: {}}], [], db.dne1);
testPipeline([{$changeNotification: {}}], [], db.dne2);
-
- // Now make sure the cursor behaves like a tailable awaitData cursor.
- jsTestLog("Testing tailability");
- let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
- assert(!tailableCursor.hasNext());
- assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
- assert(tailableCursor.hasNext());
- assert.docEq(tailableCursor.next(), {
- "_id": {
- "_id": 101,
- "ns": "test.tailable1",
- },
- "documentKey": {"_id": 101},
- "newDocument": {"_id": 101, "a": 1},
- "ns": {"coll": "tailable1", "db": "test"},
- "operationType": "insert"
- });
-
- jsTestLog("Testing awaitdata");
- let res = assert.commandWorked(db.runCommand({
- aggregate: "tailable2",
- pipeline: [{$changeNotification: {}}, oplogProjection],
- cursor: {}
- }));
- let aggcursor = res.cursor;
-
- // We should get a valid cursor.
- assert.neq(aggcursor.id, 0);
-
- // Initial batch size should be zero as there should be no data.
- assert.eq(aggcursor.firstBatch.length, 0);
-
- // No data, so should return no results, but cursor should remain valid.
- res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
- aggcursor = res.cursor;
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 0);
-
- // Now insert something in parallel while waiting for it.
- let insertshell = startParallelShell(function() {
- // Wait for the getMore to appear in currentop.
- assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length ==
- 1;
- });
- assert.writeOK(db.tailable2.insert({_id: 102, a: 2}));
- });
- res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000}));
- aggcursor = res.cursor;
- assert.eq(aggcursor.nextBatch.length, 1);
- assert.docEq(aggcursor.nextBatch[0], {
- "_id": {
- "_id": 102,
- "ns": "test.tailable2",
- },
- "documentKey": {"_id": 102},
- "newDocument": {"_id": 102, "a": 2},
- "ns": {"coll": "tailable2", "db": "test"},
- "operationType": "insert"
- });
-
- // Wait for insert shell to terminate.
- insertshell();
-
- jsTestLog("Testing awaitdata - no wake on insert to another collection");
- res = assert.commandWorked(db.runCommand({
- aggregate: "tailable3",
- pipeline: [{$changeNotification: {}}, oplogProjection],
- cursor: {}
- }));
- aggcursor = res.cursor;
- // We should get a valid cursor.
- assert.neq(aggcursor.id, 0);
-
- // Initial batch size should be zero as there should be no data.
- assert.eq(aggcursor.firstBatch.length, 0);
-
- // Now insert something in a different collection in parallel while waiting.
- insertshell = startParallelShell(function() {
- // Wait for the getMore to appear in currentop.
- assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
- 1;
- });
- assert.writeOK(db.tailable3a.insert({_id: 103, a: 2}));
- });
- let start = new Date();
- res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000}));
- let diff = (new Date()).getTime() - start.getTime();
- assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection.");
- aggcursor = res.cursor;
- // Cursor should be valid with no data.
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 0);
-
- // Wait for insert shell to terminate.
- insertshell();
-
- // This time, put something in a different collection, then in the correct collection.
- // We should wake up with just the correct data.
- insertshell = startParallelShell(function() {
- // Wait for the getMore to appear in currentop.
- assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
- 1;
- });
- assert.writeOK(db.tailable3a.insert({_id: 104, a: 2}));
- assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1);
- assert.writeOK(db.tailable3.insert({_id: 105, a: 3}));
- });
- res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000}));
- aggcursor = res.cursor;
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 1);
- assert.docEq(aggcursor.nextBatch[0], {
- "_id": {
- "_id": 105,
- "ns": "test.tailable3",
- },
- "documentKey": {"_id": 105},
- "newDocument": {"_id": 105, "a": 3},
- "ns": {"coll": "tailable3", "db": "test"},
- "operationType": "insert"
- });
-
- // Wait for insert shell to terminate.
- insertshell();
-
- jsTestLog("Ensuring attempt to read with legacy operations fails.");
- db.getMongo().forceReadMode('legacy');
- tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
- {cursor: {batchSize: 0}});
- assert.throws(function() {
- tailableCursor.next();
- }, [], "Legacy getMore expected to fail on changeNotification cursor.");
-
replTest.stopSet();
}());
diff --git a/jstests/noPassthrough/awaitdata_getmore_cmd.js b/jstests/noPassthrough/awaitdata_getmore_cmd.js
index 5de2c8f83c1..ef47efb0e67 100644
--- a/jstests/noPassthrough/awaitdata_getmore_cmd.js
+++ b/jstests/noPassthrough/awaitdata_getmore_cmd.js
@@ -28,13 +28,8 @@
cmdRes = db.runCommand({find: collName, tailable: true, awaitData: true});
assert.commandFailed(cmdRes);
- // With a non-existent collection, should succeed but return no data and a closed cursor.
- coll.drop();
- cmdRes = assert.commandWorked(db.runCommand({find: collName, tailable: true}));
- assert.eq(cmdRes.cursor.id, NumberLong(0));
- assert.eq(cmdRes.cursor.firstBatch.length, 0);
-
// Create a capped collection with 10 documents.
+ coll.drop();
assert.commandWorked(db.createCollection(collName, {capped: true, size: 2048}));
for (var i = 0; i < 10; i++) {
assert.writeOK(coll.insert({a: i}));
@@ -128,47 +123,4 @@
}
assert.gte((new Date()) - now, 2000);
- // Test filtered inserts while writing to a capped collection.
- // Find with a filter which doesn't match any documents in the collection.
- cmdRes = assert.commandWorked(db.runCommand(
- {find: collName, batchSize: 2, filter: {x: 1}, awaitData: true, tailable: true}));
- assert.gt(cmdRes.cursor.id, NumberLong(0));
- assert.eq(cmdRes.cursor.ns, coll.getFullName());
- assert.eq(cmdRes.cursor.firstBatch.length, 0);
-
- // getMore should time out if we insert a non-matching document.
- let insertshell = startParallelShell(function() {
- assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "await_data"})
- .inprog.length == 1;
- });
- assert.writeOK(db.await_data.insert({x: 0}));
- }, mongo.port);
-
- now = new Date();
- cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 4000});
- assert.commandWorked(cmdRes);
- assert.gt(cmdRes.cursor.id, NumberLong(0));
- assert.eq(cmdRes.cursor.ns, coll.getFullName());
- assert.eq(cmdRes.cursor.nextBatch.length, 0);
- assert.gte((new Date()) - now,
- 4000,
- "Insert not matching filter caused awaitData getMore to return prematurely.");
- insertshell();
-
- // getMore should succeed if we insert a non-matching document followed by a matching one.
- insertshell = startParallelShell(function() {
- assert.writeOK(db.await_data.insert({x: 0}));
- assert.writeOK(db.await_data.insert({_id: "match", x: 1}));
- jsTestLog("Written");
- }, mongo.port);
-
- cmdRes =
- db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 5 * 60 * 1000});
- assert.commandWorked(cmdRes);
- assert.gt(cmdRes.cursor.id, NumberLong(0));
- assert.eq(cmdRes.cursor.ns, coll.getFullName());
- assert.eq(cmdRes.cursor.nextBatch.length, 1);
- assert.docEq(cmdRes.cursor.nextBatch[0], {_id: "match", x: 1});
- insertshell();
})();
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index f75d41e3089..32fb018a8e1 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -71,20 +71,6 @@ struct ClientCursorParams {
}
}
- void setTailable(bool tailable) {
- if (tailable)
- queryOptions |= QueryOption_CursorTailable;
- else
- queryOptions &= ~QueryOption_CursorTailable;
- }
-
- void setAwaitData(bool awaitData) {
- if (awaitData)
- queryOptions |= QueryOption_AwaitData;
- else
- queryOptions &= ~QueryOption_AwaitData;
- }
-
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
@@ -141,23 +127,10 @@ public:
return _exec.get();
}
- /**
- * Returns the query options bitmask. If you'd like to know if the cursor is tailable or
- * awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this
- * method.
- */
int queryOptions() const {
return _queryOptions;
}
- bool isTailable() const {
- return _queryOptions & QueryOption_CursorTailable;
- }
-
- bool isAwaitData() const {
- return _queryOptions & QueryOption_AwaitData;
- }
-
const BSONObj& getOriginatingCommandObj() const {
return _originatingCommand;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 48b77395635..e429c268dd6 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
-#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -270,6 +269,11 @@ public:
// Validation related to awaitData.
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
+
+ if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
+ Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
+ return appendCommandStatus(result, status);
+ }
}
if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) {
@@ -318,6 +322,21 @@ public:
}
}
+ uint64_t notifierVersion = 0;
+ std::shared_ptr<CappedInsertNotifier> notifier;
+ if (isCursorAwaitData(cursor)) {
+ invariant(readLock->getCollection()->isCapped());
+ // Retrieve the notifier which we will wait on until new data arrives. We make sure
+ // to do this in the lock because once we drop the lock it is possible for the
+ // collection to become invalid. The notifier itself will outlive the collection if
+ // the collection is dropped, as we keep a shared_ptr to it.
+ notifier = readLock->getCollection()->getCappedInsertNotifier();
+
+ // Must get the version before we call generateBatch in case a write comes in after
+ // that call and before we call wait on the notifier.
+ notifierVersion = notifier->getVersion();
+ }
+
CursorId respondWithId = 0;
CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
BSONObj obj;
@@ -333,19 +352,46 @@ public:
PlanSummaryStats preExecutionStats;
Explain::getSummaryStats(*exec, &preExecutionStats);
- // Mark this as an AwaitData operation if appropriate.
- if (isCursorAwaitData(cursor)) {
- // Do not wait if we need to update the commit time; just get whatever is available
- // now.
+ Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
+ if (!batchStatus.isOK()) {
+ return appendCommandStatus(result, batchStatus);
+ }
+
+ // If this is an await data cursor, and we hit EOF without generating any results, then
+ // we block waiting for new data to arrive.
+ if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ // Return immediately if we need to update the commit time.
if (!request.lastKnownCommittedOpTime ||
- (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime()))
- shouldWaitForInserts(opCtx) = true;
- }
+ (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) {
+ // Retrieve the notifier which we will wait on until new data arrives. We make sure
+ // to do this in the lock because once we drop the lock it is possible for the
+ // collection to become invalid. The notifier itself will outlive the collection if
+ // the collection is dropped, as we keep a shared_ptr to it.
+ auto notifier = readLock->getCollection()->getCappedInsertNotifier();
+
+ // Save the PlanExecutor and drop our locks.
+ exec->saveState();
+ readLock.reset();
- Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
+ // Block waiting for data. Time spent blocking is not counted towards the total
+ // operation latency.
+ curOp->pauseTimer();
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifier->wait(notifierVersion, timeout);
+ notifier.reset();
+ curOp->resumeTimer();
+
+ readLock.emplace(opCtx, request.nss);
+ exec->restoreState();
+
+ // We woke up because either the timed_wait expired, or there was more data. Either
+ // way, attempt to generate another batch of results.
+ batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
+ if (!batchStatus.isOK()) {
+ return appendCommandStatus(result, batchStatus);
+ }
+ }
}
PlanSummaryStats postExecutionStats;
@@ -426,8 +472,7 @@ public:
* Returns an OK status if the batch was successfully generated, and a non-OK status if the
* PlanExecutor encounters a failure.
*/
- Status generateBatch(OperationContext* opCtx,
- ClientCursor* cursor,
+ Status generateBatch(ClientCursor* cursor,
const GetMoreRequest& request,
CursorResponseBuilder* nextBatch,
PlanExecutor::ExecState* state,
@@ -449,8 +494,6 @@ public:
break;
}
- // As soon as we get a result, this operation no longer waits.
- shouldWaitForInserts(opCtx) = false;
// Add result to output buffer.
nextBatch->append(obj);
(*numResults)++;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 1bee5cb5f07..7b4d40f0bdb 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -99,10 +99,8 @@ bool handleCursorCommand(OperationContext* opCtx,
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
- if (!cursor->isTailable()) {
- // make it an obvious error to use cursor or executor after this point
- cursor = nullptr;
- }
+ // make it an obvious error to use cursor or executor after this point
+ cursor = nullptr;
break;
}
@@ -395,10 +393,6 @@ Status runAggregate(OperationContext* opCtx,
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
- if (liteParsedPipeline.startsWithChangeNotification()) {
- expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
- }
-
// Parse the pipeline.
auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
if (!statusWithPipeline.isOK()) {
@@ -457,20 +451,13 @@ Status runAggregate(OperationContext* opCtx,
// cursor manager. The global cursor manager does not deliver invalidations or kill
// notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
// invalidations and kill notifications themselves, not the cursor we create here.
- ClientCursorParams cursorParams(
- std::move(exec),
- origNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- cmdObj);
- if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) {
- cursorParams.setTailable(true);
- cursorParams.setAwaitData(true);
- }
-
- auto pin =
- CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
-
+ auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
+ opCtx,
+ {std::move(exec),
+ origNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj});
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
// If both explain and cursor are specified, explain wins.
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index b8d7027fc5f..faf5bcf97ac 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -94,10 +94,12 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
!expCtx->getCollator());
BSONObj matchObj = buildMatch(elem, expCtx->ns);
+ BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
+ auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
auto transformSource = createTransformationStage(expCtx);
- return {matchSource, transformSource};
+ return {matchSource, sortSource, transformSource};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0fd14fd9a92..0e2b53ea591 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -94,15 +94,14 @@ public:
}
};
-TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
+TEST_F(ChangeNotificationStageTest, Basic) {
const auto spec = fromjson("{$changeNotification: {}}");
vector<intrusive_ptr<DocumentSource>> result =
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
- ASSERT_EQUALS(result.size(), 2UL);
+ ASSERT_EQUALS(result.size(), 3UL);
ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
- ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index f15bafd5b8b..9704d826e53 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
@@ -98,21 +99,12 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- // As long as we're waiting for inserts, we shouldn't do any batching at this level
- // we need the whole pipeline to see each document to see if we should stop waiting.
- if (shouldWaitForInserts(pExpCtx->opCtx) ||
- memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
}
}
- // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
- // the PlanExecutor alive.
- if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
- _exec->saveState();
- return;
- }
}
}
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index d04b095aa4b..e17128ec8a6 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -58,8 +58,6 @@ public:
std::vector<BSONObj> pipeline;
};
- enum class TailableMode { kNormal, kTailableAndAwaitData };
-
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -103,13 +101,6 @@ public:
return it->second;
};
- /**
- * Convenience call that returns true if the tailableMode indicate a tailable query.
- */
- bool isTailable() const {
- return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
- }
-
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
@@ -130,8 +121,6 @@ public:
Variables variables;
VariablesParseState variablesParseState;
- TailableMode tailableMode = TailableMode::kNormal;
-
protected:
static const int kInterruptCheckPeriod = 128;
ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 697c639ccf7..ec471f41df5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -392,14 +392,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique<QueryRequest>(nss);
- switch (pExpCtx->tailableMode) {
- case ExpressionContext::TailableMode::kNormal:
- break;
- case ExpressionContext::TailableMode::kTailableAndAwaitData:
- qr->setTailable(true);
- qr->setAwaitData(true);
- break;
- }
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index da2a71e2068..da1d0d5c6ea 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -364,11 +364,6 @@ Message getMore(OperationContext* opCtx,
if (cc->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
- uassert(40548,
- "OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
- "which support the getMore command can be used on tailable aggregations.",
- readLock || !isCursorAwaitData(cc));
-
// If the operation that spawned this cursor had a time limit set, apply leftover
// time to this getmore.
if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index ff18a403286..2789e660b76 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,8 +32,6 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database.h"
-#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -49,7 +47,6 @@
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
-#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -59,9 +56,6 @@ using std::string;
using std::unique_ptr;
using std::vector;
-const OperationContext::Decoration<bool> shouldWaitForInserts =
- OperationContext::declareDecoration<bool>();
-
namespace {
namespace {
@@ -386,41 +380,6 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
return getNextImpl(objOut, dlOut);
}
-
-bool PlanExecutor::shouldWaitForInserts() {
- // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
- // we should wait for inserts.
- return mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
- _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero();
-}
-
-bool PlanExecutor::waitForInserts() {
- // If we cannot yield, we should retry immediately.
- if (!_yieldPolicy->canReleaseLocksDuringExecution())
- return true;
-
- // We can only wait if we have a collection; otherwise retry immediately.
- dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
- auto db = dbHolder().get(_opCtx, _nss.db());
- if (!db)
- return true;
- auto collection = db->getCollection(_opCtx, _nss);
- if (!collection)
- return true;
-
- auto notifier = collection->getCappedInsertNotifier();
- uint64_t notifierVersion = notifier->getVersion();
- auto curOp = CurOp::get(_opCtx);
- curOp->pauseTimer();
- ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
- auto opCtx = _opCtx;
- bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
- const auto timeout = opCtx->getRemainingMaxTimeMicros();
- notifier->wait(notifierVersion, timeout);
- });
- return yieldResult;
-}
-
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::OperationFailed,
@@ -549,23 +508,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- if (shouldWaitForInserts()) {
- const bool locksReacquiredAfterYield = waitForInserts();
- if (locksReacquiredAfterYield) {
- // There may be more results, try to get more data.
- continue;
- }
- invariant(isMarkedAsKilled());
- if (objOut) {
- Status status(ErrorCodes::OperationFailed,
- str::stream() << "Operation aborted because: " << *_killReason);
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
- }
- return PlanExecutor::DEAD;
- } else {
- return PlanExecutor::IS_EOF;
- }
+ return PlanExecutor::IS_EOF;
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index dfd879bbccf..48fed0eab0b 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -51,13 +51,6 @@ struct PlanStageStats;
class WorkingSet;
/**
- * If true, when no results are available from a plan, then instead of returning immediately, the
- * system should wait up to the length of the operation deadline for data to be inserted which
- * causes results to become available.
- */
-extern const OperationContext::Decoration<bool> shouldWaitForInserts;
-
-/**
* A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution.
* The executor is usually part of a larger abstraction that is interacting with the cache
* and/or the query optimizer.
@@ -432,16 +425,6 @@ public:
}
private:
- // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
- // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
- // should be returned immediately.
- bool shouldWaitForInserts();
-
- // Yields locks and waits for inserts to the collection. Returns true if there may be new
- // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield,
- // returns true immediately.
- bool waitForInserts();
-
ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
/**
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index c5b54a90afb..52633ea0185 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -71,19 +71,7 @@ void PlanYieldPolicy::resetTimer() {
_elapsedTracker.resetLastTime();
}
-bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) {
- invariant(_planYielding);
- if (recordFetcher) {
- OperationContext* opCtx = _planYielding->getOpCtx();
- return yield([recordFetcher, opCtx] { recordFetcher->setup(opCtx); },
- [recordFetcher] { recordFetcher->fetch(); });
- } else {
- return yield(nullptr, nullptr);
- }
-}
-
-bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
- stdx::function<void()> whileYieldingFn) {
+bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
invariant(_planYielding);
invariant(canAutoYield());
@@ -119,9 +107,7 @@ bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
opCtx->recoveryUnit()->abandonSnapshot();
} else {
// Release and reacquire locks.
- if (beforeYieldingFn)
- beforeYieldingFn();
- QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
+ QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
}
return _planYielding->restoreStateWithoutRetrying();
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index cdf93f0219a..892fbafe350 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -30,7 +30,6 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/query/plan_executor.h"
-#include "mongo/stdx/functional.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
@@ -75,12 +74,6 @@ public:
bool yield(RecordFetcher* fetcher = NULL);
/**
- * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately
- * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored.
- */
- bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn);
-
- /**
* All calls to shouldYield() will return true until the next call to yield.
*/
void forceYield() {
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index e731383a92d..6e1de753010 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -25,6 +25,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#include "mongo/platform/basic.h"
#include "mongo/db/query/query_yield.h"
@@ -45,18 +46,23 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
// static
void QueryYield::yieldAllLocks(OperationContext* opCtx,
- stdx::function<void()> whileYieldingFn,
+ RecordFetcher* fetcher,
const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
- // * Release lock mgr locks
- // * Go to sleep
- // * Call the whileYieldingFn
- // * Reacquire lock mgr locks
+ // 1) Tell the RecordFetcher to do any setup which needs to happen inside locks
+ // 2) Release lock mgr locks
+ // 3) Go to sleep
+ // 4) Touch the record we're yielding on, if there is one (RecordFetcher::fetch)
+ // 5) Reacquire lock mgr locks
Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
+ if (fetcher) {
+ fetcher->setup(opCtx);
+ }
+
// Nothing was unlocked, just return, yielding is pointless.
if (!locker->saveLockStateAndUnlock(&snapshot)) {
return;
@@ -79,8 +85,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx,
}
}
- if (whileYieldingFn) {
- whileYieldingFn();
+ if (fetcher) {
+ fetcher->fetch();
}
locker->restoreLockState(snapshot);
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index a4fbba72fe7..7d98b299484 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -29,7 +29,6 @@
#pragma once
#include "mongo/db/namespace_string.h"
-#include "mongo/stdx/functional.h"
namespace mongo {
@@ -48,11 +47,9 @@ public:
* switch to another thread, and then reacquires all locks.
*
* If in a nested context (eg DBDirectClient), does nothing.
- *
- * The whileYieldingFn will be executed after unlocking the locks and before re-acquiring them.
*/
static void yieldAllLocks(OperationContext* opCtx,
- stdx::function<void()> whileYieldingFn,
+ RecordFetcher* fetcher,
const NamespaceString& planExecNS);
};