24 files changed, 238 insertions, 14 deletions
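This change adds a top-level $_internalLatestOplogTimestamp field to cursor-bearing command responses so that mongos can merge change stream results from multiple shards in operation-time order. As the new jstest below shows, the field is only reported when the aggregate is issued with needsMerge and fromMongos. A minimal shell sketch of the resulting response shape, modeled on that test (the collection name and resumeToken are placeholders, not part of this change):

// Sketch only: "coll" and resumeToken are hypothetical stand-ins; see the new jstest for the real setup.
const response = assert.commandWorked(db.getCollection("coll").runCommand({
    aggregate: "coll",
    // The internal timestamp is only reported when needsMerge is set, and needsMerge
    // requires fromMongos to be set.
    needsMerge: true,
    fromMongos: true,
    pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
    cursor: {batchSize: 2}
}));
// In addition to response.cursor.firstBatch, the reply now carries a timestamp that
// advances as the underlying oplog scan progresses, e.g. Timestamp(1504287000, 3).
printjson(response.$_internalLatestOplogTimestamp);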
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index d740564e09c..8583dce1b84 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -3,7 +3,10 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js - + exclude_files: + # This test exercises an internal detail of mongos<->mongod communication and is not expected + # to work against a mongos. + - jstests/change_streams/report_latest_observed_oplog_timestamp.js executor: config: shell_options: diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js new file mode 100644 index 00000000000..7345cfc4667 --- /dev/null +++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js @@ -0,0 +1,96 @@ +// Tests that an aggregate with a $changeStream stage will report the latest optime read in +// the oplog by its cursor. This information is needed to correctly merge the results +// from the various shards on mongos. +(function() { + "use strict"; + + const testName = "report_latest_observed_oplog_timestamp"; + const cursorCollection = db.getCollection(testName); + const otherCollection = db.getCollection("unrelated_" + testName); + + // Drop collections to ensure a clean run. Collections may not exist, so do not check the result. + cursorCollection.drop(); + otherCollection.drop(); + + // Get a resume point. + jsTestLog("Getting a resume point."); + const batchSize = 2; + assert.commandWorked(db.createCollection(cursorCollection.getName())); + const firstResponse = assert.commandWorked(cursorCollection.runCommand( + {aggregate: testName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}})); + assert.eq(0, firstResponse.cursor.firstBatch.length); + assert.writeOK(cursorCollection.insert({_id: 0})); + + function iterateCursor(initialCursorResponse) { + const getMoreCollName = initialCursorResponse.cursor.ns.substr( + initialCursorResponse.cursor.ns.indexOf('.') + 1); + return assert.commandWorked(cursorCollection.runCommand({ + getMore: initialCursorResponse.cursor.id, + collection: getMoreCollName, + batchSize: batchSize + })); + } + const resumeResponse = iterateCursor(firstResponse); + assert.eq(1, resumeResponse.cursor.nextBatch.length); + // Because needsMerge was not set, the latest oplog timestamp should not be returned. + assert.eq(undefined, resumeResponse.$_internalLatestOplogTimestamp); + const resumeToken = resumeResponse.cursor.nextBatch[0]["_id"]; + + // Seed the collection with more than one batch's worth of documents. + // Note the resume document is included; when needsMerge is true, we see the resume token + // in the stream. + jsTestLog("Adding documents to collection."); + for (let i = 1; i < batchSize * 2; i++) { + assert.writeOK(cursorCollection.insert({_id: i}, {writeConcern: {w: 1}})); + } + + // TODO: SERVER-29126 + // While change streams still use read concern level local instead of read concern level + // majority, we need to use causal consistency to be able to immediately read our own writes out + // of the oplog. Once change streams read from the majority snapshot, we can remove this + // synchronization point from this test.
+ assert.commandWorked(db.runCommand({ + find: "foo", + readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} + })); + + // Look at one batch's worth. + jsTestLog("Testing that operation time is present on initial aggregate command."); + const cursorResponse = assert.commandWorked(cursorCollection.runCommand({ + aggregate: testName, + // The latest observed optime is only reported when needsMerge is set, and needsMerge + // requires fromMongos to be set. + needsMerge: true, + fromMongos: true, + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {batchSize: batchSize} + })); + const firstBatchOplogTimestamp = cursorResponse.$_internalLatestOplogTimestamp; + assert.neq(undefined, firstBatchOplogTimestamp, tojson(cursorResponse)); + + // Iterate the cursor and assert that the observed operation time advanced. + jsTestLog("Testing that operation time advances with getMore."); + let getMoreResponse = iterateCursor(cursorResponse); + const getMoreOplogTimestamp = getMoreResponse.$_internalLatestOplogTimestamp; + assert.neq(undefined, getMoreOplogTimestamp, tojson(getMoreResponse)); + assert.gt(getMoreOplogTimestamp, firstBatchOplogTimestamp); + + // Now make sure that the reported operation time advances if there are writes to an unrelated + // collection. + jsTestLog("Testing that operation time advances with writes to an unrelated collection."); + + // First make sure there is nothing left in our cursor. + getMoreResponse = iterateCursor(cursorResponse); + assert.eq(getMoreResponse.cursor.nextBatch, []); + + // Record that operation time, then test that the reported time advances on an insert to an + // unrelated collection. + const oplogTimeAtExhaust = getMoreResponse.$_internalLatestOplogTimestamp; + assert.neq(undefined, oplogTimeAtExhaust, tojson(getMoreResponse)); + assert.writeOK(otherCollection.insert({})); + + getMoreResponse = iterateCursor(cursorResponse); + const oplogTimeAfterUnrelatedInsert = getMoreResponse.$_internalLatestOplogTimestamp; + assert.neq(undefined, oplogTimeAfterUnrelatedInsert, tojson(getMoreResponse)); + assert.gt(oplogTimeAfterUnrelatedInsert, oplogTimeAtExhaust); +})(); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 7b78ba64fff..a86e0737475 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -322,9 +322,6 @@ public: CursorId respondWithId = 0; CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result); BSONObj obj; - // generateBatch() will not initialize 'state' if it exceeds the time limiting generating - // the next batch for an awaitData cursor. In this case, 'state' should be - // PlanExecutor::ADVANCED, so we do not attempt to get another batch. PlanExecutor::ExecState state = PlanExecutor::ADVANCED; long long numResults = 0; @@ -443,6 +440,7 @@ public: // As soon as we get a result, this operation no longer waits. shouldWaitForInserts(opCtx) = false; // Add result to output buffer. + nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); nextBatch->append(obj); (*numResults)++; } @@ -467,6 +465,10 @@ public: return Status(ErrorCodes::QueryPlanKilled, str::stream() << "PlanExecutor killed: " << WorkingSetCommon::toStatusString(obj)); + } else if (PlanExecutor::IS_EOF == *state) { + // This causes the reported latest oplog timestamp to advance even when there are + // no results for this particular query.
+ nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); } return Status::OK(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index c8d63487229..848eee04c67 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -93,8 +93,7 @@ bool handleCursorCommand(OperationContext* opCtx, long long batchSize = request.getBatchSize(); - // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. - BSONArrayBuilder resultsArray; + CursorResponseBuilder responseBuilder(true, &result); BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { // The initial getNext() on a PipelineProxyStage may be very expensive so we don't @@ -112,6 +111,8 @@ bool handleCursorCommand(OperationContext* opCtx, } if (state == PlanExecutor::IS_EOF) { + responseBuilder.setLatestOplogTimestamp( + cursor->getExecutor()->getLatestOplogTimestamp()); if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; @@ -128,12 +129,13 @@ bool handleCursorCommand(OperationContext* opCtx, // If adding this object will cause us to exceed the message size limit, then we stash it // for later. - if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) { + if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) { cursor->getExecutor()->enqueue(next); break; } - resultsArray.append(next); + responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.append(next); } if (cursor) { @@ -152,7 +154,7 @@ bool handleCursorCommand(OperationContext* opCtx, } const CursorId cursorId = cursor ? cursor->cursorid() : 0LL; - appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result); + responseBuilder.done(cursorId, nsForCursor.ns()); return static_cast<bool>(cursor); } diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index c05dde68cea..c041792b780 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -66,6 +66,7 @@ CollectionScan::CollectionScan(OperationContext* opCtx, _wsidForFetch(_workingSet->allocate()) { // Explain reports the direction of the collection scan. 
_specificStats.direction = params.direction; + invariant(!_params.shouldTrackLatestOplogTimestamp || _params.collection->ns().isOplog()); } PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { @@ -169,6 +170,13 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } _lastSeenId = record->id; + if (_params.shouldTrackLatestOplogTimestamp) { + auto status = setLatestOplogEntryTimestamp(*record); + if (!status.isOK()) { + *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); + return PlanStage::FAILURE; + } + } WorkingSetID id = _workingSet->allocate(); WorkingSetMember* member = _workingSet->get(id); @@ -179,6 +187,19 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { return returnIfMatches(member, id, out); } +Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { + auto tsElem = record.data.toBson()["ts"]; + if (tsElem.type() != BSONType::bsonTimestamp) { + Status status(ErrorCodes::InternalError, + str::stream() << "CollectionScan was asked to track latest operation time, " + "but found a result without a valid 'ts' field: " + << record.data.toBson().toString()); + return status; + } + _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp()); + return Status::OK(); +} + PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 3ba80b90d3f..880debb7421 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -37,6 +37,7 @@ namespace mongo { +struct Record; class SeekableRecordCursor; class WorkingSet; class OperationContext; @@ -67,6 +68,10 @@ public: return STAGE_COLLSCAN; } + Timestamp getLatestOplogTimestamp() const { + return _latestOplogEntryTimestamp; + } + std::unique_ptr<PlanStageStats> getStats() final; const SpecificStats* getSpecificStats() const final; @@ -80,6 +85,13 @@ private: */ StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out); + /** + * Extracts the timestamp from the 'ts' field of 'record', and sets '_latestOplogEntryTimestamp' + * to that time if it isn't already greater. Returns an error if the 'ts' field cannot be + * extracted. + */ + Status setLatestOplogEntryTimestamp(const Record& record); + // WorkingSet is not owned by us. WorkingSet* _workingSet; @@ -99,6 +111,10 @@ private: // should remain in the INVALID state. const WorkingSetID _wsidForFetch; + // If _params.shouldTrackLatestOplogTimestamp is set and the collection is the oplog, the latest + // timestamp seen in the collection. Otherwise, this is a null timestamp. + Timestamp _latestOplogEntryTimestamp; + // Stats CollectionScanStats _specificStats; }; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 712e56f1eac..43f8f5989bf 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/bson/timestamp.h" #include "mongo/db/record_id.h" namespace mongo { @@ -53,6 +54,11 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; + // Should we keep track of the timestamp of the latest oplog entry we've seen? 
This information + // is needed to merge cursors from the oplog in order of operation time when reading the oplog + // across a sharded cluster. + bool shouldTrackLatestOplogTimestamp = false; + // Once the first matching document is found, assume that all documents after it must match. // This is useful for oplog queries where we know we will see records ordered by the ts field. bool stopApplyingFilterAfterFirstMatch = false; diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index b814a2eee0f..a1b9660661f 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -127,6 +127,10 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() { return boost::none; } +Timestamp PipelineProxyStage::getLatestOplogTimestamp() const { + return PipelineD::getLatestOplogTimestamp(_pipeline.get()); +} + std::string PipelineProxyStage::getPlanSummaryStr() const { return PipelineD::getPlanSummaryStr(_pipeline.get()); } diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index bb6c6645eb1..973f69e79c2 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -74,6 +74,11 @@ public: MONGO_UNREACHABLE; } + /** + * Pass through the last oplog timestamp from the proxied pipeline. + */ + Timestamp getLatestOplogTimestamp() const; + std::string getPlanSummaryStr() const; void getPlanSummaryStats(PlanSummaryStats* statsOut) const; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index e31751e3a5c..7585b129f4d 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -100,7 +100,10 @@ void DocumentSourceCursor::loadBatch() { // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. + // Furthermore, if we need to return the latest oplog time (in the tailable and + // needs-merge case), batching will result in a wrong time. if (shouldWaitForInserts(pExpCtx->opCtx) || + (pExpCtx->isTailable() && pExpCtx->needsMerge) || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. 
_exec->saveState(); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index c15c0495ba7..f9c9345baf6 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -127,6 +127,13 @@ public: _shouldProduceEmptyDocs = true; } + Timestamp getLatestOplogTimestamp() const { + if (_exec) { + return _exec->getLatestOplogTimestamp(); + } + return Timestamp(); + } + const std::string& getPlanSummaryStr() const { return _planSummary; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 75fb836c319..33043acde9e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -627,6 +627,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } + if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { + plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + } + const BSONObj emptyProjection; const BSONObj metaSortProjection = BSON("$meta" << "sortKey"); @@ -781,6 +785,14 @@ void PipelineD::addCursorSource(Collection* collection, pipeline->optimizePipeline(); } +Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) { + if (auto docSourceCursor = + dynamic_cast<DocumentSourceCursor*>(pipeline->_sources.front().get())) { + return docSourceCursor->getLatestOplogTimestamp(); + } + return Timestamp(); +} + std::string PipelineD::getPlanSummaryStr(const Pipeline* pPipeline) { if (auto docSourceCursor = dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) { diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 81b095ca5b1..3c1acaac998 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -85,6 +85,8 @@ public: static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut); + static Timestamp getLatestOplogTimestamp(const Pipeline* pipeline); + private: PipelineD(); // does not exist: prevent instantiation diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 69bb87e858d..b435f3351d5 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -45,6 +45,7 @@ const char kIdField[] = "id"; const char kNsField[] = "ns"; const char kBatchField[] = "nextBatch"; const char kBatchFieldInitial[] = "firstBatch"; +const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; } // namespace @@ -61,6 +62,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) _cursorObject.append(kIdField, cursorId); _cursorObject.append(kNsField, cursorNamespace); _cursorObject.doneFast(); + if (!_latestOplogTimestamp.isNull()) { + _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp); + } _active = false; } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index ecec7c2eb83..6f8ad8979cd 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -39,7 +39,8 @@ namespace mongo { /** - * Builds the cursor field for a reply to a cursor-generating command in place. + * Builds the cursor field and the _latestOplogTimestamp field for a reply to a cursor-generating + * command in place. 
*/ class CursorResponseBuilder { MONGO_DISALLOW_COPYING(CursorResponseBuilder); @@ -70,6 +71,10 @@ public: _batch.append(obj); } + void setLatestOplogTimestamp(Timestamp ts) { + _latestOplogTimestamp = ts; + } + /** * Call this after successfully appending all fields that will be part of this response. * After calling, you may not call any more methods on this object. @@ -89,6 +94,7 @@ private: BSONObjBuilder* const _commandResponse; BSONObjBuilder _cursorObject; BSONArrayBuilder _batch; + Timestamp _latestOplogTimestamp; }; /** diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index eaaec1141ea..0560aa907f3 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -513,7 +513,10 @@ mongo::BSONElement extractOplogTsOptime(const mongo::MatchExpression* me) { } StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( - OperationContext* opCtx, Collection* collection, unique_ptr<CanonicalQuery> cq) { + OperationContext* opCtx, + Collection* collection, + unique_ptr<CanonicalQuery> cq, + size_t plannerOptions) { invariant(collection); invariant(cq.get()); @@ -586,9 +589,9 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( // This is normal. The start of the oplog is the beginning of the collection. if (PlanExecutor::IS_EOF == state) { - return getExecutor(opCtx, collection, std::move(cq), PlanExecutor::YIELD_AUTO); + return getExecutor( + opCtx, collection, std::move(cq), PlanExecutor::YIELD_AUTO, plannerOptions); } - // This is not normal. An error was encountered. if (PlanExecutor::ADVANCED != state) { return Status(ErrorCodes::InternalError, "quick oplog start location had error...?"); @@ -601,6 +604,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( params.start = *startLoc; params.direction = CollectionScanParams::FORWARD; params.tailable = cq->getQueryRequest().isTailable(); + params.shouldTrackLatestOplogTimestamp = + plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; // If the query is just tsExpr, we know that every document in the collection after the first // matching one must also match. 
To avoid wasting time running the match expression on every @@ -628,7 +633,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( PlanExecutor::YieldPolicy yieldPolicy, size_t plannerOptions) { if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) { - return getOplogStartHack(opCtx, collection, std::move(canonicalQuery)); + return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions); } if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index ed443a2b09d..5862d8bf813 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -37,6 +37,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" +#include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" @@ -703,6 +704,14 @@ PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( return PlanExecutor::DEAD; } +Timestamp PlanExecutor::getLatestOplogTimestamp() { + if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) + return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); + if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) + return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp(); + return Timestamp(); +} + // // PlanExecutor::Deleter // diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 743c818d65f..aeed2ea8154 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -457,6 +457,12 @@ public: return *_killReason; } + /** + * If the last oplog timestamp is being tracked for this PlanExecutor, return it. + * Otherwise return a null timestamp. + */ + Timestamp getLatestOplogTimestamp(); + private: /** * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index cd35a45d4aa..5a47f4b4470 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -151,6 +151,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( csn->filter = query.root()->shallowClone(); csn->tailable = tailable; csn->maxScan = query.getQueryRequest().getMaxScan(); + csn->shouldTrackLatestOplogTimestamp = + params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; // If the hint is {$natural: +-1} this changes the direction of the collection scan. 
if (!query.getQueryRequest().getHint().isEmpty()) { diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index 56e9041dd23..c033911f4ba 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -127,6 +127,9 @@ string optionString(size_t options) { if (options & QueryPlannerParams::GENERATE_COVERED_IXSCANS) { ss << "GENERATE_COVERED_IXSCANS "; } + if (options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS) { + ss << "TRACK_LATEST_OPLOG_TS "; + } return ss; } diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index de47dbf6abd..80df2b0c3ba 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -98,6 +98,9 @@ struct QueryPlannerParams { // Set this to generate covered whole IXSCAN plans. GENERATE_COVERED_IXSCANS = 1 << 11, + + // Set this to track the most recent timestamp seen by this cursor while scanning the oplog. + TRACK_LATEST_OPLOG_TS = 1 << 12, }; // See Options enum above. diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index 5eec4719842..e9095c01ceb 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -247,6 +247,7 @@ QuerySolutionNode* CollectionScanNode::clone() const { copy->tailable = this->tailable; copy->direction = this->direction; copy->maxScan = this->maxScan; + copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp; return copy; } diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index c2187385adb..68484ed8d66 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -289,6 +289,11 @@ struct CollectionScanNode : public QuerySolutionNode { // Should we make a tailable cursor? bool tailable; + // Should we keep track of the timestamp of the latest oplog entry we've seen? This information + // is needed to merge cursors from the oplog in order of operation time when reading the oplog + // across a sharded cluster. + bool shouldTrackLatestOplogTimestamp = false; + int direction; // maxScan option to .find() limits how many docs we look at. diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 1319ac07f52..dcd21b27f35 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -78,6 +78,7 @@ PlanStage* buildStages(OperationContext* opCtx, CollectionScanParams params; params.collection = collection; params.tailable = csn->tailable; + params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.maxScan = csn->maxScan; |
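For context on why the timestamp is tracked at all: the comments in collection_scan_common.h and the new test note that mongos needs it to merge change stream cursors from several shards in operation-time order, even when some shards return empty batches. That merge logic is not part of this diff; the following is only an illustrative sketch in shell-style JavaScript, with hypothetical names (shardResponses, entries), of how such a watermark could be used.

// Illustrative only -- not the mongos merge code. 'shardResponses' is a hypothetical array of
// objects shaped like {$_internalLatestOplogTimestamp: Timestamp, entries: [{ts: Timestamp, event: {...}}]}.
function mergeUpToWatermark(shardResponses) {
    // Compare BSON Timestamps with the shell's bsonWoCompare helper.
    const cmp = (a, b) => bsonWoCompare({'': a}, {'': b});

    // Every shard has scanned its oplog at least up to the timestamp it reported, even if its
    // batch was empty, so nothing earlier than the minimum reported time can still arrive.
    const watermark = shardResponses.map(r => r.$_internalLatestOplogTimestamp)
                          .reduce((a, b) => (cmp(a, b) <= 0 ? a : b));

    // Events at or before that watermark can be emitted in timestamp order without risking
    // an out-of-order event from a lagging shard.
    return shardResponses.reduce((all, r) => all.concat(r.entries), [])
        .filter(e => cmp(e.ts, watermark) <= 0)
        .sort((a, b) => cmp(a.ts, b.ts));
}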