diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2020-06-01 18:47:00 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-27 13:09:38 +0000 |
commit | 694ed4153b9d5424b5d169fea5c68f99d4dfb45a (patch) | |
tree | b3cffb5dce360007663e53a2aba68a77f89fdf86 /src/mongo/db/exec | |
parent | 1d3972cea1ae1a35e398fe61882cd455d78c01d1 (diff) | |
download | mongo-694ed4153b9d5424b5d169fea5c68f99d4dfb45a.tar.gz |
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.h | 5 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan_common.h | 3 |
3 files changed, 36 insertions, 4 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 00547983fc8..ffabd1886b4 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -79,6 +79,12 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, } invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. + if (params.assertMinTsHasNotFallenOffOplog) { + invariant(params.shouldTrackLatestOplogTimestamp); + invariant(params.minTs); + } + if (params.resumeAfterRecordId) { // The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we // only support in the forward direction. @@ -187,19 +193,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } if (!record) { - // We just hit EOF. If we are tailable and have already returned data, leave us in a - // state to pick up where we left off on the next call to work(). Otherwise EOF is - // permanent. + // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up + // where we left off on the next call to work(). Otherwise, the EOF is permanent. if (_params.tailable && !_lastSeenId.isNull()) { _cursor.reset(); } else { _commonStats.isEOF = true; } - return PlanStage::IS_EOF; } _lastSeenId = record->id; + if (_params.assertMinTsHasNotFallenOffOplog) { + assertMinTsHasNotFallenOffOplog(*record); + } if (_params.shouldTrackLatestOplogTimestamp) { setLatestOplogEntryTimestamp(*record); } @@ -223,6 +230,23 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp()); } +void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) { + // If the first entry we see in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the specified minTs; no events earlier than the minTs can have + // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry + // is earlier than or equal to the minTs time. + auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson())); + invariant(_specificStats.docsTested == 0); + const bool isNewRS = + oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && + oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; + uassert(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog", + isNewRS || oplogEntry.getTimestamp() <= *_params.minTs); + // We don't need to check this assertion again after we've confirmed the first oplog event. + _params.assertMinTsHasNotFallenOffOplog = false; +} + PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 74d9d8ddb29..3d79915b0fe 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -100,6 +100,11 @@ private: */ void setLatestOplogEntryTimestamp(const Record& record); + /** + * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog. + */ + void assertMinTsHasNotFallenOffOplog(const Record& record); + // WorkingSet is not owned by us. WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 659f6659eb1..aa0c790d1fd 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -66,6 +66,9 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; + // Should we assert that the specified minTS has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog // across a sharded cluster. |