author     Bernard Gorman <bernard.gorman@gmail.com>          2020-06-01 18:47:00 +0100
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2020-10-21 23:28:32 +0000
commit     f58bdb68407abc68fddc8a91c0d58785423db1b5 (patch)
tree       cdb930fe469052dd6e51fc19ffea9b4500e0daae
parent     ae47641690c8ff75bd0b729d28d705eacdb84c9d (diff)
download   mongo-f58bdb68407abc68fddc8a91c0d58785423db1b5.tar.gz
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
(cherry picked from commit 694ed4153b9d5424b5d169fea5c68f99d4dfb45a) (cherry picked from commit 9e38cbba7d3efefa59e25cb0411558591036d30b)
-rw-r--r--  src/mongo/base/error_codes.err                                     |   3
-rw-r--r--  src/mongo/db/catalog/collection_mock.h                             |   4
-rw-r--r--  src/mongo/db/exec/collection_scan.cpp                              |  35
-rw-r--r--  src/mongo/db/exec/collection_scan.h                                |   5
-rw-r--r--  src/mongo/db/exec/collection_scan_common.h                         |   7
-rw-r--r--  src/mongo/db/pipeline/SConscript                                   |   4
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp            |   4
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp       | 165
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.h         |  90
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp  | 755
-rw-r--r--  src/mongo/db/pipeline/document_source_mock.cpp                     |   4
-rw-r--r--  src/mongo/db/pipeline/document_source_mock.h                       |   2
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                               |   3
-rw-r--r--  src/mongo/db/query/get_executor.cpp                                |   3
-rw-r--r--  src/mongo/db/query/planner_access.cpp                              |   2
-rw-r--r--  src/mongo/db/query/query_planner.cpp                               |   3
-rw-r--r--  src/mongo/db/query/query_planner_params.h                          |   6
-rw-r--r--  src/mongo/db/query/query_solution.cpp                              |   1
-rw-r--r--  src/mongo/db/query/query_solution.h                                |   3
-rw-r--r--  src/mongo/db/query/stage_builder.cpp                               |   1
-rw-r--r--  src/mongo/db/repl/oplog_entry.h                                    |   5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |   3
22 files changed, 711 insertions(+), 397 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 15f4907703b..19201195034 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -267,6 +267,9 @@ error_code("DataModifiedByRepair", 269);
error_code("RepairedReplicaSetNode", 270);
error_code("AlarmAlreadyFulfilled", 277)
error_code("ChangeStreamFatalError", 280)
+
+error_code("OplogQueryMinTsMissing", 326)
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
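
The new OplogQueryMinTsMissing code is raised by the oplog scan and translated into a user-facing error by the change stream stage. A minimal sketch of that raise-and-translate pattern, assuming the standard uassert/uasserted/ExceptionFor helpers from mongo/util/assert_util.h (both concrete call sites appear verbatim later in this diff):

    #include "mongo/util/assert_util.h"

    // Scan side: thrown when the requested minTs has fallen off the oplog.
    void scanSideCheck(bool minTsStillPresent) {
        uassert(ErrorCodes::OplogQueryMinTsMissing,
                "Specified minTs has already fallen off the oplog",
                minTsStillPresent);
    }

    // Pipeline side: the low-level scan error is converted into a
    // $changeStream-specific error that the client can understand.
    void pipelineSideTranslate() {
        try {
            scanSideCheck(false);
        } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
            uasserted(ErrorCodes::ChangeStreamFatalError,
                      "Resume of change stream was not possible, as the resume "
                      "point may no longer be in the oplog.");
        }
    }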
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index 5cf52d3a842..41f801be109 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -38,8 +38,8 @@ namespace mongo {
* This class comprises a mock Collection for use by UUIDCatalog unit tests.
*/
class CollectionMock : virtual public Collection::Impl,
- virtual CappedCallback,
- virtual UpdateNotifier {
+ virtual public CappedCallback,
+ virtual public UpdateNotifier {
public:
CollectionMock(const NamespaceString& ns) : _ns(ns) {}
~CollectionMock() = default;
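
Context for this two-word fix: in a C++ 'class', a base listed without an access specifier is inherited privately, so the mock could not previously be used where a CappedCallback* or UpdateNotifier* is expected. A standalone illustration with hypothetical types (not MongoDB code):

    struct Callback {
        virtual ~Callback() = default;
    };

    class PrivatelyDerived : virtual Callback {};        // no specifier => private base
    class PubliclyDerived : virtual public Callback {};  // the form the patch adopts

    void takesCallback(Callback*) {}

    int main() {
        PubliclyDerived pub;
        takesCallback(&pub);      // OK: Callback is a public, accessible base
        // PrivatelyDerived priv;
        // takesCallback(&priv);  // would not compile: Callback is an inaccessible base
    }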
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 4cfe764d8c8..f4e16cb3b84 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
@@ -72,6 +73,12 @@ CollectionScan::CollectionScan(OperationContext* opCtx,
_specificStats.maxTs = params.maxTs;
invariant(!_params.shouldTrackLatestOplogTimestamp || _params.collection->ns().isOplog());
+ // We should never see 'assertMinTsHasNotFallenOffOplog' set if 'minTs' is not present.
+ if (params.assertMinTsHasNotFallenOffOplog) {
+ invariant(params.shouldTrackLatestOplogTimestamp);
+ invariant(params.minTs);
+ }
+
if (params.maxTs) {
_endConditionBSON = BSON("$gte" << *(params.maxTs));
_endCondition = stdx::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName,
@@ -172,19 +179,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
if (!record) {
- // We just hit EOF. If we are tailable and have already returned data, leave us in a
- // state to pick up where we left off on the next call to work(). Otherwise EOF is
- // permanent.
+ // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up
+ // where we left off on the next call to work(). Otherwise, the EOF is permanent.
if (_params.tailable && !_lastSeenId.isNull()) {
_cursor.reset();
} else {
_commonStats.isEOF = true;
}
-
return PlanStage::IS_EOF;
}
_lastSeenId = record->id;
+ if (_params.assertMinTsHasNotFallenOffOplog) {
+ assertMinTsHasNotFallenOffOplog(*record);
+ }
if (_params.shouldTrackLatestOplogTimestamp) {
auto status = setLatestOplogEntryTimestamp(*record);
if (!status.isOK()) {
@@ -215,6 +223,25 @@ Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
return Status::OK();
}
+void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
+ // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
+ // is earlier than or equal to the minTs time.
+ auto swOplogEntry = repl::OplogEntry::parse(record.data.toBson());
+ invariant(_specificStats.docsTested == 0);
+ invariant(swOplogEntry.isOK());
+ auto oplogEntry = std::move(swOplogEntry.getValue());
+ const bool isNewRS =
+ oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
+ oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
+ uassert(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ // We don't need to check this assertion again after we've confirmed the first oplog event.
+ _params.assertMinTsHasNotFallenOffOplog = false;
+}
+
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
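
Reduced to its essentials, the new first-entry check accepts two cases: the first oplog entry is the replica-set initializing no-op, or its timestamp does not exceed the requested minTs. A dependency-light sketch using plain BSON helpers ('minTsStillInOplog' is a hypothetical name; the real code above parses the record into a repl::OplogEntry and uasserts OplogQueryMinTsMissing):

    #include "mongo/bson/bsonobj.h"
    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/bson/timestamp.h"

    bool minTsStillInOplog(const mongo::BSONObj& firstOplogEntry, mongo::Timestamp minTs) {
        // A brand-new replica set cannot have lost any history, even if its
        // first entry post-dates minTs.
        const bool isNewRS = firstOplogEntry["op"].str() == "n" &&
            firstOplogEntry["o"].Obj().binaryEqual(BSON("msg" << "initiating set"));
        return isNewRS || firstOplogEntry["ts"].timestamp() <= minTs;
    }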
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index ca2e6d4a535..d9723d3fae3 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -94,6 +94,11 @@ private:
*/
Status setLatestOplogEntryTimestamp(const Record& record);
+ /**
+ * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog.
+ */
+ void assertMinTsHasNotFallenOffOplog(const Record& record);
+
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index eff1e5e986d..0579eae5ac1 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -51,6 +51,10 @@ struct CollectionScanParams {
// not being invalidated before the first call to work(...).
RecordId start;
+ // If present, the collection scan will seek directly to the RecordId of an oplog entry as
+ // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
+ boost::optional<Timestamp> minTs;
+
// If present, the collection scan will stop and return EOF the first time it sees a document
// that does not pass the filter and has 'ts' greater than 'maxTs'.
boost::optional<Timestamp> maxTs;
@@ -60,6 +64,9 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
+ // Should we assert that the specified minTs has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
// across a sharded cluster.
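
Putting the new fields together, an oplog scan that verifies resumability is configured roughly as follows (a sketch mirroring the unit-test fixture later in this diff; 'oplogCollection' is an assumed Collection* for the local.oplog.rs collection):

    CollectionScanParams params;
    params.collection = oplogCollection;            // assumed oplog Collection*
    params.minTs = Timestamp(100, 1);               // the client's requested resume point
    params.assertMinTsHasNotFallenOffOplog = true;  // throw OplogQueryMinTsMissing if violated
    params.shouldTrackLatestOplogTimestamp = true;  // required by the new invariant
    params.tailable = true;                         // oplog scans are tailable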
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index bf753f4be96..0e5744cce40 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -225,9 +225,13 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/catalog/catalog_impl',
+ '$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core',
+ '$BUILD_DIR/mongo/db/storage/kv/kv_engine_mock',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
'$BUILD_DIR/mongo/util/clock_source_mock',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 28acafc8726..1245d6a99d9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -385,7 +385,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(
// to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'.
startFrom = tokenData.clusterTime;
if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData);
} else {
resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData);
}
@@ -417,7 +417,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(
<< " in a $changeStream stage.",
!resumeAfterClusterTime);
startFrom = *startAtOperationTime;
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom);
}
// There might not be a starting point if we're on mongos, otherwise we should either have a
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index ef067f773a0..2f1eda6f7a6 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -1,4 +1,3 @@
-
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
@@ -35,9 +34,13 @@
using boost::intrusive_ptr;
namespace mongo {
+
+constexpr StringData DocumentSourceCheckResumability::kStageName;
+constexpr StringData DocumentSourceEnsureResumeTokenPresent::kStageName;
+
namespace {
-using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
+using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus;
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
@@ -79,15 +82,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
: ResumeStatus::kCheckNextDoc;
}
+ // If the document's 'applyOpsIndex' sorts before that of the client token, we must keep
+ // looking.
if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
return ResumeStatus::kCheckNextDoc;
} else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
// This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
// the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
- // watched). If we are looking for the resume token on a shard then this simply means that
- // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
- uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
- // We are running on a merging shard. Signal that we have read beyond the resume token.
+ // watched). Signal that we have read beyond the resume token.
return ResumeStatus::kSurpassedToken;
}
@@ -161,16 +163,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
}
} // namespace
-const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
- return "$_ensureResumeTokenPresent";
-}
-
-Value DocumentSourceEnsureResumeTokenPresent::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
- // would result in it being created twice.
- return Value();
-}
+DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSourceCheckResumability(expCtx, std::move(token)) {}
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
@@ -178,39 +173,40 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon
return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
-DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
+ return kStageName.rawData();
+}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
+ // If we have already verified the resume token is present, return the next doc immediately.
if (_resumeStatus == ResumeStatus::kFoundToken) {
- // We've already verified the resume token is present.
return pSource->getNext();
}
- Document documentFromResumedStream;
+ auto nextInput = GetNextResult::makeEOF();
- // Keep iterating the stream until we see either the resume token we're looking for,
- // or a change with a higher timestamp than our resume token.
+ // Keep iterating the stream until we see either the resume token we're looking for, or a change
+ // with a higher timestamp than our resume token.
while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
- auto nextInput = pSource->getNext();
+ // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This
+ // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token.
+ nextInput = DocumentSourceCheckResumability::getNext();
- if (!nextInput.isAdvanced())
+ // If there are no more results, return EOF. We will continue checking for the resume token
+ // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed
+ // the resume token on this iteration.
+ if (!nextInput.isAdvanced()) {
+ invariant(_resumeStatus != ResumeStatus::kSurpassedToken);
return nextInput;
-
- // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
- // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
- // token would sort before this received document we cannot resume the change stream.
- _resumeStatus = compareAgainstClientResumeToken(
- pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
+ }
}
uassert(ErrorCodes::ChangeStreamFatalError,
str::stream()
<< "resume of change stream was not possible, as the resume token was not found. "
- << documentFromResumedStream["_id"].getDocument().toString(),
+ << nextInput.getDocument()["_id"].getDocument().toString(),
_resumeStatus != ResumeStatus::kSurpassedToken);
// If we reach this point, then we've seen the resume token.
@@ -220,102 +216,73 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext()
return pSource->getNext();
}
-const char* DocumentSourceShardCheckResumability::getSourceName() const {
- return "$_checkShardResumability";
-}
-
-Value DocumentSourceShardCheckResumability::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
- // would result in it being created twice.
- return Value();
-}
+DocumentSourceCheckResumability::DocumentSourceCheckResumability(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
// We are resuming from a point in time, not an event. Seed the stage with a high water mark.
return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData());
}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
+ return new DocumentSourceCheckResumability(expCtx, std::move(token));
}
-DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceCheckResumability::getSourceName() const {
+ return kStageName.rawData();
+}
-DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
+DocumentSource::GetNextResult DocumentSourceCheckResumability::getNext() {
pExpCtx->checkForInterrupt();
- if (_surpassedResumeToken)
+ if (_resumeStatus == ResumeStatus::kSurpassedToken) {
return pSource->getNext();
+ }
- while (!_surpassedResumeToken) {
- auto nextInput = pSource->getNext();
-
- // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us
- // from continually returning EOF in cases where the resume point has fallen off the oplog.
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) {
+ // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change
+ // stream filter has fallen off the oplog. Catch this and throw a more explanatory error.
+ auto nextInput = [this]() {
+ try {
+ return pSource->getNext();
+ } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+ uasserted(ErrorCodes::ChangeStreamFatalError,
+ "Resume of change stream was not possible, as the resume point may no "
+ "longer be in the oplog.");
+ }
+ }();
+
+ // If we hit EOF, return it immediately.
if (!nextInput.isAdvanced()) {
- _assertOplogHasEnoughHistory(nextInput);
return nextInput;
}
+
// Determine whether the current event sorts before, equal to or after the resume token.
- auto resumeStatus =
+ _resumeStatus =
compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
- switch (resumeStatus) {
+ switch (_resumeStatus) {
case ResumeStatus::kCheckNextDoc:
// If the result was kCheckNextDoc, we are resumable but must swallow this event.
- _verifiedOplogHasEnoughHistory = true;
continue;
case ResumeStatus::kSurpassedToken:
- // In this case the resume token wasn't found; it must be on another shard. We must
- // examine the oplog to ensure that its history reaches back to before the resume
- // token, otherwise we may have missed events that fell off the oplog. If we can
- // resume, fall through into the following case and set _surpassedResumeToken.
- _assertOplogHasEnoughHistory(nextInput);
+ // In this case the resume token wasn't found; it may be on another shard. However,
+ // since the oplog scan did not throw, we know that we are resumable. Fall through
+ // into the following case and return the document.
case ResumeStatus::kFoundToken:
- // We found the actual token! Set _surpassedResumeToken and return the result.
- _surpassedResumeToken = true;
+ // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it.
return nextInput;
}
}
MONGO_UNREACHABLE;
}
-void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
- const GetNextResult& nextInput) {
- // If we have already verified that this stream is resumable, return immediately.
- if (_verifiedOplogHasEnoughHistory) {
- return;
- }
- // Look up the first document in the oplog and compare it with the resume token's clusterTime.
- auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
- auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
- if (auto first = pipeline->getNext()) {
- auto firstOplogEntry = Value(*first);
- // If the first entry in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the resume token. No events earlier than the token can
- // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that
- // the timestamp of the first oplog entry is earlier than that of the resume token.
- const bool isNewRS =
- Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 &&
- Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0;
- uassert(ErrorCodes::ChangeStreamFatalError,
- "Resume of change stream was not possible, as the resume point may no longer be in "
- "the oplog. ",
- isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
- } else {
- // Very unusual case: the oplog is empty. We can always resume. However, it should never be
- // possible to have obtained a document that matched the filter if the oplog is empty.
- uassert(ErrorCodes::ChangeStreamFatalError,
- "Oplog was empty but found an event in the change stream pipeline. It should not "
- "be possible for this to happen",
- nextInput.isEOF());
- }
- _verifiedOplogHasEnoughHistory = true;
+Value DocumentSourceCheckResumability::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
+ // would result in it being created twice.
+ return Value();
}
} // namespace mongo
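
The control flow of the rewritten getNext() can be modeled without any pipeline machinery. In this reduced sketch, integers stand in for documents in resume-token sort order and the inline comparison stands in for compareAgainstClientResumeToken(); the three-state ResumeStatus matches the enum introduced in this diff:

    #include <deque>
    #include <optional>

    enum class ResumeStatus { kFoundToken, kSurpassedToken, kCheckNextDoc };

    struct CheckResumabilityModel {
        std::deque<int> source;  // upstream events, already in token sort order
        int clientToken;
        ResumeStatus status = ResumeStatus::kCheckNextDoc;

        std::optional<int> getNext() {
            // Once the token is found or surpassed, act as a pass-through.
            if (status != ResumeStatus::kCheckNextDoc) {
                if (source.empty()) return std::nullopt;
                int next = source.front();
                source.pop_front();
                return next;
            }
            // Otherwise swallow events that sort before the client token.
            while (!source.empty()) {
                int next = source.front();
                source.pop_front();
                status = next < clientToken ? ResumeStatus::kCheckNextDoc
                    : next > clientToken    ? ResumeStatus::kSurpassedToken
                                            : ResumeStatus::kFoundToken;
                if (status != ResumeStatus::kCheckNextDoc) {
                    // Return the doc; the subclass inspects 'status' to decide
                    // whether the resume succeeded.
                    return next;
                }
            }
            return std::nullopt;  // EOF: resume the check on the next call
        }
    };

Note that kSurpassedToken is acceptable at this layer, since the token may simply live on another shard; only DocumentSourceEnsureResumeTokenPresent treats it as fatal.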
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 10c3ebd1a73..360e1d14b1b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -39,30 +39,42 @@
namespace mongo {
/**
- * This checks for resumability on a single shard in the sharded case. The rules are
+ * This stage checks whether or not the oplog has enough history to resume the stream, and consumes
+ * all events up to the given resume point. It is deployed on all shards when resuming a stream on
+ * a sharded cluster, and is also used in the single-replicaset case when a stream is opened with
+ * startAtOperationTime or with a high-water-mark resume token. It defers to the COLLSCAN to check
+ * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal
+ * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an
+ * assertion, which this stage catches and converts into a more comprehensible $changeStream
+ * specific exception. The rules are:
*
- * - If the first document in the pipeline for this shard has a matching timestamp, we can
- * always resume.
- * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur
- * on a secondary that has just started up from a primary that has not taken a write.
- * In particular, an empty oplog cannot be the result of oplog truncation.
- * - If neither of the above is true, the least-recent document in the oplog must precede the resume
- * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or
- * after seeing that there are no documents in the pipeline after the resume token in the shard,
- * we're guaranteed not to miss any documents.
+ * - If the first event seen in the oplog has the same timestamp as the requested resume token or
+ * startAtOperationTime, we can resume.
+ * - If the timestamp of the first event seen in the oplog is earlier than the requested resume
+ * token or startAtOperationTime, we can resume.
+ * - If the first entry in the oplog is a replica set initialization, then we can resume even if the
+ * token timestamp is earlier, since no events can have fallen off this oplog yet. This can happen
+ * in a sharded cluster when a new shard is added.
*
- * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume
- * token and the first matching document in the pipeline.
- *
- * This source need only run on a sharded collection. For unsharded collections,
- * DocumentSourceEnsureResumeTokenPresent is sufficient.
+ * - Otherwise we cannot resume, as we do not know if there were any events between the resume token
+ * and the first matching document in the oplog.
*/
-class DocumentSourceShardCheckResumability final : public DocumentSource {
+class DocumentSourceCheckResumability : public DocumentSource {
public:
- GetNextResult getNext() final;
- const char* getSourceName() const final;
+ static constexpr StringData kStageName = "$_internalCheckResumability"_sd;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ // Used to record the results of comparing the token data extracted from documents in the
+ // resumed stream against the client's resume token.
+ enum class ResumeStatus {
+ kFoundToken, // The stream produced a document satisfying the client resume token.
+ kSurpassedToken, // The stream's latest document is more recent than the resume token.
+ kCheckNextDoc // The next document produced by the stream may contain the resume token.
+ };
+
+ GetNextResult getNext() override;
+ const char* getSourceName() const override;
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
return {StreamType::kStreaming,
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
@@ -74,42 +86,33 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts);
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
-private:
+protected:
/**
- * Use the create static method to create a DocumentSourceShardCheckResumability.
+ * Use the create static method to create a DocumentSourceCheckResumability.
*/
- DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token);
+ DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ ResumeTokenData token);
- void _assertOplogHasEnoughHistory(const GetNextResult& nextInput);
-
- ResumeTokenData _tokenFromClient;
- bool _verifiedOplogHasEnoughHistory = false;
- bool _surpassedResumeToken = false;
+ ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
+ const ResumeTokenData _tokenFromClient;
};
/**
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource,
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability,
public NeedsMergerDocumentSource {
public:
- // Used to record the results of comparing the token data extracted from documents in the
- // resumed stream against the client's resume token.
- enum class ResumeStatus {
- kFoundToken, // The stream produced a document satisfying the client resume token.
- kSurpassedToken, // The stream's latest document is more recent than the resume token.
- kCheckNextDoc // The next document produced by the stream may contain the resume token.
- };
+ static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
- GetNextResult getNext() final;
+ GetNextResult getNext() override;
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -127,11 +130,11 @@ public:
/**
* NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point
- * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards
+ * could be at any shard. Also add a DocumentSourceCheckResumability stage on the shards
* pipeline to ensure that each shard has enough oplog history to resume the change stream.
*/
boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient);
+ return DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient);
};
std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
@@ -149,8 +152,6 @@ public:
return {sortMergingPresorted, this};
};
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
-
static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
@@ -160,9 +161,6 @@ private:
*/
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeTokenData token);
-
- ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
- ResumeTokenData _tokenFromClient;
};
} // namespace mongo
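
For reference, the selection between the two stages happens in buildPipeline() (quoted from document_source_change_stream.cpp earlier in this diff); the stricter merger-side stage then installs the base class on every shard via getShardSource():

    boost::intrusive_ptr<DocumentSource> resumeStage;
    if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
        // A point-in-time resume only needs the resumability check.
        resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData);
    } else {
        // An exact token must additionally be found in the stream.
        resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData);
    }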
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index a6fd999314a..b47212a6544 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -36,6 +36,11 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/collection_mock.h"
+#include "mongo/db/catalog/database_holder_mock.h"
+#include "mongo/db/catalog/database_impl.h"
+#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -45,86 +50,371 @@
#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/devnull/devnull_kv_engine.h"
+#include "mongo/db/storage/kv/kv_database_catalog_entry_mock.h"
+#include "mongo/db/storage/kv/kv_storage_engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
using boost::intrusive_ptr;
-using std::deque;
namespace mongo {
namespace {
+static constexpr StringData kOtherNs = "test.other.ns"_sd;
static constexpr StringData kTestNs = "test.ns"_sd;
+class ChangeStreamOplogCursorMock : public SeekableRecordCursor {
+public:
+ ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {}
+
+ virtual ~ChangeStreamOplogCursorMock() {}
+
+ boost::optional<Record> next() override {
+ if (_records->empty()) {
+ return boost::none;
+ }
+ auto& next = _records->front();
+ _records->pop_front();
+ return next;
+ }
+
+ boost::optional<Record> seekExact(const RecordId& id) override {
+ return Record{};
+ }
+ void save() override {}
+ bool restore() override {
+ return true;
+ }
+ void detachFromOperationContext() override {}
+ void reattachToOperationContext(OperationContext* opCtx) override {}
+
+private:
+ std::deque<Record>* _records;
+};
+
+class ChangeStreamOplogCollectionMock : public CollectionMock {
+public:
+ ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) {
+ _recordStore =
+ _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {});
+ }
+
+ void init(OperationContext* opCtx) override {}
+
+ void push_back(Document doc) {
+ // Every entry we push into the oplog should have both 'ts' and 'ns' fields.
+ invariant(doc["ts"].getType() == BSONType::bsonTimestamp);
+ invariant(doc["ns"].getType() == BSONType::String);
+ // Events should always be added in ascending ts order.
+ auto lastTs =
+ _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp();
+ invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0);
+ // Fill out remaining required fields in the oplog entry.
+ MutableDocument mutableDoc{doc};
+ mutableDoc.setField("op", Value("n"_sd));
+ mutableDoc.setField("o", Value(Document{}));
+ mutableDoc.setField("h", Value(1LL));
+ mutableDoc.setField("wall",
+ Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL())));
+ // Must remove _id since the oplog expects either no _id or an OID.
+ mutableDoc.remove("_id");
+ // Convert to owned BSON and create corresponding Records.
+ _data.push_back(mutableDoc.freeze().toBson());
+ Record record;
+ record.data = {_data.back().objdata(), _data.back().objsize()};
+ record.id = RecordId{static_cast<int64_t>(_data.size())};
+ _records.push_back(std::move(record));
+ }
+
+ std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
+ bool forward) const override {
+ return std::make_unique<ChangeStreamOplogCursorMock>(&_records);
+ }
+
+ const RecordStore* getRecordStore() const override {
+ return _recordStore.get();
+ }
+ RecordStore* getRecordStore() override {
+ return _recordStore.get();
+ }
+
+private:
+ // We retain the owned record queue here because cursors may be destroyed and recreated.
+ mutable std::deque<Record> _records;
+ std::deque<BSONObj> _data;
+
+ // These are no-op structures which are required by the CollectionScan.
+ std::unique_ptr<RecordStore> _recordStore;
+ DevNullKVEngine _devNullEngine;
+};
+
+/**
+ * The RequiresCollectionStageBase class attempts to obtain the current epoch of the database
+ * containing the collection to be scanned (in this case, the oplog). Here we provide a dummy
+ * DatabaseHolder which will always return a valid pointer to the _database member variable.
+ */
+class ChangeStreamDatabaseHolderMock : public DatabaseHolderMock {
+public:
+ ChangeStreamDatabaseHolderMock(){};
+
+ Database* get(OperationContext* opCtx, StringData ns) const override {
+ return createAndGetDB(opCtx, ns);
+ }
+
+ Database* openDb(OperationContext* opCtx, StringData ns, bool* justCreated = nullptr) override {
+ return createAndGetDB(opCtx, ns);
+ }
+
+private:
+ Database* createAndGetDB(OperationContext* opCtx, StringData ns) const {
+ if (!_database) {
+ _storageEngine = std::make_unique<KVStorageEngine>(
+ &_devNullEngine, KVStorageEngineOptions{}, kvDatabaseCatalogEntryMockFactory);
+ _dbEntry = kvDatabaseCatalogEntryMockFactory(NamespaceString::kRsOplogNamespace.db(),
+ _storageEngine.get());
+ _database = std::make_unique<Database>(
+ opCtx, NamespaceString::kRsOplogNamespace.db(), _dbEntry.get());
+ }
+ return _database.get();
+ }
+
+ mutable std::unique_ptr<mongo::KVDatabaseCatalogEntryMock> _dbEntry;
+ mutable std::unique_ptr<KVStorageEngine> _storageEngine;
+ mutable std::unique_ptr<Database> _database;
+ mutable DevNullKVEngine _devNullEngine;
+};
+
+/**
+ * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This
+ * class maintains its own queue of documents added by each test, but also pushes each doc into an
+ * underlying ChangeStreamOplogCollectionMock. When getNext() is called, it retrieves the next
+ * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test
+ * the latter's changestream-specific functionality. The reason this class keeps its own queue in
+ * addition to the ChangeStreamOplogCollectionMock is twofold:
+ *
+ * - The _id must be stripped from each document before it can be added to the mocked oplog, since
+ * the _id field of the test document is a resume token but oplog entries are only permitted to
+ * have OID _ids. We therefore have to restore the _id field of the document pulled from the
+ * CollectionScan before passing it into the pipeline.
+ *
+ * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan;
+ * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise
+ * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue
+ * with no corresponding entry in the ChangeStreamOplogCollectionMock queue.
+ */
+class DocumentSourceChangeStreamMock : public DocumentSourceMock {
+public:
+ DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {
+ // Create a ChangeStreamOplogCollectionMock and retain an unowned pointer to it.
+ auto csOplogColl = std::make_unique<ChangeStreamOplogCollectionMock>();
+ _collection = csOplogColl.get();
+
+ // Use the ChangeStreamOplogCollectionMock to instantiate a Collection wrapper.
+ _collWrapper = std::make_unique<Collection>(std::move(csOplogColl));
+
+ // Set up the CollectionScanParams object and pass it a pointer to the Collection.
+ _filterExpr = BSON("ns" << kTestNs);
+ _filter = _parseAndNormalize(_filterExpr);
+ _params.assertMinTsHasNotFallenOffOplog = true;
+ _params.shouldTrackLatestOplogTimestamp = true;
+ _params.collection = _collWrapper.get();
+ _params.minTs = Timestamp(0, 0);
+ _params.tailable = true;
+ }
+
+ void setResumeToken(ResumeTokenData resumeToken) {
+ invariant(!_collScan);
+ _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
+ _filter = _parseAndNormalize(_filterExpr);
+ _params.minTs = resumeToken.clusterTime;
+ }
+
+ void push_back(GetNextResult&& result) {
+ // We should never push an explicit EOF onto the queue.
+ invariant(!result.isEOF());
+ // If there is a document supplied, add it to the mock collection.
+ if (result.isAdvanced()) {
+ _collection->push_back(result.getDocument());
+ }
+ // Both documents and pauses are stored in the DSMock queue.
+ queue.push_back(std::move(result));
+ }
+
+ void push_back(const GetNextResult& result) {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isPermanentlyEOF() const {
+ return _collScan->getCommonStats()->isEOF;
+ }
+
+protected:
+ GetNextResult getNext() override {
+ // If this is the first call to getNext, we must create the COLLSCAN.
+ if (!_collScan) {
+ _collScan =
+ std::make_unique<CollectionScan>(pExpCtx->opCtx, _params, &_ws, _filter.get());
+ // The first call to doWork will create the cursor and return NEED_TIME. But it won't
+ // actually scan any of the documents that are present in the mock cursor queue.
+ ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME);
+ ASSERT_EQ(_getNumDocsTested(), 0UL);
+ }
+ while (true) {
+ // If the next result is a pause, return it and don't collscan.
+ auto nextResult = DocumentSourceMock::getNext();
+ if (nextResult.isPaused()) {
+ return nextResult;
+ }
+ // Otherwise, retrieve the document via the CollectionScan stage.
+ auto id = WorkingSet::INVALID_ID;
+ switch (_collScan->doWork(&id)) {
+ case PlanStage::IS_EOF:
+ invariant(nextResult.isEOF());
+ return nextResult;
+ case PlanStage::ADVANCED: {
+ // We need to restore the _id field which was removed when we added this
+ // entry into the oplog. This is like a stripped-down DSCSTransform stage.
+ MutableDocument mutableDoc{Document{_ws.get(id)->obj.value()}};
+ mutableDoc["_id"] = nextResult.getDocument()["_id"];
+ return mutableDoc.freeze();
+ }
+ case PlanStage::NEED_TIME:
+ continue;
+ case PlanStage::NEED_YIELD:
+ case PlanStage::FAILURE:
+ case PlanStage::DEAD:
+ MONGO_UNREACHABLE;
+ }
+ }
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ std::unique_ptr<MatchExpression> _parseAndNormalize(BSONObj filterExpr) {
+ auto filter = uassertStatusOK(MatchExpressionParser::parse(filterExpr, pExpCtx));
+ filter = MatchExpression::optimize(std::move(filter));
+ CanonicalQuery::sortTree(filter.get());
+ return filter;
+ }
+
+ size_t _getNumDocsTested() {
+ return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested;
+ }
+
+ ChangeStreamOplogCollectionMock* _collection;
+ std::unique_ptr<Collection> _collWrapper;
+
+ std::unique_ptr<CollectionScan> _collScan;
+ CollectionScanParams _params;
+
+ std::unique_ptr<MatchExpression> _filter;
+ BSONObj _filterExpr;
+
+ WorkingSet _ws;
+};
+
class CheckResumeTokenTest : public AggregationContextFixture {
public:
- CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {}
+ CheckResumeTokenTest() : _mock(new DocumentSourceChangeStreamMock(getExpCtx())) {}
protected:
/**
+ * Pushes a document with a resume token corresponding to the given ResumeTokenData into the
+ * mock queue. This document will have an ns field that matches the test namespace, and will
+ * appear in the change stream pipeline if its timestamp is at or after the resume timestamp.
+ */
+ void addOplogEntryOnTestNS(ResumeTokenData tokenData) {
+ _mock->push_back(Document{{"ns", kTestNs},
+ {"ts", tokenData.clusterTime},
+ {"_id",
+ ResumeToken(std::move(tokenData))
+ .toDocument(ResumeToken::SerializationFormat::kHexString)}});
+ }
+
+ /**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* applyOpsIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(
- Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) {
- _mock->queue.push_back(
- Document{{"_id",
- ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, uuid, Value(docKey)))
- .toDocument(ResumeToken::SerializationFormat::kHexString)}});
+ void addOplogEntryOnTestNS(
+ Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
+ return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* applyOpsIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, docKey, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid);
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, _id string, and
* namespace into the mock queue.
*/
- void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, Document{{"_id", id}}, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid);
}
+ /**
+ * Pushes a document that does not match the test namespace into the mock oplog. This will be
+ * examined by the oplog CollectionScan but will not produce an event in the pipeline.
+ */
+ void addOplogEntryOnOtherNS(Timestamp ts) {
+ _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}});
+ }
+
+ /**
+ * Pushes a pause in execution into the pipeline queue.
+ */
void addPause() {
_mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
}
/**
+ * Convenience method to create the class under test with a given ResumeTokenData.
+ */
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
+ ResumeTokenData tokenData) {
+ auto checkResumeToken =
+ DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
+ checkResumeToken->setSource(_mock.get());
+ return checkResumeToken;
+ }
+
+ /**
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts,
int version,
std::size_t applyOpsIndex,
boost::optional<Document> docKey,
UUID uuid) {
- auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(
- getExpCtx(), {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()});
- checkResumeToken->setSource(_mock.get());
- return checkResumeToken;
+ return createDSEnsureResumeTokenPresent(
+ {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()});
}
/**
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, docKey, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid);
}
/**
* Convenience method to create the class under test with a given timestamp, _id string, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid);
}
/**
@@ -136,29 +426,29 @@ protected:
return *uuid_gen;
}
- intrusive_ptr<DocumentSourceMock> _mock;
+ intrusive_ptr<DocumentSourceChangeStreamMock> _mock;
};
-class ShardCheckResumabilityTest : public CheckResumeTokenTest {
+class CheckResumabilityTest : public CheckResumeTokenTest {
protected:
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(
ResumeTokenData tokenData) {
- auto shardCheckResumability =
- DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData);
- shardCheckResumability->setSource(_mock.get());
- return shardCheckResumability;
+ auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
+ dsCheckResumability->setSource(_mock.get());
+ return dsCheckResumability;
}
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) {
- return createShardCheckResumability(
- ResumeToken::makeHighWaterMarkToken(ts, testUuid()).getData());
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) {
+ return createDSCheckResumability(
+ ResumeToken::makeHighWaterMarkToken(ts, boost::none).getData());
}
};
TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We should not see the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -166,9 +456,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
addPause();
- addDocument(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We see the pause we inserted, but not the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -179,10 +469,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp doc1Timestamp(100, 2);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
addPause();
- addDocument(doc1Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "2");
// Pause added explicitly.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -197,13 +487,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
Timestamp doc1Timestamp(100, 2);
Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
auto result1 = checkResumeToken->getNext();
ASSERT_TRUE(result1.isAdvanced());
@@ -217,23 +507,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken)
}
TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
- Timestamp resumeTimestamp(100, 1);
+ // The first timestamp in the oplog precedes the resume token, and the second matches it...
+ Timestamp doc1Timestamp(100, 1);
+ Timestamp resumeTimestamp(100, 2);
+ Timestamp doc2Timestamp = resumeTimestamp;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
- Timestamp doc1Timestamp(100, 2);
- Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ // ... but there's no entry in the oplog that matches the full token.
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) {
+TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+
+ // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it
+ // but don't throw - we haven't surpassed the token yet and still may see it in the next doc.
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -241,9 +536,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
Timestamp resumeTimestamp(100, 1);
auto resumeTokenUUID = UUID::gen();
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID);
auto otherUUID = UUID::gen();
- addDocument(resumeTimestamp, "1", otherUUID);
+ addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
@@ -254,9 +549,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc");
// We must not see the following document.
- addDocument(resumeTimestamp, "ABC");
+ addOplogEntryOnTestNS(resumeTimestamp, "ABC");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -268,13 +563,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
Timestamp doc1Timestamp(100, 1);
- addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
Timestamp doc2Timestamp(100, 2);
Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
- addDocument(doc2Timestamp, doc2DocKey);
+ addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey);
// We should skip doc1 since it satisfies the resume token, and retrieve doc2.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -289,10 +585,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -307,10 +604,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen
getExpCtx()->inMongos = true;
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -323,10 +620,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject)
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none);
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -339,11 +636,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) {
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -368,20 +666,20 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the higher-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime but a lower UUID. One of the documents has
// a lower docKey than the resume token, the other has a higher docKey; this demonstrates that
// the UUID is the discriminating factor.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and UUID whose docKey sorts after the token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -406,19 +704,19 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the lower-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
// Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed
// by a document which matches the resume token. This is not possible in practice, but it serves
// to demonstrate that the resume attempt fails even when the resume token is present.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) {
+TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
Timestamp resumeTimestamp(100, 1);
// Create an ordered array of 3 UUIDs.
@@ -427,21 +725,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) {
std::sort(uuids.begin(), uuids.end());
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime and version but a lower applyOps index. One
// of the documents has a lower uuid than the resume token, the other has a higher uuid; this
// demonstrates that the applyOps index is the discriminating factor.
- addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and version whose applyOps sorts after the
// resume token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -458,278 +756,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
-/**
- * A mock MongoProcessInterface which allows mocking a foreign pipeline.
- */
-class MockMongoInterface final : public StubMongoProcessInterface {
-public:
- MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
- : _mockResults(std::move(mockResults)) {}
-
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
- return false;
- }
-
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
- }
-
- return pipeline;
- }
-
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
- }
-
-private:
- deque<DocumentSource::GetNextResult> _mockResults;
-};
-
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) {
Timestamp resumeTimestamp(100, 1);
- Timestamp oplogTimestamp(100, 2);
+ Timestamp oplogTimestamp(100, 1);
- ResumeTokenData tokenData(
- resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(tokenData);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) {
Timestamp resumeTimestamp(100, 1);
- ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
- // We should see the resume token.
+ // As with other tailable cursors, starting a change stream on an empty capped collection will
+ // cause the cursor to immediately and permanently EOF. This should never happen in practice,
+ // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING
+ // states, and there must be at least one entry in the oplog in order to reach those states.
+ auto shardCheckResumability = createDSCheckResumability(resumeTimestamp);
auto result = shardCheckResumability->getNext();
- ASSERT_TRUE(result.isAdvanced());
- auto& doc = result.getDocument();
- ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+ ASSERT_TRUE(result.isEOF());
+ ASSERT_TRUE(_mock->isPermanentlyEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isEOF());
+}
+
+TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ auto& doc = result.getDocument();
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+}
+
+TEST_F(CheckResumabilityTest,
ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
+TEST_F(CheckResumabilityTest,
+ ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp oplogTimestamp(100, 2);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ // Confirm that there are two documents queued in the mock oplog.
+ ASSERT_EQ(_mock->size(), 2u);
+ ASSERT_THROWS_CODE(
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
+ // Confirm that only the first document was read before the assertion was thrown.
+ ASSERT_EQ(_mock->size(), 1u);
+}
+
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog(
- {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- auto result2 = shardCheckResumability->getNext();
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isEOF());
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKey sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add the resume token, plus one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "3");
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "3");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the resume token...
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto doc = result.getDocument();
ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData());
// ... then the post-token event, and then finally EOF.
- result = shardCheckResumability->getNext();
+ result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument(ResumeToken::SerializationFormat::kHexString)}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument(ResumeToken::SerializationFormat::kHexString);
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKey sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the post-token event, followed by EOF.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument(ResumeToken::SerializationFormat::kHexString)}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument(ResumeToken::SerializationFormat::kHexString);
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
} // namespace
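
Note on the test refactor above: the removed MockMongoInterface and the
mongoProcessInterface plumbing are superseded by two helpers,
addOplogEntryOnTestNS and addOplogEntryOnOtherNS, which queue entries directly
on the shared mock source (_mock). A minimal sketch of what such a helper could
look like, assuming the mock queue holds documents whose "_id" is a hex-encoded
resume token (the format the assertions above parse back out); the patch's real
helper lives in document_source_check_resume_token_test.cpp and may differ in
detail:

    void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid) {
        // Queue a change-stream-shaped event on the shared mock source.
        _mock->queue.push_back(Document{
            {"_id",
             ResumeToken({ts, 0, 0, uuid, Value(docKey)})
                 .toDocument(ResumeToken::SerializationFormat::kHexString)}});
    }
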
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 26623df34a6..19771bdbc2c 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -106,4 +106,8 @@ DocumentSource::GetNextResult DocumentSourceMock::getNext() {
queue.pop_front();
return next;
}
+
+size_t DocumentSourceMock::size() const {
+ return queue.size();
+}
}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index aeade157c7e..1b88503706d 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -78,6 +78,8 @@ public:
static boost::intrusive_ptr<DocumentSourceMock> create(
const std::initializer_list<const char*>& jsons);
+ size_t size() const;
+
void reattachToOperationContext(OperationContext* opCtx) {
isDetachedFromOpCtx = false;
}
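
The new size() accessor exists so tests can assert how much of the queued input
a stage actually consumed, as
ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken
does above with _mock->size(). A usage sketch (names hypothetical):

    // Two results queued; consuming one via getNext() shrinks the queue.
    std::deque<DocumentSource::GetNextResult> results = {Document{{"a", 1}},
                                                         Document{{"a", 2}}};
    auto mock = DocumentSourceMock::create(results);
    ASSERT_EQ(mock->size(), 2u);  // nothing consumed yet
    mock->getNext();              // pops the first result off the queue
    ASSERT_EQ(mock->size(), 1u);
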
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 5291d665598..c4e19f548d5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -401,7 +401,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
pipeline->getSources().empty() ? nullptr : pipeline->getSources().front().get();
if (firstSource && firstSource->constraints().isChangeStreamStage()) {
invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+ QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 41b7d70f786..3e0d1885a83 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -650,11 +650,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
if (startLoc) {
params.start = *startLoc;
}
+ params.minTs = minTs;
params.maxTs = maxTs;
params.direction = CollectionScanParams::FORWARD;
params.tailable = cq->getQueryRequest().isTailable();
params.shouldTrackLatestOplogTimestamp =
plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ params.assertMinTsHasNotFallenOffOplog =
+ plannerOptions & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
params.shouldWaitForOplogVisibility =
shouldWaitForOplogVisibility(opCtx, collection, params.tailable);
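
getOplogStartHack now forwards the query's lower bound to the scan as
params.minTs and translates the new planner bit into the matching scan
parameter. For a resumed change stream the resulting parameters would plausibly
look like this (values hypothetical):

    CollectionScanParams params;
    params.minTs = Timestamp(100, 1);  // resume point; must still be in the oplog
    params.tailable = true;
    params.direction = CollectionScanParams::FORWARD;
    params.assertMinTsHasNotFallenOffOplog = true;
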
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index 7f2fa557584..d220eec241c 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -154,6 +154,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->maxScan = query.getQueryRequest().getMaxScan();
csn->shouldTrackLatestOplogTimestamp =
params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ csn->assertMinTsHasNotFallenOffOplog =
+ params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
csn->shouldWaitForOplogVisibility =
params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index 18944f8e17b..e4fc226157b 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -138,6 +138,9 @@ string optionString(size_t options) {
case QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE:
ss << "OPLOG_SCAN_WAIT_FOR_VISIBLE ";
break;
+ case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG:
+ ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG ";
+ break;
case QueryPlannerParams::ENUMERATE_OR_CHILDREN_LOCKSTEP:
ss << "ENUMERATE_OR_CHILDREN_LOCKSTEP ";
break;
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index 94cedcfbe0e..46ff6244726 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -103,6 +103,10 @@ struct QueryPlannerParams {
// Set this so that collection scans on the oplog wait for visibility before reading.
OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 13,
+ // Set this on an oplog scan to uassert that the oplog has not already rolled over the
+ // minimum 'ts' timestamp specified in the query.
+ ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 14,
+
// Instruct the plan enumerator to enumerate contained $ors in a special order. $or
// enumeration can generate an exponential number of plans, and is therefore limited at some
// arbitrary cutoff controlled by a parameter. When this limit is hit, the order of
@@ -118,7 +122,7 @@ struct QueryPlannerParams {
// order, we would get assignments [a_b, a_b], [a_c, a_c], [a_c, a_b], then [a_b, a_c]. This
// is thought to be helpful in general, but particularly in cases where all children of the
// $or use the same fields and have the same indexes available, as in this example.
- ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 14,
+ ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 15,
};
// See Options enum above.
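
Because the planner options are single-bit masks combined into one value,
giving ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG the 1 << 14 slot requires bumping
ENUMERATE_OR_CHILDREN_LOCKSTEP to 1 << 15 so that no two flags collide. The
set-and-test pattern used in pipeline_d.cpp and planner_access.cpp above
reduces to:

    // Flags are combined with bitwise OR and queried with bitwise AND.
    size_t plannerOpts = QueryPlannerParams::DEFAULT;
    plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
                    QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
    const bool assertMinTs =
        plannerOpts & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
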
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index f7e85aaa9e4..39ab80a4a4b 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -250,6 +250,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->direction = this->direction;
copy->maxScan = this->maxScan;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+ copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog;
copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
return copy;
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index 2543b4e7c2b..1546ff1421d 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -319,6 +319,9 @@ struct CollectionScanNode : public QuerySolutionNode {
// across a sharded cluster.
bool shouldTrackLatestOplogTimestamp = false;
+ // Should we assert that the specified minTs has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
int direction;
// maxScan option to .find() limits how many docs we look at.
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 89a996be968..0f323f6aa23 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -81,6 +81,7 @@ PlanStage* buildStages(OperationContext* opCtx,
params.collection = collection;
params.tailable = csn->tailable;
params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
+ params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.maxScan = csn->maxScan;
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 815de6a4074..f89016e8bc3 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -40,6 +40,11 @@ namespace mongo {
namespace repl {
/**
+ * The first oplog entry is a no-op with this message in its "msg" field.
+ */
+constexpr auto kInitiatingSetMsg = "initiating set"_sd;
+
+/**
* A parsed oplog entry that inherits from the OplogEntryBase parsed by the IDL.
* This class is immutable.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 8f3870337c9..092e6971ee8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -431,8 +431,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
WriteUnitOfWork wuow(opCtx);
Helpers::putSingleton(opCtx, configCollectionName, config);
- const auto msgObj = BSON("msg"
- << "initiating set");
+ const auto msgObj = BSON("msg" << kInitiatingSetMsg);
_service->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
// ReplSetTest assumes that immediately after the replSetInitiate
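
Hoisting the "initiating set" literal into oplog_entry.h as kInitiatingSetMsg
gives the server a single definition of the no-op that replSetInitiate writes
as the very first oplog entry, so code inspecting the head of the oplog can
recognize a freshly initiated replica set without duplicating the string. A
hypothetical consumer-side check, not part of this patch:

    #include "mongo/db/repl/oplog_entry.h"

    // True if 'oplogEntry' is the no-op written when the set was initiated.
    bool isInitiatingSetEntry(const BSONObj& oplogEntry) {
        auto msg = oplogEntry["msg"];
        return msg.type() == BSONType::String &&
               msg.valueStringData() == repl::kInitiatingSetMsg;
    }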