summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
author: Louis Williams <louis.williams@mongodb.com> 2021-02-23 16:03:49 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-02-24 00:42:55 +0000
commit: dd395723139d3d04cb05c66136a670102be210f1 (patch)
tree: fc64859029e3884270f522e2ae29f24be5b512c3 /src/mongo/db/exec
parent: 24a71228f4bf3ca9051b72f0777bae2286e7182a (diff)
download: mongo-dd395723139d3d04cb05c66136a670102be210f1.tar.gz
SERVER-54008 Generalize CollectionScan to perform queries over RecordId
ranges
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- src/mongo/db/exec/collection_scan.cpp 102
-rw-r--r-- src/mongo/db/exec/collection_scan.h 9
-rw-r--r-- src/mongo/db/exec/collection_scan_common.h 31
-rw-r--r-- src/mongo/db/exec/plan_stats.h 11
4 files changed, 83 insertions, 70 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 237da29c8a2..7b63afcbd3d 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -42,12 +42,9 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/repl/optime.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
-#include "mongo/db/client.h" // XXX-ERH
-
namespace mongo {
using std::unique_ptr;
@@ -67,23 +64,29 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
_params(params) {
// Explain reports the direction of the collection scan.
_specificStats.direction = params.direction;
- _specificStats.minTs = params.minTs;
- _specificStats.maxTs = params.maxTs;
+ _specificStats.minRecord = params.minRecord;
+ _specificStats.maxRecord = params.maxRecord;
_specificStats.tailable = params.tailable;
- if (params.minTs || params.maxTs) {
- // The 'minTs' and 'maxTs' parameters are used for a special optimization that
- // applies only to forwards scans of the oplog.
- invariant(params.direction == CollectionScanParams::FORWARD);
- invariant(collection->ns().isOplog());
+ if (params.minRecord || params.maxRecord) {
+ // The 'minRecord' and 'maxRecord' parameters are used for a special optimization that
+ // applies only to forward scans of the oplog and scans on collections clustered by _id.
invariant(!params.resumeAfterRecordId);
+ if (collection->ns().isOplog()) {
+ invariant(params.direction == CollectionScanParams::FORWARD);
+ } else {
+ invariant(collection->isClustered());
+ }
}
+ LOGV2_DEBUG(5400802,
+ 5,
+ "collection scan bounds",
+ "min"_attr = (!_params.minRecord) ? "none" : _params.minRecord->toString(),
+ "max"_attr = (!_params.maxRecord) ? "none" : _params.maxRecord->toString());
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
- // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can be
- // incorrectly requested by the user, but in that case we should already have uasserted by now.
- if (params.assertMinTsHasNotFallenOffOplog) {
+ if (params.assertTsHasNotFallenOffOplog) {
invariant(params.shouldTrackLatestOplogTimestamp);
- invariant(params.minTs);
+ invariant(params.direction == CollectionScanParams::FORWARD);
}
if (params.resumeAfterRecordId) {
@@ -91,13 +94,6 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
// only support in the forward direction.
invariant(params.direction == CollectionScanParams::FORWARD);
}
-
- // Set early stop condition.
- if (params.maxTs) {
- _endConditionBSON = BSON("$gte"_sd << *(params.maxTs));
- _endCondition = std::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName,
- _endConditionBSON.firstElement());
- }
}
PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
@@ -169,17 +165,16 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
- if (_lastSeenId.isNull() && _params.minTs) {
- // See if the RecordStore supports the oplogStartHack.
- StatusWith<RecordId> goal = oploghack::keyForOptime(*_params.minTs);
- if (goal.isOK()) {
- boost::optional<RecordId> startLoc =
- collection()->getRecordStore()->oplogStartHack(opCtx(), goal.getValue());
- if (startLoc && !startLoc->isNull()) {
- LOGV2_DEBUG(20584, 3, "Using direct oplog seek");
- record = _cursor->seekExact(*startLoc);
- }
- }
+ if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::FORWARD &&
+ _params.minRecord) {
+ // Seek to the approximate start location.
+ record = _cursor->seekNear(*_params.minRecord);
+ }
+
+ if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::BACKWARD &&
+ _params.maxRecord) {
+ // Seek to the approximate start location (at the end).
+ record = _cursor->seekNear(*_params.maxRecord);
}
if (!record) {
@@ -205,8 +200,8 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
_lastSeenId = record->id;
- if (_params.assertMinTsHasNotFallenOffOplog) {
- assertMinTsHasNotFallenOffOplog(*record);
+ if (_params.assertTsHasNotFallenOffOplog) {
+ assertTsHasNotFallenOffOplog(*record);
}
if (_params.shouldTrackLatestOplogTimestamp) {
setLatestOplogEntryTimestamp(*record);
@@ -231,37 +226,54 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
_latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
}
-void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+void CollectionScan::assertTsHasNotFallenOffOplog(const Record& record) {
// If the first entry we see in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
- // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
- // is earlier than or equal to the minTs time.
+ // if its timestamp is later than the timestamp that should not have fallen off the oplog; no
+ // events earlier can have fallen off this oplog. Otherwise, verify that the timestamp of the
+ // first observed oplog entry is earlier than or equal to the timestamp that should not have
+ // fallen off the oplog.
auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson()));
invariant(_specificStats.docsTested == 0);
const bool isNewRS =
oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
uassert(ErrorCodes::OplogQueryMinTsMissing,
- "Specified minTs has already fallen off the oplog",
- isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ "Specified timestamp has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOffOplog);
// We don't need to check this assertion again after we've confirmed the first oplog event.
- _params.assertMinTsHasNotFallenOffOplog = false;
+ _params.assertTsHasNotFallenOffOplog = boost::none;
+}
+
+namespace {
+bool atEndOfRangeInclusive(const CollectionScanParams& params, const WorkingSetMember& member) {
+ if (params.direction == CollectionScanParams::FORWARD) {
+ return params.maxRecord && member.recordId > *params.maxRecord;
+ } else {
+ return params.minRecord && member.recordId < *params.minRecord;
+ }
}
+} // namespace
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
++_specificStats.docsTested;
+
+ // The 'minRecord' and 'maxRecord' bounds are always inclusive, even if the query predicate is
+ // an exclusive inequality like $gt or $lt. In such cases, we rely on '_filter' to either
+ // exclude or include the endpoints as required by the user's query.
+ if (atEndOfRangeInclusive(_params, *member)) {
+ _workingSet->free(memberID);
+ _commonStats.isEOF = true;
+ return PlanStage::IS_EOF;
+ }
+
if (Filter::passes(member, _filter)) {
if (_params.stopApplyingFilterAfterFirstMatch) {
_filter = nullptr;
}
*out = memberID;
return PlanStage::ADVANCED;
- } else if (_endCondition && Filter::passes(member, _endCondition.get())) {
- _workingSet->free(memberID);
- _commonStats.isEOF = true;
- return PlanStage::IS_EOF;
} else {
_workingSet->free(memberID);
return PlanStage::NEED_TIME;
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index b8030e367be..8fdbb70a1e2 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -114,9 +114,9 @@ private:
void setLatestOplogEntryTimestamp(const Record& record);
/**
- * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog.
+ * Asserts that the minimum timestamp in the query filter has not already fallen off the oplog.
*/
- void assertMinTsHasNotFallenOffOplog(const Record& record);
+ void assertTsHasNotFallenOffOplog(const Record& record);
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
@@ -124,11 +124,6 @@ private:
// The filter is not owned by us.
const MatchExpression* _filter;
- // If a document does not pass '_filter' but passes '_endCondition', stop scanning and return
- // IS_EOF.
- BSONObj _endConditionBSON;
- std::unique_ptr<GTEMatchExpression> _endCondition;
-
std::unique_ptr<SeekableRecordCursor> _cursor;
CollectionScanParams _params;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index fb847553b36..39254279e09 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -40,23 +40,30 @@ struct CollectionScanParams {
BACKWARD = -1,
};
- // If present, the collection scan will seek directly to the RecordId of an oplog entry as
- // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> minTs;
+ // If present, this parameter sets the start point of a forward scan or the end point of a
+ // reverse scan. A forward scan will start scanning at the document with the lowest RecordId
+ // greater than or equal to minRecord. A reverse scan will stop and return EOF on the first
+ // document with a RecordId less than minRecord, or a higher record if none exists. May only
+ // be used for scans on collections clustered by _id and forward oplog scans. If exclusive
+ // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
+ // cannot be used in conjunction with 'resumeAfterRecordId'.
+ boost::optional<RecordId> minRecord;
- // If present, the collection scan will stop and return EOF the first time it sees a document
- // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward
- // oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> maxTs;
+ // If present, this parameter sets the start point of a reverse scan or the end point of a
+ // forward scan. A forward scan will stop and return EOF on the first document with a RecordId
+ // greater than maxRecord. A reverse scan will start scanning at the document with the
+ // highest RecordId less than or equal to maxRecord, or a lower record if none exists. May
+ // only be used for scans on collections clustered by _id and forward oplog scans. If exclusive
+ // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
+ // cannot be used in conjunction with 'resumeAfterRecordId'.
+ boost::optional<RecordId> maxRecord;
// If true, the collection scan will return a token that can be used to resume the scan.
bool requestResumeToken = false;
// If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it
// does not exist. Must only be set on forward collection scans.
- // This field cannot be used in conjunction with 'minTs' or 'maxTs'.
+ // This field cannot be used in conjunction with 'minRecord' or 'maxRecord'.
boost::optional<RecordId> resumeAfterRecordId;
Direction direction = FORWARD;
@@ -64,8 +71,8 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
- // Should we assert that the specified minTS has not fallen off the oplog?
- bool assertMinTsHasNotFallenOffOplog = false;
+ // Assert that the specified timestamp has not fallen off the oplog on a forward scan.
+ boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none;
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index a40685f03aa..aad7e562913 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -38,6 +38,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/stage_types.h"
+#include "mongo/db/record_id.h"
#include "mongo/util/container_size_helper.h"
#include "mongo/util/time_support.h"
@@ -246,13 +247,11 @@ struct CollectionScanStats : public SpecificStats {
bool tailable{false};
- // The start location of the scan. Must only be set on forward oplog scans.
- boost::optional<Timestamp> minTs;
+ // The start location of a forward scan and end location for a reverse scan.
+ boost::optional<RecordId> minRecord;
- // Indicates that the collection scan will stop and return EOF the first time it sees a
- // document that does not pass the filter and has a "ts" Timestamp field greater than 'maxTs'.
- // Must only be set on forward oplog scans.
- boost::optional<Timestamp> maxTs;
+ // The end location of a forward scan and start location for a reverse scan.
+ boost::optional<RecordId> maxRecord;
};
struct CountStats : public SpecificStats {