summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorVesselina Ratcheva <vesselina.ratcheva@mongodb.com>2019-11-04 22:41:12 +0000
committerevergreen <evergreen@mongodb.com>2019-11-04 22:41:12 +0000
commit0a817c811bc6adc3e35ec99da3eb83597462a13f (patch)
tree3dea741f1620293d07e56d245b155ccb84ca6747 /src/mongo/db/exec
parent081a51065ac9b855a0d9e07d88701910b547ad96 (diff)
downloadmongo-0a817c811bc6adc3e35ec99da3eb83597462a13f.tar.gz
SERVER-43269 Implement resumeAfterRecordId in CollectionScan and CollectionScanParams
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/collection_scan.cpp33
-rw-r--r--src/mongo/db/exec/collection_scan.h4
-rw-r--r--src/mongo/db/exec/collection_scan_common.h7
3 files changed, 42 insertions, 2 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index f53cf16e4af..66711faadf6 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -75,9 +75,16 @@ CollectionScan::CollectionScan(OperationContext* opCtx,
// applies only to forwards scans of the oplog.
invariant(params.direction == CollectionScanParams::FORWARD);
invariant(collection->ns().isOplog());
+ invariant(!params.resumeAfterRecordId);
}
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ if (params.resumeAfterRecordId) {
+ // The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we
+ // only support in the forward direction.
+ invariant(params.direction == CollectionScanParams::FORWARD);
+ }
+
// Set early stop condition.
if (params.maxTs) {
_endConditionBSON = BSON("$gte"_sd << *(params.maxTs));
@@ -122,8 +129,9 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
// want to signal an error rather than silently dropping data from the stream.
//
// Note that we want to return the record *after* this one since we have already
- // returned this one. This is only possible in the tailing case because that is the
- // only time we'd need to create a cursor after already getting a record out of it.
+ // returned this one. This is possible in the tailing case. Notably, tailing is the
+ // only time we'd need to create a cursor after already getting a record out of it
+ // and updating our _lastSeenId.
if (!_cursor->seekExact(_lastSeenId)) {
Status status(ErrorCodes::CappedPositionLost,
str::stream() << "CollectionScan died due to failure to restore "
@@ -134,6 +142,27 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
}
+ if (_params.resumeAfterRecordId) {
+ invariant(!_params.tailable);
+ invariant(_lastSeenId.isNull());
+ // Seek to where we are trying to resume the scan from. Signal a KeyNotFound error
+ // if the record no longer exists.
+ //
+ // Note that we want to return the record *after* this one since we have already
+ // returned this one prior to the resume.
+ auto recordIdToSeek = *_params.resumeAfterRecordId;
+ if (!_cursor->seekExact(recordIdToSeek)) {
+ Status status(
+ ErrorCodes::KeyNotFound,
+ str::stream()
+ << "Failed to resume collection scan: the recordId from which we are "
+ << "attempting to resume no longer exists in the collection. "
+ << "recordId: " << recordIdToSeek);
+ *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
+ return PlanStage::FAILURE;
+ }
+ }
+
return PlanStage::NEED_TIME;
}
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index ee50293a5c9..c8b5a072851 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -73,6 +73,10 @@ public:
return _latestOplogEntryTimestamp;
}
+ RecordId getLastSeenRecordId() const {
+ return _lastSeenId;
+ }
+
std::unique_ptr<PlanStageStats> getStats() final;
const SpecificStats* getSpecificStats() const final;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 55307ba70c8..55e7508f8cd 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -44,13 +44,20 @@ struct CollectionScanParams {
// If present, the collection scan will seek directly to the RecordId of an oplog entry as
// close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
+ // This field cannot be used in conjunction with 'resumeAfterRecordId'.
boost::optional<Timestamp> minTs;
// If present, the collection scan will stop and return EOF the first time it sees a document
// that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward
// oplog scans.
+ // This field cannot be used in conjunction with 'resumeAfterRecordId'.
boost::optional<Timestamp> maxTs;
+ // If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it
+ // does not exist. Must only be set on forward collection scans.
+ // This field cannot be used in conjunction with 'minTs' or 'maxTs'.
+ boost::optional<RecordId> resumeAfterRecordId;
+
Direction direction = FORWARD;
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.