diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-14 11:39:03 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-14 15:21:08 -0400 |
commit | 61453cc84594ce4d3bcea551d2a6e9cce070fc1d (patch) | |
tree | f42c29a7313665dab224de341d9c5cab452972f1 | |
parent | a9a2772d214b6839ebffaa8c547c1628ffe62e99 (diff) | |
download | mongo-61453cc84594ce4d3bcea551d2a6e9cce070fc1d.tar.gz |
Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"
This reverts commit 3bab15739e421e9eed4bf180cbcf5c7392a9a90d.
-rw-r--r-- | jstests/aggregation/sources/changeNotification/change_notification.js | 158 | ||||
-rw-r--r-- | jstests/noPassthrough/awaitdata_getmore_cmd.js | 50 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.h | 27 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 17 | ||||
-rw-r--r-- | src/mongo/db/query/plan_yield_policy.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/query/plan_yield_policy.h | 7 | ||||
-rw-r--r-- | src/mongo/db/query/query_yield.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/query/query_yield.h | 5 |
17 files changed, 97 insertions, 413 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js index e0df38950d6..7636b02fd53 100644 --- a/jstests/aggregation/sources/changeNotification/change_notification.js +++ b/jstests/aggregation/sources/changeNotification/change_notification.js @@ -4,28 +4,20 @@ (function() { "use strict"; - var oplogProjection = {$project: {"_id.ts": 0}}; - // Helper for testing that pipeline returns correct set of results. function testPipeline(pipeline, expectedResult, collection) { - // Limit to the last N documents from the end of the oplog, because currently - // $changeNotification always comes from the start of the oplog. - pipeline.push({$sort: {"_id.ts": -1}}); - if (expectedResult.length > 0) { - pipeline.push({$limit: expectedResult.length}); - } // Strip the oplog fields we aren't testing. - pipeline.push(oplogProjection); - assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult); + pipeline.push({$limit: 1}); + pipeline.push({$project: {"_id.ts": 0}}); + assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult); } - let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); - let nodes = replTest.startSet(); + var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); + var nodes = replTest.startSet(); replTest.initiate(); replTest.awaitReplication(); db = replTest.getPrimary().getDB('test'); - db.getMongo().forceReadMode('commands'); jsTestLog("Testing single insert"); assert.writeOK(db.t1.insert({_id: 0, a: 1})); @@ -137,145 +129,5 @@ assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); testPipeline([{$changeNotification: {}}], [], db.dne1); testPipeline([{$changeNotification: {}}], [], db.dne2); - - // Now make sure the cursor behaves like a tailable awaitData cursor. - jsTestLog("Testing tailability"); - let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]); - assert(!tailableCursor.hasNext()); - assert.writeOK(db.tailable1.insert({_id: 101, a: 1})); - assert(tailableCursor.hasNext()); - assert.docEq(tailableCursor.next(), { - "_id": { - "_id": 101, - "ns": "test.tailable1", - }, - "documentKey": {"_id": 101}, - "newDocument": {"_id": 101, "a": 1}, - "ns": {"coll": "tailable1", "db": "test"}, - "operationType": "insert" - }); - - jsTestLog("Testing awaitdata"); - let res = assert.commandWorked(db.runCommand({ - aggregate: "tailable2", - pipeline: [{$changeNotification: {}}, oplogProjection], - cursor: {} - })); - let aggcursor = res.cursor; - - // We should get a valid cursor. - assert.neq(aggcursor.id, 0); - - // Initial batch size should be zero as there should be no data. - assert.eq(aggcursor.firstBatch.length, 0); - - // No data, so should return no results, but cursor should remain valid. - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50})); - aggcursor = res.cursor; - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0); - - // Now insert something in parallel while waiting for it. - let insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable2.insert({_id: 102, a: 2})); - }); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000})); - aggcursor = res.cursor; - assert.eq(aggcursor.nextBatch.length, 1); - assert.docEq(aggcursor.nextBatch[0], { - "_id": { - "_id": 102, - "ns": "test.tailable2", - }, - "documentKey": {"_id": 102}, - "newDocument": {"_id": 102, "a": 2}, - "ns": {"coll": "tailable2", "db": "test"}, - "operationType": "insert" - }); - - // Wait for insert shell to terminate. - insertshell(); - - jsTestLog("Testing awaitdata - no wake on insert to another collection"); - res = assert.commandWorked(db.runCommand({ - aggregate: "tailable3", - pipeline: [{$changeNotification: {}}, oplogProjection], - cursor: {} - })); - aggcursor = res.cursor; - // We should get a valid cursor. - assert.neq(aggcursor.id, 0); - - // Initial batch size should be zero as there should be no data. - assert.eq(aggcursor.firstBatch.length, 0); - - // Now insert something in a different collection in parallel while waiting. - insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable3a.insert({_id: 103, a: 2})); - }); - let start = new Date(); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000})); - let diff = (new Date()).getTime() - start.getTime(); - assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection."); - aggcursor = res.cursor; - // Cursor should be valid with no data. - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0); - - // Wait for insert shell to terminate. - insertshell(); - - // This time, put something in a different collection, then in the correct collection. - // We should wake up with just the correct data. - insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable3a.insert({_id: 104, a: 2})); - assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1); - assert.writeOK(db.tailable3.insert({_id: 105, a: 3})); - }); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000})); - aggcursor = res.cursor; - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 1); - assert.docEq(aggcursor.nextBatch[0], { - "_id": { - "_id": 105, - "ns": "test.tailable3", - }, - "documentKey": {"_id": 105}, - "newDocument": {"_id": 105, "a": 3}, - "ns": {"coll": "tailable3", "db": "test"}, - "operationType": "insert" - }); - - // Wait for insert shell to terminate. - insertshell(); - - jsTestLog("Ensuring attempt to read with legacy operations fails."); - db.getMongo().forceReadMode('legacy'); - tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection], - {cursor: {batchSize: 0}}); - assert.throws(function() { - tailableCursor.next(); - }, [], "Legacy getMore expected to fail on changeNotification cursor."); - replTest.stopSet(); }()); diff --git a/jstests/noPassthrough/awaitdata_getmore_cmd.js b/jstests/noPassthrough/awaitdata_getmore_cmd.js index 5de2c8f83c1..ef47efb0e67 100644 --- a/jstests/noPassthrough/awaitdata_getmore_cmd.js +++ b/jstests/noPassthrough/awaitdata_getmore_cmd.js @@ -28,13 +28,8 @@ cmdRes = db.runCommand({find: collName, tailable: true, awaitData: true}); assert.commandFailed(cmdRes); - // With a non-existent collection, should succeed but return no data and a closed cursor. - coll.drop(); - cmdRes = assert.commandWorked(db.runCommand({find: collName, tailable: true})); - assert.eq(cmdRes.cursor.id, NumberLong(0)); - assert.eq(cmdRes.cursor.firstBatch.length, 0); - // Create a capped collection with 10 documents. + coll.drop(); assert.commandWorked(db.createCollection(collName, {capped: true, size: 2048})); for (var i = 0; i < 10; i++) { assert.writeOK(coll.insert({a: i})); @@ -128,47 +123,4 @@ } assert.gte((new Date()) - now, 2000); - // Test filtered inserts while writing to a capped collection. - // Find with a filter which doesn't match any documents in the collection. - cmdRes = assert.commandWorked(db.runCommand( - {find: collName, batchSize: 2, filter: {x: 1}, awaitData: true, tailable: true})); - assert.gt(cmdRes.cursor.id, NumberLong(0)); - assert.eq(cmdRes.cursor.ns, coll.getFullName()); - assert.eq(cmdRes.cursor.firstBatch.length, 0); - - // getMore should time out if we insert a non-matching document. - let insertshell = startParallelShell(function() { - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "await_data"}) - .inprog.length == 1; - }); - assert.writeOK(db.await_data.insert({x: 0})); - }, mongo.port); - - now = new Date(); - cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 4000}); - assert.commandWorked(cmdRes); - assert.gt(cmdRes.cursor.id, NumberLong(0)); - assert.eq(cmdRes.cursor.ns, coll.getFullName()); - assert.eq(cmdRes.cursor.nextBatch.length, 0); - assert.gte((new Date()) - now, - 4000, - "Insert not matching filter caused awaitData getMore to return prematurely."); - insertshell(); - - // getMore should succeed if we insert a non-matching document followed by a matching one. - insertshell = startParallelShell(function() { - assert.writeOK(db.await_data.insert({x: 0})); - assert.writeOK(db.await_data.insert({_id: "match", x: 1})); - jsTestLog("Written"); - }, mongo.port); - - cmdRes = - db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 5 * 60 * 1000}); - assert.commandWorked(cmdRes); - assert.gt(cmdRes.cursor.id, NumberLong(0)); - assert.eq(cmdRes.cursor.ns, coll.getFullName()); - assert.eq(cmdRes.cursor.nextBatch.length, 1); - assert.docEq(cmdRes.cursor.nextBatch[0], {_id: "match", x: 1}); - insertshell(); })(); diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index f75d41e3089..32fb018a8e1 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -71,20 +71,6 @@ struct ClientCursorParams { } } - void setTailable(bool tailable) { - if (tailable) - queryOptions |= QueryOption_CursorTailable; - else - queryOptions &= ~QueryOption_CursorTailable; - } - - void setAwaitData(bool awaitData) { - if (awaitData) - queryOptions |= QueryOption_AwaitData; - else - queryOptions &= ~QueryOption_AwaitData; - } - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; const NamespaceString nss; std::vector<UserName> authenticatedUsers; @@ -141,23 +127,10 @@ public: return _exec.get(); } - /** - * Returns the query options bitmask. If you'd like to know if the cursor is tailable or - * awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this - * method. - */ int queryOptions() const { return _queryOptions; } - bool isTailable() const { - return _queryOptions & QueryOption_CursorTailable; - } - - bool isAwaitData() const { - return _queryOptions & QueryOption_AwaitData; - } - const BSONObj& getOriginatingCommandObj() const { return _originatingCommand; } diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 48b77395635..e429c268dd6 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -47,7 +47,6 @@ #include "mongo/db/query/find.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" -#include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -270,6 +269,11 @@ public: // Validation related to awaitData. if (isCursorAwaitData(cursor)) { invariant(isCursorTailable(cursor)); + + if (CursorManager::isGloballyManagedCursor(request.cursorid)) { + Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor"); + return appendCommandStatus(result, status); + } } if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) { @@ -318,6 +322,21 @@ public: } } + uint64_t notifierVersion = 0; + std::shared_ptr<CappedInsertNotifier> notifier; + if (isCursorAwaitData(cursor)) { + invariant(readLock->getCollection()->isCapped()); + // Retrieve the notifier which we will wait on until new data arrives. We make sure + // to do this in the lock because once we drop the lock it is possible for the + // collection to become invalid. The notifier itself will outlive the collection if + // the collection is dropped, as we keep a shared_ptr to it. + notifier = readLock->getCollection()->getCappedInsertNotifier(); + + // Must get the version before we call generateBatch in case a write comes in after + // that call and before we call wait on the notifier. + notifierVersion = notifier->getVersion(); + } + CursorId respondWithId = 0; CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); BSONObj obj; @@ -333,19 +352,46 @@ public: PlanSummaryStats preExecutionStats; Explain::getSummaryStats(*exec, &preExecutionStats); - // Mark this as an AwaitData operation if appropriate. - if (isCursorAwaitData(cursor)) { - // Do not wait if we need to update the commit time; just get whatever is available - // now. + Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); + if (!batchStatus.isOK()) { + return appendCommandStatus(result, batchStatus); + } + + // If this is an await data cursor, and we hit EOF without generating any results, then + // we block waiting for new data to arrive. + if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) { auto replCoord = repl::ReplicationCoordinator::get(opCtx); + // Return immediately if we need to update the commit time. if (!request.lastKnownCommittedOpTime || - (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) - shouldWaitForInserts(opCtx) = true; - } + (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) { + // Retrieve the notifier which we will wait on until new data arrives. We make sure + // to do this in the lock because once we drop the lock it is possible for the + // collection to become invalid. The notifier itself will outlive the collection if + // the collection is dropped, as we keep a shared_ptr to it. + auto notifier = readLock->getCollection()->getCappedInsertNotifier(); + + // Save the PlanExecutor and drop our locks. + exec->saveState(); + readLock.reset(); - Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults); - if (!batchStatus.isOK()) { - return appendCommandStatus(result, batchStatus); + // Block waiting for data. Time spent blocking is not counted towards the total + // operation latency. + curOp->pauseTimer(); + const auto timeout = opCtx->getRemainingMaxTimeMicros(); + notifier->wait(notifierVersion, timeout); + notifier.reset(); + curOp->resumeTimer(); + + readLock.emplace(opCtx, request.nss); + exec->restoreState(); + + // We woke up because either the timed_wait expired, or there was more data. Either + // way, attempt to generate another batch of results. + batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); + if (!batchStatus.isOK()) { + return appendCommandStatus(result, batchStatus); + } + } } PlanSummaryStats postExecutionStats; @@ -426,8 +472,7 @@ public: * Returns an OK status if the batch was successfully generated, and a non-OK status if the * PlanExecutor encounters a failure. */ - Status generateBatch(OperationContext* opCtx, - ClientCursor* cursor, + Status generateBatch(ClientCursor* cursor, const GetMoreRequest& request, CursorResponseBuilder* nextBatch, PlanExecutor::ExecState* state, @@ -449,8 +494,6 @@ public: break; } - // As soon as we get a result, this operation no longer waits. - shouldWaitForInserts(opCtx) = false; // Add result to output buffer. nextBatch->append(obj); (*numResults)++; diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 1bee5cb5f07..7b4d40f0bdb 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -99,10 +99,8 @@ bool handleCursorCommand(OperationContext* opCtx, // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) { - if (!cursor->isTailable()) { - // make it an obvious error to use cursor or executor after this point - cursor = nullptr; - } + // make it an obvious error to use cursor or executor after this point + cursor = nullptr; break; } @@ -395,10 +393,6 @@ Status runAggregate(OperationContext* opCtx, uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)))); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; - if (liteParsedPipeline.startsWithChangeNotification()) { - expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; - } - // Parse the pipeline. auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx); if (!statusWithPipeline.isOK()) { @@ -457,20 +451,13 @@ Status runAggregate(OperationContext* opCtx, // cursor manager. The global cursor manager does not deliver invalidations or kill // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving // invalidations and kill notifications themselves, not the cursor we create here. - ClientCursorParams cursorParams( - std::move(exec), - origNss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), - cmdObj); - if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) { - cursorParams.setTailable(true); - cursorParams.setAwaitData(true); - } - - auto pin = - CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); - + auto pin = CursorManager::getGlobalCursorManager()->registerCursor( + opCtx, + {std::move(exec), + origNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + cmdObj}); ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); // If both explain and cursor are specified, explain wins. diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index b8d7027fc5f..faf5bcf97ac 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -94,10 +94,12 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr !expCtx->getCollator()); BSONObj matchObj = buildMatch(elem, expCtx->ns); + BSONObj sortObj = BSON("$sort" << BSON("ts" << -1)); auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); + auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx); auto transformSource = createTransformationStage(expCtx); - return {matchSource, transformSource}; + return {matchSource, sortSource, transformSource}; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index 0fd14fd9a92..0e2b53ea591 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -94,15 +94,14 @@ public: } }; -TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { +TEST_F(ChangeNotificationStageTest, Basic) { const auto spec = fromjson("{$changeNotification: {}}"); vector<intrusive_ptr<DocumentSource>> result = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); - ASSERT_EQUALS(result.size(), 2UL); + ASSERT_EQUALS(result.size(), 3UL); ASSERT_EQUALS(string(result[0]->getSourceName()), "$match"); - ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); // TODO: Check explain result. } diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index f15bafd5b8b..9704d826e53 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" @@ -98,21 +99,12 @@ void DocumentSourceCursor::loadBatch() { memUsageBytes += _currentBatch.back().getApproximateSize(); - // As long as we're waiting for inserts, we shouldn't do any batching at this level - // we need the whole pipeline to see each document to see if we should stop waiting. - if (shouldWaitForInserts(pExpCtx->opCtx) || - memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; } } - // Special case for tailable cursor -- EOF doesn't preclude more results, so keep - // the PlanExecutor alive. - if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) { - _exec->saveState(); - return; - } } } diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index d04b095aa4b..e17128ec8a6 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -58,8 +58,6 @@ public: std::vector<BSONObj> pipeline; }; - enum class TailableMode { kNormal, kTailableAndAwaitData }; - /** * Constructs an ExpressionContext to be used for Pipeline parsing and evaluation. * 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces. @@ -103,13 +101,6 @@ public: return it->second; }; - /** - * Convenience call that returns true if the tailableMode indicate a tailable query. - */ - bool isTailable() const { - return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData; - } - // The explain verbosity requested by the user, or boost::none if no explain was requested. boost::optional<ExplainOptions::Verbosity> explain; @@ -130,8 +121,6 @@ public: Variables variables; VariablesParseState variablesParseState; - TailableMode tailableMode = TailableMode::kNormal; - protected: static const int kInterruptCheckPeriod = 128; ExpressionContext() : variablesParseState(variables.useIdGenerator()) {} diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 697c639ccf7..ec471f41df5 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -392,14 +392,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const AggregationRequest* aggRequest, const size_t plannerOpts) { auto qr = stdx::make_unique<QueryRequest>(nss); - switch (pExpCtx->tailableMode) { - case ExpressionContext::TailableMode::kNormal: - break; - case ExpressionContext::TailableMode::kTailableAndAwaitData: - qr->setTailable(true); - qr->setAwaitData(true); - break; - } qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index da2a71e2068..da1d0d5c6ea 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -364,11 +364,6 @@ Message getMore(OperationContext* opCtx, if (cc->isReadCommitted()) uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); - uassert(40548, - "OP_GET_MORE operations are not supported on tailable aggregations. Only clients " - "which support the getMore command can be used on tailable aggregations.", - readLock || !isCursorAwaitData(cc)); - // If the operation that spawned this cursor had a time limit set, apply leftover // time to this getmore. if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) { diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index ff18a403286..2789e660b76 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -32,8 +32,6 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" @@ -49,7 +47,6 @@ #include "mongo/db/storage/record_fetcher.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" -#include "mongo/util/scopeguard.h" #include "mongo/util/stacktrace.h" namespace mongo { @@ -59,9 +56,6 @@ using std::string; using std::unique_ptr; using std::vector; -const OperationContext::Decoration<bool> shouldWaitForInserts = - OperationContext::declareDecoration<bool>(); - namespace { namespace { @@ -386,41 +380,6 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o return getNextImpl(objOut, dlOut); } - -bool PlanExecutor::shouldWaitForInserts() { - // If this is an awaitData-respecting operation and we have time left and we're not interrupted, - // we should wait for inserts. - return mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && - _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero(); -} - -bool PlanExecutor::waitForInserts() { - // If we cannot yield, we should retry immediately. - if (!_yieldPolicy->canReleaseLocksDuringExecution()) - return true; - - // We can only wait if we have a collection; otherwise retry immediately. - dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); - auto db = dbHolder().get(_opCtx, _nss.db()); - if (!db) - return true; - auto collection = db->getCollection(_opCtx, _nss); - if (!collection) - return true; - - auto notifier = collection->getCappedInsertNotifier(); - uint64_t notifierVersion = notifier->getVersion(); - auto curOp = CurOp::get(_opCtx); - curOp->pauseTimer(); - ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - auto opCtx = _opCtx; - bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] { - const auto timeout = opCtx->getRemainingMaxTimeMicros(); - notifier->wait(notifierVersion, timeout); - }); - return yieldResult; -} - PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { Status status(ErrorCodes::OperationFailed, @@ -549,23 +508,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, } else if (PlanStage::NEED_TIME == code) { // Fall through to yield check at end of large conditional. } else if (PlanStage::IS_EOF == code) { - if (shouldWaitForInserts()) { - const bool locksReacquiredAfterYield = waitForInserts(); - if (locksReacquiredAfterYield) { - // There may be more results, try to get more data. - continue; - } - invariant(isMarkedAsKilled()); - if (objOut) { - Status status(ErrorCodes::OperationFailed, - str::stream() << "Operation aborted because: " << *_killReason); - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - } - return PlanExecutor::DEAD; - } else { - return PlanExecutor::IS_EOF; - } + return PlanExecutor::IS_EOF; } else { invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index dfd879bbccf..48fed0eab0b 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -51,13 +51,6 @@ struct PlanStageStats; class WorkingSet; /** - * If true, when no results are available from a plan, then instead of returning immediately, the - * system should wait up to the length of the operation deadline for data to be inserted which - * causes results to become available. - */ -extern const OperationContext::Decoration<bool> shouldWaitForInserts; - -/** * A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution. * The executor is usually part of a larger abstraction that is interacting with the cache * and/or the query optimizer. @@ -432,16 +425,6 @@ public: } private: - // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore - // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF - // should be returned immediately. - bool shouldWaitForInserts(); - - // Yields locks and waits for inserts to the collection. Returns true if there may be new - // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield, - // returns true immediately. - bool waitForInserts(); - ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); /** diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index c5b54a90afb..52633ea0185 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -71,19 +71,7 @@ void PlanYieldPolicy::resetTimer() { _elapsedTracker.resetLastTime(); } -bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) { - invariant(_planYielding); - if (recordFetcher) { - OperationContext* opCtx = _planYielding->getOpCtx(); - return yield([recordFetcher, opCtx] { recordFetcher->setup(opCtx); }, - [recordFetcher] { recordFetcher->fetch(); }); - } else { - return yield(nullptr, nullptr); - } -} - -bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn, - stdx::function<void()> whileYieldingFn) { +bool PlanYieldPolicy::yield(RecordFetcher* fetcher) { invariant(_planYielding); invariant(canAutoYield()); @@ -119,9 +107,7 @@ bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn, opCtx->recoveryUnit()->abandonSnapshot(); } else { // Release and reacquire locks. - if (beforeYieldingFn) - beforeYieldingFn(); - QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss()); + QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss()); } return _planYielding->restoreStateWithoutRetrying(); diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index cdf93f0219a..892fbafe350 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -30,7 +30,6 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/query/plan_executor.h" -#include "mongo/stdx/functional.h" #include "mongo/util/elapsed_tracker.h" namespace mongo { @@ -75,12 +74,6 @@ public: bool yield(RecordFetcher* fetcher = NULL); /** - * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately - * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored. - */ - bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn); - - /** * All calls to shouldYield() will return true until the next call to yield. */ void forceYield() { diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp index e731383a92d..6e1de753010 100644 --- a/src/mongo/db/query/query_yield.cpp +++ b/src/mongo/db/query/query_yield.cpp @@ -25,6 +25,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #include "mongo/platform/basic.h" #include "mongo/db/query/query_yield.h" @@ -45,18 +46,23 @@ MONGO_FP_DECLARE(setYieldAllLocksWait); // static void QueryYield::yieldAllLocks(OperationContext* opCtx, - stdx::function<void()> whileYieldingFn, + RecordFetcher* fetcher, const NamespaceString& planExecNS) { // Things have to happen here in a specific order: - // * Release lock mgr locks - // * Go to sleep - // * Call the whileYieldingFn - // * Reacquire lock mgr locks + // 1) Tell the RecordFetcher to do any setup which needs to happen inside locks + // 2) Release lock mgr locks + // 3) Go to sleep + // 4) Touch the record we're yielding on, if there is one (RecordFetcher::fetch) + // 5) Reacquire lock mgr locks Locker* locker = opCtx->lockState(); Locker::LockSnapshot snapshot; + if (fetcher) { + fetcher->setup(opCtx); + } + // Nothing was unlocked, just return, yielding is pointless. if (!locker->saveLockStateAndUnlock(&snapshot)) { return; @@ -79,8 +85,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx, } } - if (whileYieldingFn) { - whileYieldingFn(); + if (fetcher) { + fetcher->fetch(); } locker->restoreLockState(snapshot); diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h index a4fbba72fe7..7d98b299484 100644 --- a/src/mongo/db/query/query_yield.h +++ b/src/mongo/db/query/query_yield.h @@ -29,7 +29,6 @@ #pragma once #include "mongo/db/namespace_string.h" -#include "mongo/stdx/functional.h" namespace mongo { @@ -48,11 +47,9 @@ public: * switch to another thread, and then reacquires all locks. * * If in a nested context (eg DBDirectClient), does nothing. - * - * The whileYieldingFn will be executed after unlocking the locks and before re-acquiring them. */ static void yieldAllLocks(OperationContext* opCtx, - stdx::function<void()> whileYieldingFn, + RecordFetcher* fetcher, const NamespaceString& planExecNS); }; |