diff options
author | Vesselina Ratcheva <vesselina.ratcheva@mongodb.com> | 2019-11-04 22:41:12 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-04 22:41:12 +0000 |
commit | 0a817c811bc6adc3e35ec99da3eb83597462a13f (patch) | |
tree | 3dea741f1620293d07e56d245b155ccb84ca6747 /src/mongo/db/exec | |
parent | 081a51065ac9b855a0d9e07d88701910b547ad96 (diff) | |
download | mongo-0a817c811bc6adc3e35ec99da3eb83597462a13f.tar.gz |
SERVER-43269 Implement resumeAfterRecordId in CollectionScan and CollectionScanParams
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.h | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan_common.h | 7 |
3 files changed, 42 insertions, 2 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index f53cf16e4af..66711faadf6 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -75,9 +75,16 @@ CollectionScan::CollectionScan(OperationContext* opCtx, // applies only to forwards scans of the oplog. invariant(params.direction == CollectionScanParams::FORWARD); invariant(collection->ns().isOplog()); + invariant(!params.resumeAfterRecordId); } invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + if (params.resumeAfterRecordId) { + // The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we + // only support in the forward direction. + invariant(params.direction == CollectionScanParams::FORWARD); + } + // Set early stop condition. if (params.maxTs) { _endConditionBSON = BSON("$gte"_sd << *(params.maxTs)); @@ -122,8 +129,9 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { // want to signal an error rather than silently dropping data from the stream. // // Note that we want to return the record *after* this one since we have already - // returned this one. This is only possible in the tailing case because that is the - // only time we'd need to create a cursor after already getting a record out of it. + // returned this one. This is possible in the tailing case. Notably, tailing is the + // only time we'd need to create a cursor after already getting a record out of it + // and updating our _lastSeenId. if (!_cursor->seekExact(_lastSeenId)) { Status status(ErrorCodes::CappedPositionLost, str::stream() << "CollectionScan died due to failure to restore " @@ -134,6 +142,27 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } } + if (_params.resumeAfterRecordId) { + invariant(!_params.tailable); + invariant(_lastSeenId.isNull()); + // Seek to where we are trying to resume the scan from. Signal a KeyNotFound error + // if the record no longer exists. + // + // Note that we want to return the record *after* this one since we have already + // returned this one prior to the resume. + auto recordIdToSeek = *_params.resumeAfterRecordId; + if (!_cursor->seekExact(recordIdToSeek)) { + Status status( + ErrorCodes::KeyNotFound, + str::stream() + << "Failed to resume collection scan: the recordId from which we are " + << "attempting to resume no longer exists in the collection. " + << "recordId: " << recordIdToSeek); + *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); + return PlanStage::FAILURE; + } + } + return PlanStage::NEED_TIME; } diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index ee50293a5c9..c8b5a072851 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -73,6 +73,10 @@ public: return _latestOplogEntryTimestamp; } + RecordId getLastSeenRecordId() const { + return _lastSeenId; + } + std::unique_ptr<PlanStageStats> getStats() final; const SpecificStats* getSpecificStats() const final; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 55307ba70c8..55e7508f8cd 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -44,13 +44,20 @@ struct CollectionScanParams { // If present, the collection scan will seek directly to the RecordId of an oplog entry as // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans. + // This field cannot be used in conjunction with 'resumeAfterRecordId'. boost::optional<Timestamp> minTs; // If present, the collection scan will stop and return EOF the first time it sees a document // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward // oplog scans. + // This field cannot be used in conjunction with 'resumeAfterRecordId'. boost::optional<Timestamp> maxTs; + // If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it + // does not exist. Must only be set on forward collection scans. + // This field cannot be used in conjunction with 'minTs' or 'maxTs'. + boost::optional<RecordId> resumeAfterRecordId; + Direction direction = FORWARD; // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. |