summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/collection_scan.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2020-06-01 18:47:00 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-27 13:09:38 +0000
commit694ed4153b9d5424b5d169fea5c68f99d4dfb45a (patch)
treeb3cffb5dce360007663e53a2aba68a77f89fdf86 /src/mongo/db/exec/collection_scan.cpp
parent1d3972cea1ae1a35e398fe61882cd455d78c01d1 (diff)
downloadmongo-694ed4153b9d5424b5d169fea5c68f99d4dfb45a.tar.gz
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
Diffstat (limited to 'src/mongo/db/exec/collection_scan.cpp')
-rw-r--r--src/mongo/db/exec/collection_scan.cpp32
1 files changed, 28 insertions, 4 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 00547983fc8..ffabd1886b4 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -79,6 +79,12 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
}
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present.
+ if (params.assertMinTsHasNotFallenOffOplog) {
+ invariant(params.shouldTrackLatestOplogTimestamp);
+ invariant(params.minTs);
+ }
+
if (params.resumeAfterRecordId) {
// The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we
// only support in the forward direction.
@@ -187,19 +193,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
if (!record) {
- // We just hit EOF. If we are tailable and have already returned data, leave us in a
- // state to pick up where we left off on the next call to work(). Otherwise EOF is
- // permanent.
+ // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up
+ // where we left off on the next call to work(). Otherwise, the EOF is permanent.
if (_params.tailable && !_lastSeenId.isNull()) {
_cursor.reset();
} else {
_commonStats.isEOF = true;
}
-
return PlanStage::IS_EOF;
}
_lastSeenId = record->id;
+ if (_params.assertMinTsHasNotFallenOffOplog) {
+ assertMinTsHasNotFallenOffOplog(*record);
+ }
if (_params.shouldTrackLatestOplogTimestamp) {
setLatestOplogEntryTimestamp(*record);
}
@@ -223,6 +230,23 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
_latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
}
+void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
+ // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
+ // is earlier than or equal to the minTs time.
+ auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson()));
+ invariant(_specificStats.docsTested == 0);
+ const bool isNewRS =
+ oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
+ oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
+ uassert(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ // We don't need to check this assertion again after we've confirmed the first oplog event.
+ _params.assertMinTsHasNotFallenOffOplog = false;
+}
+
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {