author     Matthew Russotto <matthew.russotto@10gen.com>   2017-07-14 17:15:52 -0400
committer  Matthew Russotto <matthew.russotto@10gen.com>   2017-07-17 08:52:57 -0400
commit     3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch)
tree       8f318b2b52852a1511ed6da6ede9ac62cbe67d4d
parent     a1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff)
download   mongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries""
This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
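
At a high level, the patch moves the awaitData wait out of the getMore command and into the PlanExecutor: the command only records what the client asked for on the OperationContext (via two new decorations), and the executor decides at EOF whether to keep waiting. Below is a condensed sketch of that flow with names taken from the diff that follows; it is an illustration of the mechanism, not part of the patch, and the two wrapper functions are hypothetical stand-ins for code that lives in getmore_cmd.cpp and plan_executor.cpp.

    // Sketch only. shouldWaitForInserts and clientsLastKnownCommittedOpTime are the
    // OperationContext decorations added in plan_executor.cpp/.h; the helper functions
    // are illustrative, not real symbols in the tree.
    #include "mongo/db/operation_context.h"
    #include "mongo/db/query/plan_executor.h"
    #include "mongo/db/repl/replication_coordinator.h"

    namespace mongo {

    // getmore_cmd.cpp side: before generating a batch, mark the operation as awaitData
    // and remember the commit point the client already knows about.
    void markAsAwaitData(OperationContext* opCtx,
                         boost::optional<repl::OpTime> lastKnownCommittedOpTime) {
        if (lastKnownCommittedOpTime)
            clientsLastKnownCommittedOpTime(opCtx) = *lastKnownCommittedOpTime;
        shouldWaitForInserts(opCtx) = true;
    }

    // plan_executor.cpp side: at EOF, keep waiting only while there is time left, no
    // interrupt is pending, and the committed optime the client knows about has not
    // advanced (so commit-level changes still propagate promptly).
    bool shouldKeepWaiting(OperationContext* opCtx) {
        if (!shouldWaitForInserts(opCtx) || !opCtx->checkForInterruptNoAssert().isOK() ||
            opCtx->getRemainingMaxTimeMicros() <= Microseconds::zero()) {
            return false;
        }
        if (!clientsLastKnownCommittedOpTime(opCtx).isNull()) {
            auto replCoord = repl::ReplicationCoordinator::get(opCtx);
            return clientsLastKnownCommittedOpTime(opCtx) == replCoord->getLastCommittedOpTime();
        }
        return true;
    }

    }  // namespace mongo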
-rw-r--r-- | jstests/aggregation/sources/changeNotification/change_notification.js | 158
-rw-r--r-- | jstests/noPassthrough/awaitdata_getmore_cmd.js | 50
-rw-r--r-- | src/mongo/db/clientcursor.h | 27
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 72
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 31
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.cpp | 4
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification_test.cpp | 5
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 12
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 11
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 8
-rw-r--r-- | src/mongo/db/query/SConscript | 1
-rw-r--r-- | src/mongo/db/query/find.cpp | 5
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 71
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 27
-rw-r--r-- | src/mongo/db/query/plan_yield_policy.cpp | 18
-rw-r--r-- | src/mongo/db/query/plan_yield_policy.h | 7
-rw-r--r-- | src/mongo/db/query/query_yield.cpp | 20
-rw-r--r-- | src/mongo/db/query/query_yield.h | 5
18 files changed, 434 insertions, 98 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js
index 7636b02fd53..e0df38950d6 100644
--- a/jstests/aggregation/sources/changeNotification/change_notification.js
+++ b/jstests/aggregation/sources/changeNotification/change_notification.js
@@ -4,20 +4,28 @@
 (function() {
     "use strict";
 
+    var oplogProjection = {$project: {"_id.ts": 0}};
+
     // Helper for testing that pipeline returns correct set of results.
     function testPipeline(pipeline, expectedResult, collection) {
+        // Limit to the last N documents from the end of the oplog, because currently
+        // $changeNotification always comes from the start of the oplog.
+        pipeline.push({$sort: {"_id.ts": -1}});
+        if (expectedResult.length > 0) {
+            pipeline.push({$limit: expectedResult.length});
+        }
         // Strip the oplog fields we aren't testing.
-        pipeline.push({$limit: 1});
-        pipeline.push({$project: {"_id.ts": 0}});
-        assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult);
+        pipeline.push(oplogProjection);
+        assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult);
     }
 
-    var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
-    var nodes = replTest.startSet();
+    let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
+    let nodes = replTest.startSet();
     replTest.initiate();
     replTest.awaitReplication();
 
     db = replTest.getPrimary().getDB('test');
+    db.getMongo().forceReadMode('commands');
 
     jsTestLog("Testing single insert");
     assert.writeOK(db.t1.insert({_id: 0, a: 1}));
@@ -129,5 +137,145 @@
     assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
     testPipeline([{$changeNotification: {}}], [], db.dne1);
     testPipeline([{$changeNotification: {}}], [], db.dne2);
+
+    // Now make sure the cursor behaves like a tailable awaitData cursor.
+    jsTestLog("Testing tailability");
+    let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
+    assert(!tailableCursor.hasNext());
+    assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
+    assert(tailableCursor.hasNext());
+    assert.docEq(tailableCursor.next(), {
+        "_id": {
+            "_id": 101,
+            "ns": "test.tailable1",
+        },
+        "documentKey": {"_id": 101},
+        "newDocument": {"_id": 101, "a": 1},
+        "ns": {"coll": "tailable1", "db": "test"},
+        "operationType": "insert"
+    });
+
+    jsTestLog("Testing awaitdata");
+    let res = assert.commandWorked(db.runCommand({
+        aggregate: "tailable2",
+        pipeline: [{$changeNotification: {}}, oplogProjection],
+        cursor: {}
+    }));
+    let aggcursor = res.cursor;
+
+    // We should get a valid cursor.
+    assert.neq(aggcursor.id, 0);
+
+    // Initial batch size should be zero as there should be no data.
+    assert.eq(aggcursor.firstBatch.length, 0);
+
+    // No data, so should return no results, but cursor should remain valid.
+    res = assert.commandWorked(
+        db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
+    aggcursor = res.cursor;
+    assert.neq(aggcursor.id, 0);
+    assert.eq(aggcursor.nextBatch.length, 0);
+
+    // Now insert something in parallel while waiting for it.
+    let insertshell = startParallelShell(function() {
+        // Wait for the getMore to appear in currentop.
+        assert.soon(function() {
+            return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length ==
+                1;
+        });
+        assert.writeOK(db.tailable2.insert({_id: 102, a: 2}));
+    });
+    res = assert.commandWorked(
+        db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000}));
+    aggcursor = res.cursor;
+    assert.eq(aggcursor.nextBatch.length, 1);
+    assert.docEq(aggcursor.nextBatch[0], {
+        "_id": {
+            "_id": 102,
+            "ns": "test.tailable2",
+        },
+        "documentKey": {"_id": 102},
+        "newDocument": {"_id": 102, "a": 2},
+        "ns": {"coll": "tailable2", "db": "test"},
+        "operationType": "insert"
+    });
+
+    // Wait for insert shell to terminate.
+    insertshell();
+
+    jsTestLog("Testing awaitdata - no wake on insert to another collection");
+    res = assert.commandWorked(db.runCommand({
+        aggregate: "tailable3",
+        pipeline: [{$changeNotification: {}}, oplogProjection],
+        cursor: {}
+    }));
+    aggcursor = res.cursor;
+    // We should get a valid cursor.
+    assert.neq(aggcursor.id, 0);
+
+    // Initial batch size should be zero as there should be no data.
+    assert.eq(aggcursor.firstBatch.length, 0);
+
+    // Now insert something in a different collection in parallel while waiting.
+    insertshell = startParallelShell(function() {
+        // Wait for the getMore to appear in currentop.
+        assert.soon(function() {
+            return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
+                1;
+        });
+        assert.writeOK(db.tailable3a.insert({_id: 103, a: 2}));
+    });
+    let start = new Date();
+    res = assert.commandWorked(
+        db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000}));
+    let diff = (new Date()).getTime() - start.getTime();
+    assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection.");
+    aggcursor = res.cursor;
+    // Cursor should be valid with no data.
+    assert.neq(aggcursor.id, 0);
+    assert.eq(aggcursor.nextBatch.length, 0);
+
+    // Wait for insert shell to terminate.
+    insertshell();
+
+    // This time, put something in a different collection, then in the correct collection.
+    // We should wake up with just the correct data.
+    insertshell = startParallelShell(function() {
+        // Wait for the getMore to appear in currentop.
+        assert.soon(function() {
+            return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
+                1;
+        });
+        assert.writeOK(db.tailable3a.insert({_id: 104, a: 2}));
+        assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1);
+        assert.writeOK(db.tailable3.insert({_id: 105, a: 3}));
+    });
+    res = assert.commandWorked(
+        db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000}));
+    aggcursor = res.cursor;
+    assert.neq(aggcursor.id, 0);
+    assert.eq(aggcursor.nextBatch.length, 1);
+    assert.docEq(aggcursor.nextBatch[0], {
+        "_id": {
+            "_id": 105,
+            "ns": "test.tailable3",
+        },
+        "documentKey": {"_id": 105},
+        "newDocument": {"_id": 105, "a": 3},
+        "ns": {"coll": "tailable3", "db": "test"},
+        "operationType": "insert"
+    });
+
+    // Wait for insert shell to terminate.
+    insertshell();
+
+    jsTestLog("Ensuring attempt to read with legacy operations fails.");
+    db.getMongo().forceReadMode('legacy');
+    tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
+                                            {cursor: {batchSize: 0}});
+    assert.throws(function() {
+        tailableCursor.next();
+    }, [], "Legacy getMore expected to fail on changeNotification cursor.");
+
     replTest.stopSet();
 }());
diff --git a/jstests/noPassthrough/awaitdata_getmore_cmd.js b/jstests/noPassthrough/awaitdata_getmore_cmd.js
index ef47efb0e67..5de2c8f83c1 100644
--- a/jstests/noPassthrough/awaitdata_getmore_cmd.js
+++ b/jstests/noPassthrough/awaitdata_getmore_cmd.js
@@ -28,8 +28,13 @@
     cmdRes = db.runCommand({find: collName, tailable: true, awaitData: true});
     assert.commandFailed(cmdRes);
 
-    // Create a capped collection with 10 documents.
+    // With a non-existent collection, should succeed but return no data and a closed cursor.
     coll.drop();
+    cmdRes = assert.commandWorked(db.runCommand({find: collName, tailable: true}));
+    assert.eq(cmdRes.cursor.id, NumberLong(0));
+    assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+    // Create a capped collection with 10 documents.
     assert.commandWorked(db.createCollection(collName, {capped: true, size: 2048}));
     for (var i = 0; i < 10; i++) {
         assert.writeOK(coll.insert({a: i}));
@@ -123,4 +128,47 @@
     }
     assert.gte((new Date()) - now, 2000);
 
+    // Test filtered inserts while writing to a capped collection.
+    // Find with a filter which doesn't match any documents in the collection.
+    cmdRes = assert.commandWorked(db.runCommand(
+        {find: collName, batchSize: 2, filter: {x: 1}, awaitData: true, tailable: true}));
+    assert.gt(cmdRes.cursor.id, NumberLong(0));
+    assert.eq(cmdRes.cursor.ns, coll.getFullName());
+    assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+    // getMore should time out if we insert a non-matching document.
+    let insertshell = startParallelShell(function() {
+        assert.soon(function() {
+            return db.currentOp({op: "getmore", "command.collection": "await_data"})
+                       .inprog.length == 1;
+        });
+        assert.writeOK(db.await_data.insert({x: 0}));
+    }, mongo.port);
+
+    now = new Date();
+    cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 4000});
+    assert.commandWorked(cmdRes);
+    assert.gt(cmdRes.cursor.id, NumberLong(0));
+    assert.eq(cmdRes.cursor.ns, coll.getFullName());
+    assert.eq(cmdRes.cursor.nextBatch.length, 0);
+    assert.gte((new Date()) - now,
+               4000,
+               "Insert not matching filter caused awaitData getMore to return prematurely.");
+    insertshell();
+
+    // getMore should succeed if we insert a non-matching document followed by a matching one.
+    insertshell = startParallelShell(function() {
+        assert.writeOK(db.await_data.insert({x: 0}));
+        assert.writeOK(db.await_data.insert({_id: "match", x: 1}));
+        jsTestLog("Written");
+    }, mongo.port);
+
+    cmdRes =
+        db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 5 * 60 * 1000});
+    assert.commandWorked(cmdRes);
+    assert.gt(cmdRes.cursor.id, NumberLong(0));
+    assert.eq(cmdRes.cursor.ns, coll.getFullName());
+    assert.eq(cmdRes.cursor.nextBatch.length, 1);
+    assert.docEq(cmdRes.cursor.nextBatch[0], {_id: "match", x: 1});
+    insertshell();
 })();
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 32fb018a8e1..f75d41e3089 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -71,6 +71,20 @@ struct ClientCursorParams {
         }
     }
 
+    void setTailable(bool tailable) {
+        if (tailable)
+            queryOptions |= QueryOption_CursorTailable;
+        else
+            queryOptions &= ~QueryOption_CursorTailable;
+    }
+
+    void setAwaitData(bool awaitData) {
+        if (awaitData)
+            queryOptions |= QueryOption_AwaitData;
+        else
+            queryOptions &= ~QueryOption_AwaitData;
+    }
+
     std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
     const NamespaceString nss;
     std::vector<UserName> authenticatedUsers;
@@ -127,10 +141,23 @@ public:
         return _exec.get();
     }
 
+    /**
+     * Returns the query options bitmask. If you'd like to know if the cursor is tailable or
+     * awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this
+     * method.
+     */
     int queryOptions() const {
         return _queryOptions;
     }
 
+    bool isTailable() const {
+        return _queryOptions & QueryOption_CursorTailable;
+    }
+
+    bool isAwaitData() const {
+        return _queryOptions & QueryOption_AwaitData;
+    }
+
     const BSONObj& getOriginatingCommandObj() const {
         return _originatingCommand;
     }
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e429c268dd6..d41867d16b9 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -47,6 +47,7 @@
 #include "mongo/db/query/find.h"
 #include "mongo/db/query/find_common.h"
 #include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/plan_executor.h"
 #include "mongo/db/query/plan_summary_stats.h"
 #include "mongo/db/repl/oplog.h"
 #include "mongo/db/repl/replication_coordinator_global.h"
@@ -269,11 +270,6 @@ public:
         // Validation related to awaitData.
         if (isCursorAwaitData(cursor)) {
             invariant(isCursorTailable(cursor));
-
-            if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
-                Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
-                return appendCommandStatus(result, status);
-            }
         }
 
         if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) {
@@ -322,21 +318,6 @@ public:
             }
         }
 
-        uint64_t notifierVersion = 0;
-        std::shared_ptr<CappedInsertNotifier> notifier;
-        if (isCursorAwaitData(cursor)) {
-            invariant(readLock->getCollection()->isCapped());
-            // Retrieve the notifier which we will wait on until new data arrives. We make sure
-            // to do this in the lock because once we drop the lock it is possible for the
-            // collection to become invalid. The notifier itself will outlive the collection if
-            // the collection is dropped, as we keep a shared_ptr to it.
-            notifier = readLock->getCollection()->getCappedInsertNotifier();
-
-            // Must get the version before we call generateBatch in case a write comes in after
-            // that call and before we call wait on the notifier.
-            notifierVersion = notifier->getVersion();
-        }
-
         CursorId respondWithId = 0;
         CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
         BSONObj obj;
@@ -352,46 +333,16 @@ public:
         PlanSummaryStats preExecutionStats;
         Explain::getSummaryStats(*exec, &preExecutionStats);
 
-        Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
-        if (!batchStatus.isOK()) {
-            return appendCommandStatus(result, batchStatus);
+        // Mark this as an AwaitData operation if appropriate.
+        if (isCursorAwaitData(cursor)) {
+            if (request.lastKnownCommittedOpTime)
+                clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
+            shouldWaitForInserts(opCtx) = true;
         }
 
-        // If this is an await data cursor, and we hit EOF without generating any results, then
-        // we block waiting for new data to arrive.
-        if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
-            auto replCoord = repl::ReplicationCoordinator::get(opCtx);
-            // Return immediately if we need to update the commit time.
-            if (!request.lastKnownCommittedOpTime ||
-                (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) {
-                // Retrieve the notifier which we will wait on until new data arrives. We make sure
-                // to do this in the lock because once we drop the lock it is possible for the
-                // collection to become invalid. The notifier itself will outlive the collection if
-                // the collection is dropped, as we keep a shared_ptr to it.
-                auto notifier = readLock->getCollection()->getCappedInsertNotifier();
-
-                // Save the PlanExecutor and drop our locks.
-                exec->saveState();
-                readLock.reset();
-
-                // Block waiting for data. Time spent blocking is not counted towards the total
-                // operation latency.
-                curOp->pauseTimer();
-                const auto timeout = opCtx->getRemainingMaxTimeMicros();
-                notifier->wait(notifierVersion, timeout);
-                notifier.reset();
-                curOp->resumeTimer();
-
-                readLock.emplace(opCtx, request.nss);
-                exec->restoreState();
-
-                // We woke up because either the timed_wait expired, or there was more data. Either
-                // way, attempt to generate another batch of results.
-                batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
-                if (!batchStatus.isOK()) {
-                    return appendCommandStatus(result, batchStatus);
-                }
-            }
+        Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
+        if (!batchStatus.isOK()) {
+            return appendCommandStatus(result, batchStatus);
         }
 
         PlanSummaryStats postExecutionStats;
@@ -472,7 +423,8 @@ public:
      * Returns an OK status if the batch was successfully generated, and a non-OK status if the
      * PlanExecutor encounters a failure.
      */
-    Status generateBatch(ClientCursor* cursor,
+    Status generateBatch(OperationContext* opCtx,
+                         ClientCursor* cursor,
                          const GetMoreRequest& request,
                          CursorResponseBuilder* nextBatch,
                          PlanExecutor::ExecState* state,
@@ -494,6 +446,8 @@ public:
                 break;
             }
 
+            // As soon as we get a result, this operation no longer waits.
+            shouldWaitForInserts(opCtx) = false;
             // Add result to output buffer.
             nextBatch->append(obj);
             (*numResults)++;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7b4d40f0bdb..1bee5cb5f07 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -99,8 +99,10 @@ bool handleCursorCommand(OperationContext* opCtx,
         // do it when batchSize is 0 since that indicates a desire for a fast return.
         PlanExecutor::ExecState state;
         if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
-            // make it an obvious error to use cursor or executor after this point
-            cursor = nullptr;
+            if (!cursor->isTailable()) {
+                // make it an obvious error to use cursor or executor after this point
+                cursor = nullptr;
+            }
             break;
         }
 
@@ -393,6 +395,10 @@ Status runAggregate(OperationContext* opCtx,
             uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
         expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
 
+        if (liteParsedPipeline.startsWithChangeNotification()) {
+            expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+        }
+
         // Parse the pipeline.
         auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
         if (!statusWithPipeline.isOK()) {
@@ -451,13 +457,20 @@ Status runAggregate(OperationContext* opCtx,
     // cursor manager. The global cursor manager does not deliver invalidations or kill
    // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
    // invalidations and kill notifications themselves, not the cursor we create here.
-    auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
-        opCtx,
-        {std::move(exec),
-         origNss,
-         AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
-         opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
-         cmdObj});
+    ClientCursorParams cursorParams(
+        std::move(exec),
+        origNss,
+        AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+        opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+        cmdObj);
+    if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) {
+        cursorParams.setTailable(true);
+        cursorParams.setAwaitData(true);
+    }
+
+    auto pin =
+        CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+
     ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
 
     // If both explain and cursor are specified, explain wins.
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index faf5bcf97ac..b8d7027fc5f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -94,12 +94,10 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
             !expCtx->getCollator());
 
     BSONObj matchObj = buildMatch(elem, expCtx->ns);
-    BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
 
     auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
-    auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
     auto transformSource = createTransformationStage(expCtx);
-    return {matchSource, sortSource, transformSource};
+    return {matchSource, transformSource};
 }
 
 intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0e2b53ea591..0fd14fd9a92 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -94,14 +94,15 @@ public:
     }
 };
 
-TEST_F(ChangeNotificationStageTest, Basic) {
+TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
     const auto spec = fromjson("{$changeNotification: {}}");
 
     vector<intrusive_ptr<DocumentSource>> result =
         DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
 
-    ASSERT_EQUALS(result.size(), 3UL);
+    ASSERT_EQUALS(result.size(), 2UL);
     ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
+    ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
 
     // TODO: Check explain result.
 }
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9704d826e53..f15bafd5b8b 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,7 +31,6 @@
 #include "mongo/db/pipeline/document_source_cursor.h"
 
 #include "mongo/db/catalog/collection.h"
-#include "mongo/db/curop.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/exec/working_set_common.h"
 #include "mongo/db/pipeline/document.h"
@@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() {
 
             memUsageBytes += _currentBatch.back().getApproximateSize();
 
-            if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+            // As long as we're waiting for inserts, we shouldn't do any batching at this level
+            // we need the whole pipeline to see each document to see if we should stop waiting.
+            if (shouldWaitForInserts(pExpCtx->opCtx) ||
+                memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
                 // End this batch and prepare PlanExecutor for yielding.
                 _exec->saveState();
                 return;
             }
         }
+        // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
+        // the PlanExecutor alive.
+        if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+            _exec->saveState();
+            return;
+        }
     }
 }
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index e17128ec8a6..d04b095aa4b 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -58,6 +58,8 @@ public:
         std::vector<BSONObj> pipeline;
     };
 
+    enum class TailableMode { kNormal, kTailableAndAwaitData };
+
     /**
      * Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
      * 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -101,6 +103,13 @@ public:
         return it->second;
     };
 
+    /**
+     * Convenience call that returns true if the tailableMode indicate a tailable query.
+     */
+    bool isTailable() const {
+        return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
+    }
+
     // The explain verbosity requested by the user, or boost::none if no explain was requested.
     boost::optional<ExplainOptions::Verbosity> explain;
 
@@ -121,6 +130,8 @@ public:
     Variables variables;
     VariablesParseState variablesParseState;
 
+    TailableMode tailableMode = TailableMode::kNormal;
+
 protected:
     static const int kInterruptCheckPeriod = 128;
     ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ec471f41df5..697c639ccf7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -392,6 +392,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
     const AggregationRequest* aggRequest,
     const size_t plannerOpts) {
     auto qr = stdx::make_unique<QueryRequest>(nss);
+    switch (pExpCtx->tailableMode) {
+        case ExpressionContext::TailableMode::kNormal:
+            break;
+        case ExpressionContext::TailableMode::kTailableAndAwaitData:
+            qr->setTailable(true);
+            qr->setAwaitData(true);
+            break;
+    }
     qr->setFilter(queryObj);
     qr->setProj(projectionObj);
     qr->setSort(sortObj);
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 854f3045506..60eab4d4f79 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -81,6 +81,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/catalog/index_catalog_entry',
         "$BUILD_DIR/mongo/db/curop",
         "$BUILD_DIR/mongo/db/exec/exec",
+        "$BUILD_DIR/mongo/db/repl/repl_coordinator_interface",
         "$BUILD_DIR/mongo/db/s/sharding",
         "$BUILD_DIR/mongo/db/storage/oplog_hack",
         "$BUILD_DIR/mongo/util/elapsed_tracker",
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index da1d0d5c6ea..da2a71e2068 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -364,6 +364,11 @@ Message getMore(OperationContext* opCtx,
         if (cc->isReadCommitted())
             uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
 
+        uassert(40548,
+                "OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
+                "which support the getMore command can be used on tailable aggregations.",
+                readLock || !isCursorAwaitData(cc));
+
         // If the operation that spawned this cursor had a time limit set, apply leftover
         // time to this getmore.
         if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 2789e660b76..17aa8d42f02 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,6 +32,8 @@
 
 #include "mongo/bson/simple_bsonobj_comparator.h"
 #include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
 #include "mongo/db/concurrency/write_conflict_exception.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/exec/cached_plan.h"
@@ -43,10 +45,12 @@
 #include "mongo/db/exec/working_set.h"
 #include "mongo/db/exec/working_set_common.h"
 #include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/service_context.h"
 #include "mongo/db/storage/record_fetcher.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/fail_point_service.h"
+#include "mongo/util/scopeguard.h"
 #include "mongo/util/stacktrace.h"
 
 namespace mongo {
@@ -56,6 +60,11 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+    OperationContext::declareDecoration<bool>();
+const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
+    OperationContext::declareDecoration<repl::OpTime>();
+
 namespace {
 namespace {
@@ -380,6 +389,50 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
     return getNextImpl(objOut, dlOut);
 }
 
+
+bool PlanExecutor::shouldWaitForInserts() {
+    // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+    // we should wait for inserts.
+    if (mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+        _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+        // For operations with a last committed opTime, we should not wait if the replication
+        // coordinator's lastCommittedOpTime has changed.
+        if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
+            auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
+            return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+        }
+        return true;
+    }
+    return false;
+}
+
+bool PlanExecutor::waitForInserts() {
+    // If we cannot yield, we should retry immediately.
+    if (!_yieldPolicy->canReleaseLocksDuringExecution())
+        return true;
+
+    // We can only wait if we have a collection; otherwise retry immediately.
+    dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
+    auto db = dbHolder().get(_opCtx, _nss.db());
+    if (!db)
+        return true;
+    auto collection = db->getCollection(_opCtx, _nss);
+    if (!collection)
+        return true;
+
+    auto notifier = collection->getCappedInsertNotifier();
+    uint64_t notifierVersion = notifier->getVersion();
+    auto curOp = CurOp::get(_opCtx);
+    curOp->pauseTimer();
+    ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+    auto opCtx = _opCtx;
+    bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+        const auto timeout = opCtx->getRemainingMaxTimeMicros();
+        notifier->wait(notifierVersion, timeout);
+    });
+    return yieldResult;
+}
+
 PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
     if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
         Status status(ErrorCodes::OperationFailed,
@@ -508,7 +561,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
         } else if (PlanStage::NEED_TIME == code) {
             // Fall through to yield check at end of large conditional.
         } else if (PlanStage::IS_EOF == code) {
-            return PlanExecutor::IS_EOF;
+            if (shouldWaitForInserts()) {
+                const bool locksReacquiredAfterYield = waitForInserts();
+                if (locksReacquiredAfterYield) {
+                    // There may be more results, try to get more data.
+                    continue;
+                }
+                invariant(isMarkedAsKilled());
+                if (objOut) {
+                    Status status(ErrorCodes::OperationFailed,
+                                  str::stream() << "Operation aborted because: " << *_killReason);
+                    *objOut = Snapshotted<BSONObj>(
+                        SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
+                }
+                return PlanExecutor::DEAD;
+            } else {
+                return PlanExecutor::IS_EOF;
+            }
         } else {
             invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 48fed0eab0b..ba565e6b49c 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -51,6 +51,23 @@ struct PlanStageStats;
 class WorkingSet;
 
 /**
+ * If true, when no results are available from a plan, then instead of returning immediately, the
+ * system should wait up to the length of the operation deadline for data to be inserted which
+ * causes results to become available.
+ */
+extern const OperationContext::Decoration<bool> shouldWaitForInserts;
+
+/**
+ * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop
+ * waiting for new data as soon as the committed op time changes.
+ *
+ * 'clientsLastKnownCommittedOpTime' represents the time passed to the getMore command.
+ * If the replication coordinator ever reports a higher committed op time, we should stop waiting
+ * for inserts and return immediately to speed up the propagation of commit level changes.
+ */
+extern const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime;
+
+/**
  * A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution.
  * The executor is usually part of a larger abstraction that is interacting with the cache
  * and/or the query optimizer.
@@ -425,6 +442,16 @@ public:
     }
 
 private:
+    // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
+    // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
+    // should be returned immediately.
+    bool shouldWaitForInserts();
+
+    // Yields locks and waits for inserts to the collection. Returns true if there may be new
+    // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield,
+    // returns true immediately.
+    bool waitForInserts();
+
     ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
 
     /**
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 52633ea0185..c5b54a90afb 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -71,7 +71,19 @@ void PlanYieldPolicy::resetTimer() {
     _elapsedTracker.resetLastTime();
 }
 
-bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
+bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) {
+    invariant(_planYielding);
+    if (recordFetcher) {
+        OperationContext* opCtx = _planYielding->getOpCtx();
+        return yield([recordFetcher, opCtx] { recordFetcher->setup(opCtx); },
+                     [recordFetcher] { recordFetcher->fetch(); });
+    } else {
+        return yield(nullptr, nullptr);
+    }
+}
+
+bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
+                            stdx::function<void()> whileYieldingFn) {
     invariant(_planYielding);
     invariant(canAutoYield());
 
@@ -107,7 +119,9 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
             opCtx->recoveryUnit()->abandonSnapshot();
         } else {
             // Release and reacquire locks.
-            QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
+            if (beforeYieldingFn)
+                beforeYieldingFn();
+            QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
         }
 
         return _planYielding->restoreStateWithoutRetrying();
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index 892fbafe350..cdf93f0219a 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -30,6 +30,7 @@
 
 #include "mongo/db/catalog/collection.h"
 #include "mongo/db/query/plan_executor.h"
+#include "mongo/stdx/functional.h"
 #include "mongo/util/elapsed_tracker.h"
 
 namespace mongo {
@@ -74,6 +75,12 @@ public:
     bool yield(RecordFetcher* fetcher = NULL);
 
     /**
+     * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately
+     * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored.
+     */
+    bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn);
+
+    /**
     * All calls to shouldYield() will return true until the next call to yield.
     */
    void forceYield() {
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 6e1de753010..e731383a92d 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -25,7 +25,6 @@
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */
-
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/query/query_yield.h"
@@ -46,23 +45,18 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
 
 // static
 void QueryYield::yieldAllLocks(OperationContext* opCtx,
-                               RecordFetcher* fetcher,
+                               stdx::function<void()> whileYieldingFn,
                                const NamespaceString& planExecNS) {
     // Things have to happen here in a specific order:
-    //   1) Tell the RecordFetcher to do any setup which needs to happen inside locks
-    //   2) Release lock mgr locks
-    //   3) Go to sleep
-    //   4) Touch the record we're yielding on, if there is one (RecordFetcher::fetch)
-    //   5) Reacquire lock mgr locks
+    //   * Release lock mgr locks
+    //   * Go to sleep
+    //   * Call the whileYieldingFn
+    //   * Reacquire lock mgr locks
 
     Locker* locker = opCtx->lockState();
 
     Locker::LockSnapshot snapshot;
 
-    if (fetcher) {
-        fetcher->setup(opCtx);
-    }
-
     // Nothing was unlocked, just return, yielding is pointless.
     if (!locker->saveLockStateAndUnlock(&snapshot)) {
         return;
@@ -85,8 +79,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx,
         }
     }
 
-    if (fetcher) {
-        fetcher->fetch();
+    if (whileYieldingFn) {
+        whileYieldingFn();
     }
 
     locker->restoreLockState(snapshot);
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index 7d98b299484..a4fbba72fe7 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -29,6 +29,7 @@
 #pragma once
 
 #include "mongo/db/namespace_string.h"
+#include "mongo/stdx/functional.h"
 
 namespace mongo {
 
@@ -47,9 +48,11 @@ public:
      * switch to another thread, and then reacquires all locks.
     *
    * If in a nested context (eg DBDirectClient), does nothing.
+     *
+     * The whileYieldingFn will be executed after unlocking the locks and before re-acquiring them.
     */
    static void yieldAllLocks(OperationContext* opCtx,
-                              RecordFetcher* fetcher,
+                              stdx::function<void()> whileYieldingFn,
                               const NamespaceString& planExecNS);
};
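
For context on the yield changes above: the new two-callback PlanYieldPolicy::yield() overload is what lets PlanExecutor::waitForInserts() sleep on the CappedInsertNotifier while the executor's locks are released. The following is a condensed sketch of that call pattern, not additional patch content; names come from the diff, and the variables collection, yieldPolicy, and opCtx are assumed to be in scope as they are inside the PlanExecutor.

    // Sketch of the waitForInserts() call pattern shown in plan_executor.cpp above.
    // The whileYieldingFn runs after locks are dropped and before they are reacquired.
    auto notifier = collection->getCappedInsertNotifier();
    uint64_t notifierVersion = notifier->getVersion();
    bool mayHaveNewResults = yieldPolicy->yield(
        nullptr,  // no beforeYieldingFn needed for this caller
        [opCtx, notifier, notifierVersion] {
            // Sleep until either a document is inserted into the capped collection
            // (bumping the notifier version) or the operation's remaining
            // maxTimeMS elapses.
            notifier->wait(notifierVersion, opCtx->getRemainingMaxTimeMicros());
        });

A false return means the plan was killed while the locks were relinquished, which is why getNextImpl() returns DEAD in that case instead of retrying.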