summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml5
-rw-r--r--jstests/change_streams/report_latest_observed_oplog_timestamp.js96
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp8
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp12
-rw-r--r--src/mongo/db/exec/collection_scan.cpp21
-rw-r--r--src/mongo/db/exec/collection_scan.h16
-rw-r--r--src/mongo/db/exec/collection_scan_common.h6
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp4
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h5
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp12
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h2
-rw-r--r--src/mongo/db/query/cursor_response.cpp4
-rw-r--r--src/mongo/db/query/cursor_response.h8
-rw-r--r--src/mongo/db/query/get_executor.cpp13
-rw-r--r--src/mongo/db/query/plan_executor.cpp9
-rw-r--r--src/mongo/db/query/plan_executor.h6
-rw-r--r--src/mongo/db/query/planner_access.cpp2
-rw-r--r--src/mongo/db/query/query_planner.cpp3
-rw-r--r--src/mongo/db/query/query_planner_params.h3
-rw-r--r--src/mongo/db/query/query_solution.cpp1
-rw-r--r--src/mongo/db/query/query_solution.h5
-rw-r--r--src/mongo/db/query/stage_builder.cpp1
24 files changed, 238 insertions, 14 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index d740564e09c..8583dce1b84 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -3,7 +3,10 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
-
+ exclude_files:
+ # This test exercises an internal detail of mongos<->mongod communication and is not expected
+ # to work against a mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
executor:
config:
shell_options:
diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
new file mode 100644
index 00000000000..7345cfc4667
--- /dev/null
+++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
@@ -0,0 +1,96 @@
+// Tests that an aggregate with a $changeStream stage will report the latest optime read in
+// the oplog by its cursor. This information is needed in order to correctly merge the results
+// from the various shards on mongos.
+(function() {
+    "use strict";
+
+    const testName = "report_latest_observed_oplog_timestamp";
+    const cursorCollection = db.getCollection(testName);
+    const otherCollection = db.getCollection("unrelated_" + testName);
+
+    // Drop collections to assure a clean run. Collections may not exist so do not check result.
+    cursorCollection.drop();
+    otherCollection.drop();
+
+    // Get a resume point: open a change stream, insert one document, and read its event back.
+    jsTestLog("Getting a resume point.");
+    const batchSize = 2;
+    assert.commandWorked(db.createCollection(cursorCollection.getName()));
+    const firstResponse = assert.commandWorked(cursorCollection.runCommand(
+        {aggregate: testName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}}));
+    assert.eq(0, firstResponse.cursor.firstBatch.length);
+    assert.writeOK(cursorCollection.insert({_id: 0}));
+
+    function iterateCursor(initialCursorResponse) {
+        const getMoreCollName = initialCursorResponse.cursor.ns.substr(
+            initialCursorResponse.cursor.ns.indexOf('.') + 1);
+        return assert.commandWorked(cursorCollection.runCommand({
+            getMore: initialCursorResponse.cursor.id,
+            collection: getMoreCollName,
+            batchSize: batchSize
+        }));
+    }
+    const resumeResponse = iterateCursor(firstResponse);
+    assert.eq(1, resumeResponse.cursor.nextBatch.length);
+    // Because needsMerge was not set, the latest oplog timestamp should not be returned.
+    assert.eq(undefined, resumeResponse.$_internalLatestOplogTimestamp);
+    const resumeToken = resumeResponse.cursor.nextBatch[0]["_id"];
+
+    // Seed the collection with enough documents to fill two batches.
+    // Note the resume document is included; when needsMerge is true, we see the resume token
+    // in the stream.
+    jsTestLog("Adding documents to collection.");
+    for (let i = 1; i < batchSize * 2; i++) {
+        assert.writeOK(cursorCollection.insert({_id: i}, {writeConcern: {w: 1}}));
+    }
+
+    // TODO: SERVER-29126
+    // While change streams still uses read concern level local instead of read concern level
+    // majority, we need to use causal consistency to be able to immediately read our own writes out
+    // of the oplog. Once change streams read from the majority snapshot, we can remove this
+    // synchronization point from this test.
+    assert.commandWorked(db.runCommand({
+        find: "foo",
+        readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
+    }));
+
+    // Look at one batch's worth.
+    jsTestLog("Testing that operation time is present on initial aggregate command.");
+    const cursorResponse = assert.commandWorked(cursorCollection.runCommand({
+        aggregate: testName,
+        // The latest observed optime is only reported when needsMerge is set, and needsMerge
+        // requires fromMongos be set.
+        needsMerge: true,
+        fromMongos: true,
+        pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+        cursor: {batchSize: batchSize}
+    }));
+    const firstBatchOplogTimestamp = cursorResponse.$_internalLatestOplogTimestamp;
+    assert.neq(undefined, firstBatchOplogTimestamp, tojson(cursorResponse));
+
+    // Iterate the cursor and assert that the observed operation time advanced.
+    jsTestLog("Testing that operation time advances with getMore.");
+    let getMoreResponse = iterateCursor(cursorResponse);
+    const getMoreOplogTimestamp = getMoreResponse.$_internalLatestOplogTimestamp;
+    assert.neq(undefined, getMoreOplogTimestamp, tojson(getMoreResponse));
+    assert.gt(getMoreOplogTimestamp, firstBatchOplogTimestamp);
+
+    // Now make sure that the reported operation time advances if there are writes to an unrelated
+    // collection.
+    jsTestLog("Testing that operation time advances with writes to an unrelated collection.");
+
+    // First make sure there is nothing left in our cursor.
+    getMoreResponse = iterateCursor(cursorResponse);
+    assert.eq(getMoreResponse.cursor.nextBatch, []);
+
+    // Record that operation time, then test that the reported time advances on an insert to an
+    // unrelated collection.
+    const oplogTimeAtExhaust = getMoreResponse.$_internalLatestOplogTimestamp;
+    assert.neq(undefined, oplogTimeAtExhaust, tojson(getMoreResponse));
+    assert.writeOK(otherCollection.insert({}));
+
+    getMoreResponse = iterateCursor(cursorResponse);
+    const oplogTimeAfterUnrelatedInsert = getMoreResponse.$_internalLatestOplogTimestamp;
+    assert.neq(undefined, oplogTimeAfterUnrelatedInsert, tojson(getMoreResponse));
+    assert.gt(oplogTimeAfterUnrelatedInsert, oplogTimeAtExhaust);
+})();
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 7b78ba64fff..a86e0737475 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -322,9 +322,6 @@ public:
CursorId respondWithId = 0;
CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
BSONObj obj;
- // generateBatch() will not initialize 'state' if it exceeds the time limiting generating
- // the next batch for an awaitData cursor. In this case, 'state' should be
- // PlanExecutor::ADVANCED, so we do not attempt to get another batch.
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
long long numResults = 0;
@@ -443,6 +440,7 @@ public:
// As soon as we get a result, this operation no longer waits.
shouldWaitForInserts(opCtx) = false;
// Add result to output buffer.
+ nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
nextBatch->append(obj);
(*numResults)++;
}
@@ -467,6 +465,10 @@ public:
return Status(ErrorCodes::QueryPlanKilled,
str::stream() << "PlanExecutor killed: "
<< WorkingSetCommon::toStatusString(obj));
+ } else if (PlanExecutor::IS_EOF == *state) {
+ // This causes the reported latest oplog timestamp to advance even when there are
+ // no results for this particular query.
+ nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
}
return Status::OK();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index c8d63487229..848eee04c67 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -93,8 +93,7 @@ bool handleCursorCommand(OperationContext* opCtx,
long long batchSize = request.getBatchSize();
- // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
- BSONArrayBuilder resultsArray;
+ CursorResponseBuilder responseBuilder(true, &result);
BSONObj next;
for (int objCount = 0; objCount < batchSize; objCount++) {
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
@@ -112,6 +111,8 @@ bool handleCursorCommand(OperationContext* opCtx,
}
if (state == PlanExecutor::IS_EOF) {
+ responseBuilder.setLatestOplogTimestamp(
+ cursor->getExecutor()->getLatestOplogTimestamp());
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
@@ -128,12 +129,13 @@ bool handleCursorCommand(OperationContext* opCtx,
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
- if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) {
+ if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) {
cursor->getExecutor()->enqueue(next);
break;
}
- resultsArray.append(next);
+ responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.append(next);
}
if (cursor) {
@@ -152,7 +154,7 @@ bool handleCursorCommand(OperationContext* opCtx,
}
const CursorId cursorId = cursor ? cursor->cursorid() : 0LL;
- appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result);
+ responseBuilder.done(cursorId, nsForCursor.ns());
return static_cast<bool>(cursor);
}
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index c05dde68cea..c041792b780 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -66,6 +66,7 @@ CollectionScan::CollectionScan(OperationContext* opCtx,
_wsidForFetch(_workingSet->allocate()) {
// Explain reports the direction of the collection scan.
_specificStats.direction = params.direction;
+ invariant(!_params.shouldTrackLatestOplogTimestamp || _params.collection->ns().isOplog());
}
PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
@@ -169,6 +170,13 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
_lastSeenId = record->id;
+ if (_params.shouldTrackLatestOplogTimestamp) {
+ auto status = setLatestOplogEntryTimestamp(*record);
+ if (!status.isOK()) {
+ *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
+ return PlanStage::FAILURE;
+ }
+ }
WorkingSetID id = _workingSet->allocate();
WorkingSetMember* member = _workingSet->get(id);
@@ -179,6 +187,19 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
return returnIfMatches(member, id, out);
}
+Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
+    // Convert the record once so the error message can reuse the same object.
+    const auto oplogEntry = record.data.toBson();
+    const auto tsElem = oplogEntry["ts"];
+    if (tsElem.type() == BSONType::bsonTimestamp) {
+        _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
+        return Status::OK();
+    }
+    return Status(ErrorCodes::InternalError,
+                  str::stream() << "CollectionScan was asked to track latest operation time, "
+                                   "but found a result without a valid 'ts' field: "
+                                << oplogEntry.toString());
+}
+
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 3ba80b90d3f..880debb7421 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -37,6 +37,7 @@
namespace mongo {
+struct Record;
class SeekableRecordCursor;
class WorkingSet;
class OperationContext;
@@ -67,6 +68,10 @@ public:
return STAGE_COLLSCAN;
}
+ Timestamp getLatestOplogTimestamp() const {
+ return _latestOplogEntryTimestamp;
+ }
+
std::unique_ptr<PlanStageStats> getStats() final;
const SpecificStats* getSpecificStats() const final;
@@ -80,6 +85,13 @@ private:
*/
StageState returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out);
+ /**
+ * Extracts the timestamp from the 'ts' field of 'record', and sets '_latestOplogEntryTimestamp'
+ * to that time if it isn't already greater. Returns an error if the 'ts' field cannot be
+ * extracted.
+ */
+ Status setLatestOplogEntryTimestamp(const Record& record);
+
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
@@ -99,6 +111,10 @@ private:
// should remain in the INVALID state.
const WorkingSetID _wsidForFetch;
+ // If _params.shouldTrackLatestOplogTimestamp is set and the collection is the oplog, the latest
+ // timestamp seen in the collection. Otherwise, this is a null timestamp.
+ Timestamp _latestOplogEntryTimestamp;
+
// Stats
CollectionScanStats _specificStats;
};
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 712e56f1eac..43f8f5989bf 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -28,6 +28,7 @@
#pragma once
+#include "mongo/bson/timestamp.h"
#include "mongo/db/record_id.h"
namespace mongo {
@@ -53,6 +54,11 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
+ // Should we keep track of the timestamp of the latest oplog entry we've seen? This information
+ // is needed to merge cursors from the oplog in order of operation time when reading the oplog
+ // across a sharded cluster.
+ bool shouldTrackLatestOplogTimestamp = false;
+
// Once the first matching document is found, assume that all documents after it must match.
// This is useful for oplog queries where we know we will see records ordered by the ts field.
bool stopApplyingFilterAfterFirstMatch = false;
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index b814a2eee0f..a1b9660661f 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -127,6 +127,10 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
return boost::none;
}
+Timestamp PipelineProxyStage::getLatestOplogTimestamp() const {
+ return PipelineD::getLatestOplogTimestamp(_pipeline.get());
+}
+
std::string PipelineProxyStage::getPlanSummaryStr() const {
return PipelineD::getPlanSummaryStr(_pipeline.get());
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index bb6c6645eb1..973f69e79c2 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -74,6 +74,11 @@ public:
MONGO_UNREACHABLE;
}
+ /**
+ * Pass through the last oplog timestamp from the proxied pipeline.
+ */
+ Timestamp getLatestOplogTimestamp() const;
+
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index e31751e3a5c..7585b129f4d 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -100,7 +100,10 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level
// we need the whole pipeline to see each document to see if we should stop waiting.
+ // Furthermore, if we need to return the latest oplog time (in the tailable and
+ // needs-merge case), batching will result in a wrong time.
if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ (pExpCtx->isTailable() && pExpCtx->needsMerge) ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index c15c0495ba7..f9c9345baf6 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -127,6 +127,13 @@ public:
_shouldProduceEmptyDocs = true;
}
+    /**
+     * Returns the executor's latest oplog timestamp, or Timestamp() if there is no executor.
+     */
+    Timestamp getLatestOplogTimestamp() const {
+        return _exec ? _exec->getLatestOplogTimestamp() : Timestamp();
+    }
+
const std::string& getPlanSummaryStr() const {
return _planSummary;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 75fb836c319..33043acde9e 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -627,6 +627,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
+ if (expCtx->needsMerge && expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
+ plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ }
+
const BSONObj emptyProjection;
const BSONObj metaSortProjection = BSON("$meta"
<< "sortKey");
@@ -781,6 +785,14 @@ void PipelineD::addCursorSource(Collection* collection,
pipeline->optimizePipeline();
}
+Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {
+    // Only a DocumentSourceCursor at the front of the pipeline can supply an oplog timestamp;
+    // any other leading stage yields a default-constructed Timestamp.
+    auto* const leadingStage = pipeline->_sources.front().get();
+    auto* const cursorStage = dynamic_cast<DocumentSourceCursor*>(leadingStage);
+    return cursorStage ? cursorStage->getLatestOplogTimestamp() : Timestamp();
+}
+
std::string PipelineD::getPlanSummaryStr(const Pipeline* pPipeline) {
if (auto docSourceCursor =
dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) {
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 81b095ca5b1..3c1acaac998 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -85,6 +85,8 @@ public:
static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut);
+ static Timestamp getLatestOplogTimestamp(const Pipeline* pipeline);
+
private:
PipelineD(); // does not exist: prevent instantiation
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 69bb87e858d..b435f3351d5 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -45,6 +45,7 @@ const char kIdField[] = "id";
const char kNsField[] = "ns";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
+const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
} // namespace
@@ -61,6 +62,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
_cursorObject.append(kIdField, cursorId);
_cursorObject.append(kNsField, cursorNamespace);
_cursorObject.doneFast();
+ if (!_latestOplogTimestamp.isNull()) {
+ _commandResponse->append(kInternalLatestOplogTimestampField, _latestOplogTimestamp);
+ }
_active = false;
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index ecec7c2eb83..6f8ad8979cd 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -39,7 +39,8 @@
namespace mongo {
/**
- * Builds the cursor field for a reply to a cursor-generating command in place.
+ * Builds the cursor field and the '$_internalLatestOplogTimestamp' field for a reply to a
+ * cursor-generating command in place.
*/
class CursorResponseBuilder {
MONGO_DISALLOW_COPYING(CursorResponseBuilder);
@@ -70,6 +71,10 @@ public:
_batch.append(obj);
}
+ void setLatestOplogTimestamp(Timestamp ts) {
+ _latestOplogTimestamp = ts;
+ }
+
/**
* Call this after successfully appending all fields that will be part of this response.
* After calling, you may not call any more methods on this object.
@@ -89,6 +94,7 @@ private:
BSONObjBuilder* const _commandResponse;
BSONObjBuilder _cursorObject;
BSONArrayBuilder _batch;
+ Timestamp _latestOplogTimestamp;
};
/**
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index eaaec1141ea..0560aa907f3 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -513,7 +513,10 @@ mongo::BSONElement extractOplogTsOptime(const mongo::MatchExpression* me) {
}
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
- OperationContext* opCtx, Collection* collection, unique_ptr<CanonicalQuery> cq) {
+ OperationContext* opCtx,
+ Collection* collection,
+ unique_ptr<CanonicalQuery> cq,
+ size_t plannerOptions) {
invariant(collection);
invariant(cq.get());
@@ -586,9 +589,9 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
// This is normal. The start of the oplog is the beginning of the collection.
if (PlanExecutor::IS_EOF == state) {
- return getExecutor(opCtx, collection, std::move(cq), PlanExecutor::YIELD_AUTO);
+ return getExecutor(
+ opCtx, collection, std::move(cq), PlanExecutor::YIELD_AUTO, plannerOptions);
}
-
// This is not normal. An error was encountered.
if (PlanExecutor::ADVANCED != state) {
return Status(ErrorCodes::InternalError, "quick oplog start location had error...?");
@@ -601,6 +604,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
params.start = *startLoc;
params.direction = CollectionScanParams::FORWARD;
params.tailable = cq->getQueryRequest().isTailable();
+ params.shouldTrackLatestOplogTimestamp =
+ plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
// If the query is just tsExpr, we know that every document in the collection after the first
// matching one must also match. To avoid wasting time running the match expression on every
@@ -628,7 +633,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
PlanExecutor::YieldPolicy yieldPolicy,
size_t plannerOptions) {
if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) {
- return getOplogStartHack(opCtx, collection, std::move(canonicalQuery));
+ return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions);
}
if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index ed443a2b09d..5862d8bf813 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
@@ -703,6 +704,14 @@ PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
return PlanExecutor::DEAD;
}
+Timestamp PlanExecutor::getLatestOplogTimestamp() {
+    // A pipeline proxy stage, when present, is consulted before a collection scan stage.
+    if (auto proxyStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) {
+        return static_cast<PipelineProxyStage*>(proxyStage)->getLatestOplogTimestamp();
+    }
+    if (auto scanStage = getStageByType(_root.get(), STAGE_COLLSCAN)) {
+        return static_cast<CollectionScan*>(scanStage)->getLatestOplogTimestamp();
+    }
+    return Timestamp();
+}
+
//
// PlanExecutor::Deleter
//
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 743c818d65f..aeed2ea8154 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -457,6 +457,12 @@ public:
return *_killReason;
}
+ /**
+ * If the last oplog timestamp is being tracked for this PlanExecutor, return it.
+ * Otherwise return a null timestamp.
+ */
+ Timestamp getLatestOplogTimestamp();
+
private:
/**
* Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index cd35a45d4aa..5a47f4b4470 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -151,6 +151,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->filter = query.root()->shallowClone();
csn->tailable = tailable;
csn->maxScan = query.getQueryRequest().getMaxScan();
+ csn->shouldTrackLatestOplogTimestamp =
+ params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
// If the hint is {$natural: +-1} this changes the direction of the collection scan.
if (!query.getQueryRequest().getHint().isEmpty()) {
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index 56e9041dd23..c033911f4ba 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -127,6 +127,9 @@ string optionString(size_t options) {
if (options & QueryPlannerParams::GENERATE_COVERED_IXSCANS) {
ss << "GENERATE_COVERED_IXSCANS ";
}
+ if (options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS) {
+ ss << "TRACK_LATEST_OPLOG_TS ";
+ }
return ss;
}
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index de47dbf6abd..80df2b0c3ba 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -98,6 +98,9 @@ struct QueryPlannerParams {
// Set this to generate covered whole IXSCAN plans.
GENERATE_COVERED_IXSCANS = 1 << 11,
+
+ // Set this to track the most recent timestamp seen by this cursor while scanning the oplog.
+ TRACK_LATEST_OPLOG_TS = 1 << 12,
};
// See Options enum above.
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 5eec4719842..e9095c01ceb 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -247,6 +247,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->tailable = this->tailable;
copy->direction = this->direction;
copy->maxScan = this->maxScan;
+ copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
return copy;
}
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index c2187385adb..68484ed8d66 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -289,6 +289,11 @@ struct CollectionScanNode : public QuerySolutionNode {
// Should we make a tailable cursor?
bool tailable;
+ // Should we keep track of the timestamp of the latest oplog entry we've seen? This information
+ // is needed to merge cursors from the oplog in order of operation time when reading the oplog
+ // across a sharded cluster.
+ bool shouldTrackLatestOplogTimestamp = false;
+
int direction;
// maxScan option to .find() limits how many docs we look at.
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 1319ac07f52..dcd21b27f35 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -78,6 +78,7 @@ PlanStage* buildStages(OperationContext* opCtx,
CollectionScanParams params;
params.collection = collection;
params.tailable = csn->tailable;
+ params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.maxScan = csn->maxScan;