author     Louis Williams <louis.williams@mongodb.com>    2021-02-23 16:03:49 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-02-24 00:42:55 +0000
commit     dd395723139d3d04cb05c66136a670102be210f1 (patch)
tree       fc64859029e3884270f522e2ae29f24be5b512c3 /src/mongo/db
parent     24a71228f4bf3ca9051b72f0777bae2286e7182a (diff)
download   mongo-dd395723139d3d04cb05c66136a670102be210f1.tar.gz
SERVER-54008 Generalize CollectionScan to perform queries over RecordId ranges
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/SConscript | 11
-rw-r--r--  src/mongo/db/exec/collection_scan.cpp | 102
-rw-r--r--  src/mongo/db/exec/collection_scan.h | 9
-rw-r--r--  src/mongo/db/exec/collection_scan_common.h | 31
-rw-r--r--  src/mongo/db/exec/plan_stats.h | 11
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 12
-rw-r--r--  src/mongo/db/query/SConscript | 1
-rw-r--r--  src/mongo/db/query/classic_stage_builder.cpp | 8
-rw-r--r--  src/mongo/db/query/plan_explainer_impl.cpp | 16
-rw-r--r--  src/mongo/db/query/plan_explainer_sbe.cpp | 14
-rw-r--r--  src/mongo/db/query/planner_access.cpp | 34
-rw-r--r--  src/mongo/db/query/query_solution.cpp | 2
-rw-r--r--  src/mongo/db/query/query_solution.h | 21
-rw-r--r--  src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 62
-rw-r--r--  src/mongo/db/record_id.h | 8
-rw-r--r--  src/mongo/db/record_id_helpers.cpp (renamed from src/mongo/db/storage/oplog_hack.cpp) | 6
-rw-r--r--  src/mongo/db/record_id_helpers.h (renamed from src/mongo/db/storage/oplog_hack.h) | 4
-rw-r--r--  src/mongo/db/record_id_test.cpp | 12
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 4
-rw-r--r--  src/mongo/db/storage/SConscript | 10
-rw-r--r--  src/mongo/db/storage/devnull/SConscript | 2
-rw-r--r--  src/mongo/db/storage/devnull/devnull_kv_engine.cpp | 3
-rw-r--r--  src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp | 39
-rw-r--r--  src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h | 3
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/SConscript | 2
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp | 3
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp | 121
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h | 5
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp | 4
-rw-r--r--  src/mongo/db/storage/record_store.h | 26
-rw-r--r--  src/mongo/db/storage/record_store_test_harness.cpp | 65
-rw-r--r--  src/mongo/db/storage/record_store_test_oplog.cpp | 166
-rw-r--r--  src/mongo/db/storage/wiredtiger/SConscript | 2
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 106
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h | 5
35 files changed, 630 insertions, 300 deletions
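The change replaces the oplog-only 'minTs'/'maxTs' Timestamp bounds with generic RecordId bounds on the collection scan. As a minimal sketch (not part of this commit, helper name hypothetical), a caller might populate the new parameters like this, assuming the types introduced in the hunks below:

    // Sketch only: translate an oplog timestamp range into the RecordId bounds
    // that CollectionScan now consumes. keyForOptime() and the minRecord/maxRecord
    // fields are shown later in this diff; this wrapper itself is illustrative.
    #include "mongo/db/exec/collection_scan_common.h"
    #include "mongo/db/record_id_helpers.h"

    mongo::CollectionScanParams makeBoundedOplogScanParams(const mongo::Timestamp& minTs,
                                                           const mongo::Timestamp& maxTs) {
        mongo::CollectionScanParams params;
        params.direction = mongo::CollectionScanParams::FORWARD;
        if (auto min = mongo::record_id_helpers::keyForOptime(minTs); min.isOK()) {
            params.minRecord = min.getValue();  // inclusive lower bound
        }
        if (auto max = mongo::record_id_helpers::keyForOptime(maxTs); max.isOK()) {
            params.maxRecord = max.getValue();  // inclusive upper bound
        }
        return params;
    }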
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 174e8d70de4..da599c0ce21 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1136,6 +1136,16 @@ env.Library(
)
env.Library(
+ target='record_id_helpers',
+ source=[
+ 'record_id_helpers.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.Library(
target='query_exec',
source=[
'clientcursor.cpp',
@@ -1268,7 +1278,6 @@ env.Library(
's/sharding_api_d',
'shared_request_handling',
'stats/serveronly_stats',
- 'storage/oplog_hack',
'storage/remove_saver',
'storage/storage_options',
'update/update_driver',
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 237da29c8a2..7b63afcbd3d 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -42,12 +42,9 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/repl/optime.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
-#include "mongo/db/client.h" // XXX-ERH
-
namespace mongo {
using std::unique_ptr;
@@ -67,23 +64,29 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
_params(params) {
// Explain reports the direction of the collection scan.
_specificStats.direction = params.direction;
- _specificStats.minTs = params.minTs;
- _specificStats.maxTs = params.maxTs;
+ _specificStats.minRecord = params.minRecord;
+ _specificStats.maxRecord = params.maxRecord;
_specificStats.tailable = params.tailable;
- if (params.minTs || params.maxTs) {
- // The 'minTs' and 'maxTs' parameters are used for a special optimization that
- // applies only to forwards scans of the oplog.
- invariant(params.direction == CollectionScanParams::FORWARD);
- invariant(collection->ns().isOplog());
+ if (params.minRecord || params.maxRecord) {
+ // The 'minRecord' and 'maxRecord' parameters are used for a special optimization that
+ // applies only to forwards scans of the oplog and scans on collections clustered by _id.
invariant(!params.resumeAfterRecordId);
+ if (collection->ns().isOplog()) {
+ invariant(params.direction == CollectionScanParams::FORWARD);
+ } else {
+ invariant(collection->isClustered());
+ }
}
+ LOGV2_DEBUG(5400802,
+ 5,
+ "collection scan bounds",
+ "min"_attr = (!_params.minRecord) ? "none" : _params.minRecord->toString(),
+ "max"_attr = (!_params.maxRecord) ? "none" : _params.maxRecord->toString());
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
- // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can be
- // incorrectly requested by the user, but in that case we should already have uasserted by now.
- if (params.assertMinTsHasNotFallenOffOplog) {
+ if (params.assertTsHasNotFallenOffOplog) {
invariant(params.shouldTrackLatestOplogTimestamp);
- invariant(params.minTs);
+ invariant(params.direction == CollectionScanParams::FORWARD);
}
if (params.resumeAfterRecordId) {
@@ -91,13 +94,6 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
// only support in the forward direction.
invariant(params.direction == CollectionScanParams::FORWARD);
}
-
- // Set early stop condition.
- if (params.maxTs) {
- _endConditionBSON = BSON("$gte"_sd << *(params.maxTs));
- _endCondition = std::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName,
- _endConditionBSON.firstElement());
- }
}
PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
@@ -169,17 +165,16 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
- if (_lastSeenId.isNull() && _params.minTs) {
- // See if the RecordStore supports the oplogStartHack.
- StatusWith<RecordId> goal = oploghack::keyForOptime(*_params.minTs);
- if (goal.isOK()) {
- boost::optional<RecordId> startLoc =
- collection()->getRecordStore()->oplogStartHack(opCtx(), goal.getValue());
- if (startLoc && !startLoc->isNull()) {
- LOGV2_DEBUG(20584, 3, "Using direct oplog seek");
- record = _cursor->seekExact(*startLoc);
- }
- }
+ if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::FORWARD &&
+ _params.minRecord) {
+ // Seek to the approximate start location.
+ record = _cursor->seekNear(*_params.minRecord);
+ }
+
+ if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::BACKWARD &&
+ _params.maxRecord) {
+ // Seek to the approximate start location (at the end).
+ record = _cursor->seekNear(*_params.maxRecord);
}
if (!record) {
@@ -205,8 +200,8 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
_lastSeenId = record->id;
- if (_params.assertMinTsHasNotFallenOffOplog) {
- assertMinTsHasNotFallenOffOplog(*record);
+ if (_params.assertTsHasNotFallenOffOplog) {
+ assertTsHasNotFallenOffOplog(*record);
}
if (_params.shouldTrackLatestOplogTimestamp) {
setLatestOplogEntryTimestamp(*record);
@@ -231,37 +226,54 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
_latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
}
-void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+void CollectionScan::assertTsHasNotFallenOffOplog(const Record& record) {
// If the first entry we see in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
- // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
- // is earlier than or equal to the minTs time.
+ // if its timestamp is later than the timestamp that should not have fallen off the oplog; no
+ // events earlier can have fallen off this oplog. Otherwise, verify that the timestamp of the
+ // first observed oplog entry is earlier than or equal to timestamp that should not have fallen
+ // off the oplog.
auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson()));
invariant(_specificStats.docsTested == 0);
const bool isNewRS =
oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
uassert(ErrorCodes::OplogQueryMinTsMissing,
- "Specified minTs has already fallen off the oplog",
- isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ "Specified timestamp has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOffOplog);
// We don't need to check this assertion again after we've confirmed the first oplog event.
- _params.assertMinTsHasNotFallenOffOplog = false;
+ _params.assertTsHasNotFallenOffOplog = boost::none;
+}
+
+namespace {
+bool atEndOfRangeInclusive(const CollectionScanParams& params, const WorkingSetMember& member) {
+ if (params.direction == CollectionScanParams::FORWARD) {
+ return params.maxRecord && member.recordId > *params.maxRecord;
+ } else {
+ return params.minRecord && member.recordId < *params.minRecord;
+ }
}
+} // namespace
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
++_specificStats.docsTested;
+
+ // The 'minRecord' and 'maxRecord' bounds are always inclusive, even if the query predicate is
+ // an exclusive inequality like $gt or $lt. In such cases, we rely on '_filter' to either
+ // exclude or include the endpoints as required by the user's query.
+ if (atEndOfRangeInclusive(_params, *member)) {
+ _workingSet->free(memberID);
+ _commonStats.isEOF = true;
+ return PlanStage::IS_EOF;
+ }
+
if (Filter::passes(member, _filter)) {
if (_params.stopApplyingFilterAfterFirstMatch) {
_filter = nullptr;
}
*out = memberID;
return PlanStage::ADVANCED;
- } else if (_endCondition && Filter::passes(member, _endCondition.get())) {
- _workingSet->free(memberID);
- _commonStats.isEOF = true;
- return PlanStage::IS_EOF;
} else {
_workingSet->free(memberID);
return PlanStage::NEED_TIME;
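The inclusive-bounds comment above is the key behavioral point: 'minRecord' and 'maxRecord' never exclude an endpoint themselves, so an exclusive predicate such as $gt still depends on '_filter'. A small standalone illustration (hypothetical, not from this commit) of the same end-of-range decision, using only RecordId comparisons:

    // Hypothetical mirror of atEndOfRangeInclusive(): the scan only stops once a
    // record lies strictly outside the inclusive range for the scan direction.
    #include <boost/optional.hpp>
    #include "mongo/db/record_id.h"

    struct Bounds {
        boost::optional<mongo::RecordId> minRecord;  // inclusive
        boost::optional<mongo::RecordId> maxRecord;  // inclusive
        bool forward = true;
    };

    bool pastEndOfRange(const Bounds& b, const mongo::RecordId& rid) {
        if (b.forward) {
            return b.maxRecord && rid > *b.maxRecord;  // stop past the upper bound
        }
        return b.minRecord && rid < *b.minRecord;      // stop below the lower bound
    }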
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index b8030e367be..8fdbb70a1e2 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -114,9 +114,9 @@ private:
void setLatestOplogEntryTimestamp(const Record& record);
/**
- * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog.
+ * Asserts that the minimum timestamp in the query filter has not already fallen off the oplog.
*/
- void assertMinTsHasNotFallenOffOplog(const Record& record);
+ void assertTsHasNotFallenOffOplog(const Record& record);
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
@@ -124,11 +124,6 @@ private:
// The filter is not owned by us.
const MatchExpression* _filter;
- // If a document does not pass '_filter' but passes '_endCondition', stop scanning and return
- // IS_EOF.
- BSONObj _endConditionBSON;
- std::unique_ptr<GTEMatchExpression> _endCondition;
-
std::unique_ptr<SeekableRecordCursor> _cursor;
CollectionScanParams _params;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index fb847553b36..39254279e09 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -40,23 +40,30 @@ struct CollectionScanParams {
BACKWARD = -1,
};
- // If present, the collection scan will seek directly to the RecordId of an oplog entry as
- // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> minTs;
+ // If present, this parameter sets the start point of a forward scan or the end point of a
+ // reverse scan. A forward scan will start scanning at the document with the lowest RecordId
+ // greater than or equal to minRecord. A reverse scan will stop and return EOF on the first
+ // document with a RecordId less than minRecord, or a higher record if none exists. May only
+ // be used for scans on collections clustered by _id and forward oplog scans. If exclusive
+ // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
+ cannot be used in conjunction with 'resumeAfterRecordId'.
+ boost::optional<RecordId> minRecord;
- // If present, the collection scan will stop and return EOF the first time it sees a document
- // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward
- // oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> maxTs;
+ // If present, this parameter sets the start point of a reverse scan or the end point of a
+ // forward scan. A forward scan will stop and return EOF on the first document with a RecordId
+ // greater than maxRecord. A reverse scan will start scanning at the document with the
+ // highest RecordId less than or equal to maxRecord, or a lower record if none exists. May
+ // only be used for scans on collections clustered by _id and forward oplog scans. If exclusive
+ // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
+ // cannot be used in conjunction with 'resumeAfterRecordId'.
+ boost::optional<RecordId> maxRecord;
// If true, the collection scan will return a token that can be used to resume the scan.
bool requestResumeToken = false;
// If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it
// does not exist. Must only be set on forward collection scans.
- // This field cannot be used in conjunction with 'minTs' or 'maxTs'.
+ // This field cannot be used in conjunction with 'minRecord' or 'maxRecord'.
boost::optional<RecordId> resumeAfterRecordId;
Direction direction = FORWARD;
@@ -64,8 +71,8 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
- // Should we assert that the specified minTS has not fallen off the oplog?
- bool assertMinTsHasNotFallenOffOplog = false;
+ // Assert that the specified timestamp has not fallen off the oplog on a forward scan.
+ boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none;
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
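The parameters documented above also cover collections clustered by _id, where the RecordId is the _id value itself. A hedged sketch (helper name hypothetical) of deriving inclusive bounds from ObjectId endpoints, using the same RecordId(const char*, size) construction that the record store tests later in this diff rely on:

    // Sketch: inclusive RecordId bounds for a collection clustered by ObjectId _id.
    #include "mongo/bson/oid.h"
    #include "mongo/db/exec/collection_scan_common.h"

    mongo::CollectionScanParams boundsForIdRange(const mongo::OID& lowestId,
                                                 const mongo::OID& highestId) {
        mongo::CollectionScanParams params;
        params.direction = mongo::CollectionScanParams::FORWARD;
        // The clustered RecordId is simply the OID in its binary (string) format.
        params.minRecord = mongo::RecordId(lowestId.view().view(), mongo::OID::kOIDSize);
        params.maxRecord = mongo::RecordId(highestId.view().view(), mongo::OID::kOIDSize);
        return params;
    }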
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index a40685f03aa..aad7e562913 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -38,6 +38,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/stage_types.h"
+#include "mongo/db/record_id.h"
#include "mongo/util/container_size_helper.h"
#include "mongo/util/time_support.h"
@@ -246,13 +247,11 @@ struct CollectionScanStats : public SpecificStats {
bool tailable{false};
- // The start location of the scan. Must only be set on forward oplog scans.
- boost::optional<Timestamp> minTs;
+ // The start location of a forward scan and end location for a reverse scan.
+ boost::optional<RecordId> minRecord;
- // Indicates that the collection scan will stop and return EOF the first time it sees a
- // document that does not pass the filter and has a "ts" Timestamp field greater than 'maxTs'.
- // Must only be set on forward oplog scans.
- boost::optional<Timestamp> maxTs;
+ // The end location of a reverse scan and start location for a forward scan.
+ boost::optional<RecordId> maxRecord;
};
struct CountStats : public SpecificStats {
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index fbea1ea1076..668596500a0 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -77,6 +77,9 @@ public:
boost::optional<Record> seekExact(const RecordId& id) override {
return Record{};
}
+ boost::optional<Record> seekNear(const RecordId& id) override {
+ return boost::none;
+ }
void save() override {}
bool restore() override {
return true;
@@ -115,7 +118,7 @@ public:
_data.push_back(mutableDoc.freeze().toBson());
Record record;
record.data = {_data.back().objdata(), _data.back().objsize()};
- record.id = RecordId{static_cast<int64_t>(_data.size())};
+ record.id = RecordId{doc["ts"].getTimestamp().asLL()};
_records.push_back(std::move(record));
}
@@ -162,9 +165,9 @@ public:
: DocumentSourceMock({}, expCtx), _collectionPtr(&_collection) {
_filterExpr = BSON("ns" << kTestNs);
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
- _params.assertMinTsHasNotFallenOffOplog = true;
+ _params.assertTsHasNotFallenOffOplog = Timestamp(0);
_params.shouldTrackLatestOplogTimestamp = true;
- _params.minTs = Timestamp(0, 0);
+ _params.minRecord = RecordId(0);
_params.tailable = true;
}
@@ -172,7 +175,8 @@ public:
invariant(!_collScan);
_filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
- _params.minTs = resumeToken.clusterTime;
+ _params.minRecord = RecordId(resumeToken.clusterTime.asLL());
+ _params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime;
}
void push_back(GetNextResult&& result) {
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index c12394340b0..da30db42262 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -70,6 +70,7 @@ env.Library(
"query_knobs",
],
LIBDEPS_PRIVATE=[
+ "$BUILD_DIR/mongo/db/record_id_helpers",
"$BUILD_DIR/mongo/idl/server_parameter",
],
)
diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index 5a5d2f311dd..3d52f8b3a9f 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -62,8 +62,8 @@
#include "mongo/db/exec/text.h"
#include "mongo/db/index/fts_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/logv2/log.h"
namespace mongo::stage_builder {
@@ -78,12 +78,12 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
CollectionScanParams params;
params.tailable = csn->tailable;
params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
- params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;
+ params.assertTsHasNotFallenOffOplog = csn->assertTsHasNotFallenOffOplog;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
- params.minTs = csn->minTs;
- params.maxTs = csn->maxTs;
+ params.minRecord = csn->minRecord;
+ params.maxRecord = csn->maxRecord;
params.requestResumeToken = csn->requestResumeToken;
params.resumeAfterRecordId = csn->resumeAfterRecordId;
params.stopApplyingFilterAfterFirstMatch = csn->stopApplyingFilterAfterFirstMatch;
diff --git a/src/mongo/db/query/plan_explainer_impl.cpp b/src/mongo/db/query/plan_explainer_impl.cpp
index 480d8c663a2..b5ffc341545 100644
--- a/src/mongo/db/query/plan_explainer_impl.cpp
+++ b/src/mongo/db/query/plan_explainer_impl.cpp
@@ -235,11 +235,17 @@ void statsToBSON(const PlanStageStats& stats,
} else if (STAGE_COLLSCAN == stats.stageType) {
CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get());
bob->append("direction", spec->direction > 0 ? "forward" : "backward");
- if (spec->minTs) {
- bob->append("minTs", *(spec->minTs));
- }
- if (spec->maxTs) {
- bob->append("maxTs", *(spec->maxTs));
+ if (spec->minRecord) {
+ spec->minRecord->withFormat(
+ [&](RecordId::Null n) { bob->appendNull("minRecord"); },
+ [&](int64_t rid) { bob->append("minRecord", rid); },
+ [&](const char* str, int size) { bob->append("minRecord", OID::from(str)); });
+ }
+ if (spec->maxRecord) {
+ spec->maxRecord->withFormat(
+ [&](RecordId::Null n) { bob->appendNull("maxRecord"); },
+ [&](int64_t rid) { bob->append("maxRecord", rid); },
+ [&](const char* str, int size) { bob->append("maxRecord", OID::from(str)); });
}
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
bob->appendNumber("docsExamined", static_cast<long long>(spec->docsTested));
diff --git a/src/mongo/db/query/plan_explainer_sbe.cpp b/src/mongo/db/query/plan_explainer_sbe.cpp
index 61106a9674e..0f9df195196 100644
--- a/src/mongo/db/query/plan_explainer_sbe.cpp
+++ b/src/mongo/db/query/plan_explainer_sbe.cpp
@@ -65,11 +65,17 @@ void statsToBSON(const QuerySolutionNode* node,
case STAGE_COLLSCAN: {
auto csn = static_cast<const CollectionScanNode*>(node);
bob->append("direction", csn->direction > 0 ? "forward" : "backward");
- if (csn->minTs) {
- bob->append("minTs", *csn->minTs);
+ if (csn->minRecord) {
+ csn->minRecord->withFormat(
+ [&](RecordId::Null n) { bob->appendNull("minRecord"); },
+ [&](int64_t rid) { bob->append("minRecord", rid); },
+ [&](const char* str, int size) { bob->append("minRecord", OID::from(str)); });
}
- if (csn->maxTs) {
- bob->append("maxTs", *csn->maxTs);
+ if (csn->maxRecord) {
+ csn->maxRecord->withFormat(
+ [&](RecordId::Null n) { bob->appendNull("maxRecord"); },
+ [&](int64_t rid) { bob->append("maxRecord", rid); },
+ [&](const char* str, int size) { bob->append("maxRecord", OID::from(str)); });
}
break;
}
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index 194110865fd..56893e7ab11 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/query_planner_common.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/logv2/log.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
@@ -217,8 +218,6 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->tailable = tailable;
csn->shouldTrackLatestOplogTimestamp =
params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
- csn->assertMinTsHasNotFallenOffOplog =
- params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
csn->shouldWaitForOplogVisibility =
params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
@@ -257,11 +256,29 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
}
}
+ const bool assertMinTsHasNotFallenOffOplog =
+ params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
if (query.nss().isOplog() && csn->direction == 1) {
// Optimizes the start and end location parameters for a collection scan for an oplog
// collection. Not compatible with $_resumeAfter so we do not optimize in that case.
if (resumeAfterObj.isEmpty()) {
- std::tie(csn->minTs, csn->maxTs) = extractTsRange(query.root());
+ auto [minTs, maxTs] = extractTsRange(query.root());
+ if (minTs) {
+ StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs);
+ if (goal.isOK()) {
+ csn->minRecord = goal.getValue();
+ }
+
+ if (assertMinTsHasNotFallenOffOplog) {
+ csn->assertTsHasNotFallenOffOplog = *minTs;
+ }
+ }
+ if (maxTs) {
+ StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs);
+ if (goal.isOK()) {
+ csn->maxRecord = goal.getValue();
+ }
+ }
}
// If the query is just a lower bound on "ts" on a forward scan, every document in the
@@ -275,11 +292,12 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
// The user may have requested 'assertMinTsHasNotFallenOffOplog' for a query that does not
// specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions.
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query "
- "which does not imply a minimum 'ts' value ",
- !(csn->assertMinTsHasNotFallenOffOplog && !csn->minTs));
-
+ if (assertMinTsHasNotFallenOffOplog) {
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "assertTsHasNotFallenOffOplog cannot be applied to a query "
+ "which does not imply a minimum 'ts' value ",
+ csn->assertTsHasNotFallenOffOplog);
+ }
return csn;
}
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 35b16f72588..d86f6c5865f 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -231,7 +231,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->tailable = this->tailable;
copy->direction = this->direction;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
- copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog;
+ copy->assertTsHasNotFallenOffOplog = this->assertTsHasNotFallenOffOplog;
copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
return copy;
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index 7e5d6632a74..5a7f6a0db09 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -470,23 +470,20 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
// Name of the namespace.
std::string name;
- // If present, the collection scan will seek directly to the RecordId of an oplog entry as
- // close to 'minTs' as possible without going higher. Should only be set on forward oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> minTs;
+ // If present, this parameter sets the start point of a forward scan or the end point of a
+ // reverse scan.
+ boost::optional<RecordId> minRecord;
- // If present the collection scan will stop and return EOF the first time it sees a document
- // that does not pass the filter and has 'ts' greater than 'maxTs'. Should only be set on
- // forward oplog scans.
- // This field cannot be used in conjunction with 'resumeAfterRecordId'.
- boost::optional<Timestamp> maxTs;
+ // If present, this parameter sets the start point of a reverse scan or the end point of a
+ // forward scan.
+ boost::optional<RecordId> maxRecord;
// If true, the collection scan will return a token that can be used to resume the scan.
bool requestResumeToken = false;
// If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it
// does not exist. Must only be set on forward collection scans.
- // This field cannot be used in conjunction with 'minTs' or 'maxTs'.
+ // This field cannot be used in conjunction with 'minRecord' or 'maxRecord'.
boost::optional<RecordId> resumeAfterRecordId;
// Should we make a tailable cursor?
@@ -497,8 +494,8 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
// across a sharded cluster.
bool shouldTrackLatestOplogTimestamp = false;
- // Should we assert that the specified minTS has not fallen off the oplog?
- bool assertMinTsHasNotFallenOffOplog = false;
+ // Assert that the specified timestamp has not fallen off the oplog.
+ boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none;
int direction{1};
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index 252a5be9ccf..f8e8d038c63 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -46,7 +46,7 @@
#include "mongo/db/query/sbe_stage_builder_filter.h"
#include "mongo/db/query/sbe_stage_builder_helpers.h"
#include "mongo/db/query/util/make_data_structure.h"
-#include "mongo/db/storage/oplog_hack.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/logv2/log.h"
#include "mongo/util/str.h"
@@ -126,16 +126,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
bool isTailableResumeBranch,
sbe::LockAcquisitionCallback lockAcquisitionCallback) {
invariant(collection->ns().isOplog());
- // The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only
- // be done for a forward scan.
+ // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can
+ // only be done for a forward scan.
invariant(!csn->resumeAfterRecordId);
invariant(csn->direction == CollectionScanParams::FORWARD);
auto resultSlot = slotIdGenerator->generate();
auto recordIdSlot = slotIdGenerator->generate();
- // See if the RecordStore supports the oplogStartHack. If so, the scan will start from the
- // RecordId stored in seekRecordId.
+ // Start the scan from the RecordId stored in seekRecordId.
// Otherwise, if we're building a collection scan for a resume branch of a special union
// sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly
// to access the recordId to resume the scan from.
@@ -144,25 +143,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
if (isTailableResumeBranch) {
auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
return {{}, resumeRecordIdSlot};
- } else if (csn->minTs) {
- auto goal = oploghack::keyForOptime(*csn->minTs);
- if (goal.isOK()) {
- auto startLoc =
- collection->getRecordStore()->oplogStartHack(opCtx, goal.getValue());
- if (startLoc && !startLoc->isNull()) {
- LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
- return {startLoc, slotIdGenerator->generate()};
- }
+ } else if (csn->minRecord) {
+ auto cursor = collection->getRecordStore()->getCursor(opCtx);
+ auto startRec = cursor->seekNear(*csn->minRecord);
+ if (startRec) {
+ LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
+ return {startRec->id, slotIdGenerator->generate()};
}
}
return {};
}();
// Check if we need to project out an oplog 'ts' field as part of the collection scan. We will
- // need it either when 'maxTs' bound has been provided, so that we can apply an EOF filter, of
- // if we need to track the latest oplog timestamp.
+ // need it either when 'maxRecord' bound has been provided, so that we can apply an EOF filter,
+ // or if we need to track the latest oplog timestamp.
const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch &&
- (csn->maxTs || csn->shouldTrackLatestOplogTimestamp);
+ (csn->maxRecord || csn->shouldTrackLatestOplogTimestamp);
auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
collection, slotIdGenerator, shouldTrackLatestOplogTimestamp);
@@ -179,7 +175,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
lockAcquisitionCallback,
makeOpenCallbackIfNeeded(collection, csn));
- // Start the scan from the seekRecordId if we can use the oplogStartHack.
+ // Start the scan from the seekRecordId.
if (seekRecordId) {
invariant(seekRecordIdSlot);
@@ -199,14 +195,12 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
}
// Create a filter which checks the first document to ensure either that its 'ts' is less than
- // or equal to minTs, or that it is a replica set initialization message. If this fails, then we
- // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch
- // of a tailable scan; it only needs to be done once, when the initial branch is run.
- if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) {
- // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can
- // be incorrectly requested by the user, but in that case we should already have uasserted.
+ // or equal the minimum timestamp that should not have rolled off the oplog, or that it is a
+ // replica set initialization message. If this fails, then we throw
+ // ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch of a
+ // tailable scan; it only needs to be done once, when the initial branch is run.
+ if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) {
invariant(csn->shouldTrackLatestOplogTimestamp);
- invariant(csn->minTs);
// We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot'
// here so that it does not shadow the 'tsSlot' which we allocated earlier.
@@ -261,10 +255,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
sbe::makeE<sbe::EIf>(
makeBinaryOp(
sbe::EPrimBinary::logicOr,
- makeBinaryOp(
- sbe::EPrimBinary::lessEq,
- makeVariable(*minTsSlot),
- makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())),
+ makeBinaryOp(sbe::EPrimBinary::lessEq,
+ makeVariable(*minTsSlot),
+ makeConstant(sbe::value::TypeTags::Timestamp,
+ csn->assertTsHasNotFallenOffOplog->asULL())),
makeBinaryOp(
sbe::EPrimBinary::logicAnd,
makeBinaryOp(
@@ -306,8 +300,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater
// than the upper bound.
- if (csn->maxTs) {
- // The 'maxTs' optimization is not compatible with 'stopApplyingFilterAfterFirstMatch'.
+ if (csn->maxRecord) {
+ // The 'maxRecord' optimization is not compatible with 'stopApplyingFilterAfterFirstMatch'.
invariant(!csn->stopApplyingFilterAfterFirstMatch);
invariant(tsSlot);
@@ -315,7 +309,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
std::move(stage),
makeBinaryOp(sbe::EPrimBinary::lessEq,
makeVariable(*tsSlot),
- makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())),
+ makeConstant(sbe::value::TypeTags::Timestamp, csn->maxRecord->asLong())),
csn->nodeId());
}
@@ -361,7 +355,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// inner branch, and the execution will continue from this point further on, without
// applying the filter.
if (csn->stopApplyingFilterAfterFirstMatch) {
- invariant(csn->minTs);
+ invariant(csn->minRecord);
invariant(csn->direction == CollectionScanParams::FORWARD);
std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded(
@@ -569,7 +563,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateCollScan(
sbe::RuntimeEnvironment* env,
bool isTailableResumeBranch,
sbe::LockAcquisitionCallback lockAcquisitionCallback) {
- if (csn->minTs || csn->maxTs) {
+ if (csn->minRecord || csn->maxRecord) {
return generateOptimizedOplogScan(opCtx,
collection,
csn,
diff --git a/src/mongo/db/record_id.h b/src/mongo/db/record_id.h
index 370a0d5b350..9189e77f1ab 100644
--- a/src/mongo/db/record_id.h
+++ b/src/mongo/db/record_id.h
@@ -139,12 +139,12 @@ public:
}
int compare(const RecordId& rhs) const {
- // Null always compares less than every other RecordId.
- if (isNull() && rhs.isNull()) {
+ // Null always compares less than every other RecordId format.
+ if (_format == Format::kNull && rhs._format == Format::kNull) {
return 0;
- } else if (isNull()) {
+ } else if (_format == Format::kNull) {
return -1;
- } else if (rhs.isNull()) {
+ } else if (rhs._format == Format::kNull) {
return 1;
}
invariant(_format == rhs._format);
diff --git a/src/mongo/db/storage/oplog_hack.cpp b/src/mongo/db/record_id_helpers.cpp
index dc3ac3e2f5f..2cc3bb43353 100644
--- a/src/mongo/db/storage/oplog_hack.cpp
+++ b/src/mongo/db/record_id_helpers.cpp
@@ -29,7 +29,7 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
-#include "mongo/db/storage/oplog_hack.h"
+#include "mongo/db/record_id_helpers.h"
#include <limits>
@@ -40,7 +40,7 @@
#include "mongo/util/debug_util.h"
namespace mongo {
-namespace oploghack {
+namespace record_id_helpers {
StatusWith<RecordId> keyForOptime(const Timestamp& opTime) {
// Make sure secs and inc wouldn't be negative if treated as signed. This ensures that they
@@ -80,5 +80,5 @@ StatusWith<RecordId> extractKey(const char* data, int len) {
return keyForOptime(elem.timestamp());
}
-} // namespace oploghack
+} // namespace record_id_helpers
} // namespace mongo
diff --git a/src/mongo/db/storage/oplog_hack.h b/src/mongo/db/record_id_helpers.h
index 2aac4b60556..ed9016822f3 100644
--- a/src/mongo/db/storage/oplog_hack.h
+++ b/src/mongo/db/record_id_helpers.h
@@ -36,7 +36,7 @@ namespace mongo {
class RecordId;
class Timestamp;
-namespace oploghack {
+namespace record_id_helpers {
/**
* Converts Timestamp to a RecordId in an unspecified manner that is safe to use as the key to
@@ -49,5 +49,5 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime);
*/
StatusWith<RecordId> extractKey(const char* data, int len);
-} // namespace oploghack
+} // namespace record_id_helpers
} // namespace mongo
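record_id_helpers is the former oploghack namespace under a new name; keyForOptime() maps a Timestamp to a RecordId whose ordering matches oplog order. A minimal hedged usage sketch (wrapper name hypothetical):

    // Sketch: obtain the RecordId key for an oplog entry at a given optime.
    #include "mongo/bson/timestamp.h"
    #include "mongo/db/record_id_helpers.h"

    mongo::StatusWith<mongo::RecordId> oplogKeyFor(unsigned secs, unsigned inc) {
        // keyForOptime() returns a non-OK Status for timestamps whose fields
        // would be negative if treated as signed integers.
        return mongo::record_id_helpers::keyForOptime(mongo::Timestamp(secs, inc));
    }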
diff --git a/src/mongo/db/record_id_test.cpp b/src/mongo/db/record_id_test.cpp
index e606762a251..63f36c07029 100644
--- a/src/mongo/db/record_id_test.cpp
+++ b/src/mongo/db/record_id_test.cpp
@@ -112,13 +112,13 @@ TEST(RecordId, OidTest) {
TEST(RecordId, NullTest) {
// The int64 format should be considered null if its value is 0. Likewise, the value should be
// interpreted as int64_t(0) if it is null.
- RecordId nullRid(0);
- ASSERT(nullRid.isNull());
-
- RecordId rid0;
+ RecordId rid0(0);
ASSERT(rid0.isNull());
- ASSERT_EQ(0, rid0.asLong());
- ASSERT_EQ(nullRid, rid0);
+
+ RecordId nullRid;
+ ASSERT(nullRid.isNull());
+ ASSERT_EQ(0, nullRid.asLong());
+ ASSERT_NE(rid0, nullRid);
}
TEST(RecordId, OidTestCompare) {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index dba72ba9275..2bd275c6a8d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -407,7 +407,7 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
auto bsonOplogEntry = oplogEntry->toBSON();
// The storage engine will assign the RecordId based on the "ts" field of the oplog entry, see
- // oploghack::extractKey.
+ // record_id_helpers::extractKey.
std::vector<Record> records{
{RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())}};
std::vector<Timestamp> timestamps{slot.getTimestamp()};
@@ -477,7 +477,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
timestamps[i] = insertStatementOplogSlot.getTimestamp();
bsonOplogEntries[i] = oplogEntry.toBSON();
// The storage engine will assign the RecordId based on the "ts" field of the oplog entry,
- // see oploghack::extractKey.
+ // see record_id_helpers::extractKey.
records[i] = Record{
RecordId(), RecordData(bsonOplogEntries[i].objdata(), bsonOplogEntries[i].objsize())};
}
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 28f61230234..2891e015e3f 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -105,16 +105,6 @@ env.Library(
)
env.Library(
- target='oplog_hack',
- source=[
- 'oplog_hack.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- ],
-)
-
-env.Library(
target='storage_control',
source=[
'control/storage_control.cpp',
diff --git a/src/mongo/db/storage/devnull/SConscript b/src/mongo/db/storage/devnull/SConscript
index d1cf4757d68..d5f1a5c30c8 100644
--- a/src/mongo/db/storage/devnull/SConscript
+++ b/src/mongo/db/storage/devnull/SConscript
@@ -11,7 +11,7 @@ env.Library(
'ephemeral_catalog_record_store.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/storage/oplog_hack',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/storage/recovery_unit_base',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 7f380c8b980..ce2f1fe5e28 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -47,6 +47,9 @@ public:
boost::optional<Record> seekExact(const RecordId& id) final {
return {};
}
+ boost::optional<Record> seekNear(const RecordId& id) final {
+ return {};
+ }
void save() final {}
bool restore() final {
return true;
diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
index fef0b5cb35a..f39c325863d 100644
--- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
+++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp
@@ -36,7 +36,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/oplog_hack.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/logv2/log.h"
#include "mongo/util/str.h"
@@ -150,6 +150,11 @@ public:
return {{_it->first, _it->second.toRecordData()}};
}
+ boost::optional<Record> seekNear(const RecordId& id) final {
+ // not implemented
+ return boost::none;
+ }
+
void save() final {
if (!_needFirstSeek && !_lastMoveWasRestore)
_savedId = _it == _records.end() ? RecordId() : _it->first;
@@ -223,6 +228,11 @@ public:
return {{_it->first, _it->second.toRecordData()}};
}
+ boost::optional<Record> seekNear(const RecordId& id) final {
+ // not implemented
+ return boost::none;
+ }
+
void save() final {
if (!_needFirstSeek && !_lastMoveWasRestore)
_savedId = _it == _records.rend() ? RecordId() : _it->first;
@@ -386,7 +396,7 @@ void EphemeralForTestRecordStore::cappedDeleteAsNeeded(WithLock lk, OperationCon
StatusWith<RecordId> EphemeralForTestRecordStore::extractAndCheckLocForOplog(WithLock,
const char* data,
int len) const {
- StatusWith<RecordId> status = oploghack::extractKey(data, len);
+ StatusWith<RecordId> status = record_id_helpers::extractKey(data, len);
if (!status.isOK())
return status;
@@ -566,29 +576,4 @@ RecordId EphemeralForTestRecordStore::allocateLoc(WithLock) {
invariant(out.isValid());
return out;
}
-
-boost::optional<RecordId> EphemeralForTestRecordStore::oplogStartHack(
- OperationContext* opCtx, const RecordId& startingPosition) const {
- if (!_data->isOplog)
- return boost::none;
-
- stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
- const Records& records = _data->records;
-
- if (records.empty())
- return RecordId();
-
- Records::const_iterator it = records.lower_bound(startingPosition);
- if (it == records.end() || it->first > startingPosition) {
- // If the startingPosition is before the oldest oplog entry, this ensures that we return
- // RecordId() as specified in record_store.h.
- if (it == records.begin()) {
- return RecordId();
- }
- --it;
- }
-
- return it->first;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
index 2a405eddb93..b56fc826d3c 100644
--- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
+++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
@@ -108,9 +108,6 @@ public:
return _data->records.size();
}
- virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
- const RecordId& startingPosition) const;
-
void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override {}
virtual void updateStatsAfterRepair(OperationContext* opCtx,
diff --git a/src/mongo/db/storage/ephemeral_for_test/SConscript b/src/mongo/db/storage/ephemeral_for_test/SConscript
index 939f5bf1b80..d876aa0b96d 100644
--- a/src/mongo/db/storage/ephemeral_for_test/SConscript
+++ b/src/mongo/db/storage/ephemeral_for_test/SConscript
@@ -22,8 +22,8 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/storage/key_string',
- '$BUILD_DIR/mongo/db/storage/oplog_hack',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/db/storage/write_unit_of_work',
],
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp
index 3b3b4308c68..4206a42ed3a 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp
@@ -264,6 +264,9 @@ public:
boost::optional<Record> seekExact(const RecordId& id) final {
return {};
}
+ boost::optional<Record> seekNear(const RecordId& id) final {
+ return {};
+ }
void save() final {}
bool restore() final {
return true;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
index b5148c8aff4..a58e83c90e8 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
@@ -39,12 +39,13 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_radix_store.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_visibility_manager.h"
#include "mongo/db/storage/key_string.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/logv2/log.h"
#include "mongo/util/hex.h"
namespace mongo {
@@ -162,7 +163,7 @@ Status RecordStore::insertRecords(OperationContext* opCtx,
int64_t thisRecordId = 0;
if (_isOplog) {
StatusWith<RecordId> status =
- oploghack::extractKey(record.data.data(), record.data.size());
+ record_id_helpers::extractKey(record.data.data(), record.data.size());
if (!status.isOK())
return status.getStatus();
thisRecordId = status.getValue().asLong();
@@ -306,29 +307,6 @@ void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCt
_visibilityManager->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}
-boost::optional<RecordId> RecordStore::oplogStartHack(OperationContext* opCtx,
- const RecordId& startingPosition) const {
- if (!_isOplog)
- return boost::none;
-
- if (numRecords(opCtx) == 0)
- return RecordId();
-
- StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()};
-
- std::string key = createKey(_ident, startingPosition.asLong());
- StringStore::const_reverse_iterator it(workingCopy->upper_bound(key));
-
- if (it == workingCopy->rend())
- return RecordId();
-
- RecordId rid = RecordId(extractRecordId(it->first));
- if (rid > startingPosition)
- return RecordId();
-
- return rid;
-}
-
Status RecordStore::oplogDiskLocRegister(OperationContext* opCtx,
const Timestamp& opTime,
bool orderedCommit) {
@@ -472,6 +450,59 @@ boost::optional<Record> RecordStore::Cursor::seekExact(const RecordId& id) {
return Record{id, RecordData(it->second.c_str(), it->second.length())};
}
+boost::optional<Record> RecordStore::Cursor::seekNear(const RecordId& id) {
+ _savedPosition = boost::none;
+ _lastMoveWasRestore = false;
+
+ RecordId search = id;
+ if (_rs._isOplog && id > _oplogVisibility) {
+ search = RecordId(_oplogVisibility);
+ }
+
+ auto numRecords = _rs.numRecords(opCtx);
+ if (numRecords == 0)
+ return boost::none;
+
+ StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()};
+ std::string key = createKey(_rs._ident, search.asLong());
+ // We may land higher and that is fine per the API contract.
+ it = workingCopy->lower_bound(key);
+
+ // If we're at the end of this record store, we didn't find anything >= id. Position on the
+ // immediately previous record, which must exist.
+ if (it == workingCopy->end() || !inPrefix(it->first)) {
+ // The reverse iterator constructor positions on the next record automatically.
+ StringStore::const_reverse_iterator revIt(it);
+ invariant(revIt != workingCopy->rend());
+ it = workingCopy->lower_bound(revIt->first);
+ invariant(it != workingCopy->end());
+ invariant(inPrefix(it->first));
+ }
+
+ // If we landed one higher, then per the API contract, we need to return the previous record.
+ RecordId rid = extractRecordId(it->first);
+ if (rid > search) {
+ StringStore::const_reverse_iterator revIt(it);
+ // The reverse iterator constructor positions on the next record automatically.
+ if (revIt != workingCopy->rend() && inPrefix(revIt->first)) {
+ it = workingCopy->lower_bound(revIt->first);
+ rid = RecordId(extractRecordId(it->first));
+ }
+ // Otherwise, we hit the beginning of this record store, then there is only one record and
+ // we should return that.
+ }
+
+ // For forward cursors on the oplog, the oplog visible timestamp is treated as the end of the
+ // record store. So if we are positioned past this point, then there are no visible records.
+ if (_rs._isOplog && rid > _oplogVisibility) {
+ return boost::none;
+ }
+
+ _needFirstSeek = false;
+ _savedPosition = it->first;
+ return Record{rid, RecordData(it->second.c_str(), it->second.length())};
+}
+
// Positions are saved as we go.
void RecordStore::Cursor::save() {}
void RecordStore::Cursor::saveUnpositioned() {}
@@ -553,6 +584,46 @@ boost::optional<Record> RecordStore::ReverseCursor::seekExact(const RecordId& id
return Record{id, RecordData(it->second.c_str(), it->second.length())};
}
+boost::optional<Record> RecordStore::ReverseCursor::seekNear(const RecordId& id) {
+ _savedPosition = boost::none;
+ _lastMoveWasRestore = false;
+
+ auto numRecords = _rs.numRecords(opCtx);
+ if (numRecords == 0)
+ return boost::none;
+
+ StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()};
+ std::string key = createKey(_rs._ident, id.asLong());
+ it = StringStore::const_reverse_iterator(workingCopy->upper_bound(key));
+
+ // Since there is at least 1 record, if we hit the beginning we need to return the only record.
+ if (it == workingCopy->rend() || !inPrefix(it->first)) {
+ // This lands on the next key.
+ auto fwdIt = workingCopy->upper_bound(key);
+ // reverse iterator increments one item before
+ it = StringStore::const_reverse_iterator(++fwdIt);
+ invariant(it != workingCopy->end());
+ invariant(inPrefix(it->first));
+ }
+
+ // If we landed lower, then per the API contract, we need to return the previous record.
+ RecordId rid = extractRecordId(it->first);
+ if (rid < id) {
+ // This lands on the next key.
+ auto fwdIt = workingCopy->upper_bound(key);
+ if (fwdIt != workingCopy->end() && inPrefix(fwdIt->first)) {
+ it = StringStore::const_reverse_iterator(++fwdIt);
+ }
+ // Otherwise, we hit the beginning of this record store, then there is only one record and
+ // we should return that.
+ }
+
+ rid = RecordId(extractRecordId(it->first));
+ _needFirstSeek = false;
+ _savedPosition = it->first;
+ return Record{rid, RecordData(it->second.c_str(), it->second.length())};
+}
+
void RecordStore::ReverseCursor::save() {}
void RecordStore::ReverseCursor::saveUnpositioned() {}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
index 1758343b211..ec3d86461f4 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
@@ -106,9 +106,6 @@ public:
BSONObjBuilder* result,
double scale) const;
- virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
- const RecordId& startingPosition) const;
-
void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override;
virtual void updateStatsAfterRepair(OperationContext* opCtx,
@@ -191,6 +188,7 @@ private:
VisibilityManager* visibilityManager);
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final override;
+ boost::optional<Record> seekNear(const RecordId& id) final override;
void save() final;
void saveUnpositioned() final override;
bool restore() final;
@@ -216,6 +214,7 @@ private:
VisibilityManager* visibilityManager);
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final override;
+ boost::optional<Record> seekNear(const RecordId& id) final override;
void save() final;
void saveUnpositioned() final override;
bool restore() final;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 2ac9f9d55ce..f3726d1ace6 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -34,8 +34,8 @@
#include <mutex>
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h"
-#include "mongo/db/storage/oplog_hack.h"
namespace mongo {
namespace ephemeral_for_test {
@@ -146,7 +146,7 @@ bool RecoveryUnit::forkIfNeeded() {
}
Status RecoveryUnit::setTimestamp(Timestamp timestamp) {
- auto key = oploghack::keyForOptime(timestamp);
+ auto key = record_id_helpers::keyForOptime(timestamp);
if (!key.isOK())
return key.getStatus();
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index 3d44a0de857..e7ac8dd5d9b 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -184,6 +184,20 @@ public:
virtual boost::optional<Record> seekExact(const RecordId& id) = 0;
/**
+ * Positions this cursor near 'start' or an adjacent record if 'start' does not exist. If there
+ * is not an exact match, the cursor is positioned on the directionally previous Record. If no
+ * earlier record exists, the cursor is positioned on the directionally following record.
+ * Returns boost::none if the RecordStore is empty.
+ *
+ * For forward cursors, returns the Record with the highest RecordId less than or equal to
+ * 'start'. If no such record exists, positions on the next highest RecordId after 'start'.
+ *
+ * For reverse cursors, returns the Record with the lowest RecordId greater than or equal to
+ * 'start'. If no such record exists, positions on the next lowest RecordId before 'start'.
+ */
+ virtual boost::optional<Record> seekNear(const RecordId& start) = 0;
+
+ /**
* Prepares for state changes in underlying data without necessarily saving the current
* state.
*
@@ -491,18 +505,6 @@ public:
double scale) const = 0;
/**
- * Return the RecordId of an oplog entry as close to startingPosition as possible without
- * being higher. If there are no entries <= startingPosition, return RecordId().
- *
- * If you don't implement the oplogStartHack, just use the default implementation which
- * returns boost::none.
- */
- virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
- const RecordId& startingPosition) const {
- return boost::none;
- }
-
- /**
* When we write to an oplog, we call this so that that the storage engine can manage the
* visibility of oplog entries to ensure they are ordered.
*
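seekNear() on a forward cursor now provides what oplogStartHack() used to: positioning at the highest RecordId at or below a key. A hedged sketch (function name hypothetical) of the replacement pattern used by the collection scan stages earlier in this diff:

    // Sketch: seek a forward cursor near the RecordId derived from 'minTs' and
    // return the record it lands on, mirroring the new CollectionScan behavior.
    #include "mongo/db/record_id_helpers.h"
    #include "mongo/db/storage/record_store.h"

    boost::optional<mongo::Record> seekNearOptime(mongo::OperationContext* opCtx,
                                                  const mongo::RecordStore* rs,
                                                  const mongo::Timestamp& minTs) {
        auto key = mongo::record_id_helpers::keyForOptime(minTs);
        if (!key.isOK()) {
            return boost::none;
        }
        auto cursor = rs->getCursor(opCtx);  // forward cursor
        // Positions on the highest RecordId <= key, or the next higher one if no
        // earlier record exists; returns boost::none only for an empty record store.
        return cursor->seekNear(key.getValue());
    }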
diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp
index 29db8562890..12e9a4bf912 100644
--- a/src/mongo/db/storage/record_store_test_harness.cpp
+++ b/src/mongo/db/storage/record_store_test_harness.cpp
@@ -513,5 +513,70 @@ TEST(RecordStoreTestHarness, ClusteredRecordStore) {
}
}
+TEST(RecordStoreTestHarness, ClusteredRecordStoreSeekNear) {
+ const auto harnessHelper = newRecordStoreHarnessHelper();
+ if (!harnessHelper->getEngine()->supportsClusteredIdIndex()) {
+ // Only WiredTiger supports clustered indexes on _id.
+ return;
+ }
+
+ const std::string ns = "test.system.buckets.a";
+ CollectionOptions options;
+ options.clusteredIndex = ClusteredIndexOptions{};
+ std::unique_ptr<RecordStore> rs = harnessHelper->newNonCappedRecordStore(ns, options);
+ invariant(rs->keyFormat() == KeyFormat::String);
+
+ auto opCtx = harnessHelper->newOperationContext();
+
+ const int numRecords = 100;
+ std::vector<Record> records;
+ std::vector<Timestamp> timestamps;
+ for (int i = 0; i < numRecords; i++) {
+ timestamps.push_back(Timestamp(i, 1));
+ }
+
+ // Insert RecordIds where the timestamp part of the OID correlates directly with the seconds in
+ // the Timestamp.
+ for (int i = 0; i < numRecords; i++) {
+ BSONObj doc = BSON("i" << i);
+ RecordData recordData = RecordData(doc.objdata(), doc.objsize());
+ recordData.makeOwned();
+
+ auto oid = OID::gen();
+ oid.setTimestamp(timestamps[i].getSecs());
+ auto record = Record{RecordId(oid.view().view(), OID::kOIDSize), recordData};
+ std::vector<Record> recVec = {record};
+
+ WriteUnitOfWork wuow(opCtx.get());
+ ASSERT_OK(rs->insertRecords(opCtx.get(), &recVec, {timestamps[i]}));
+ wuow.commit();
+
+ records.push_back(record);
+ }
+
+ for (int i = 0; i < numRecords; i++) {
+ // Generate an OID RecordId with the timestamp part set to 'i' and high bits elsewhere so
+ // that it compares greater than or equal to any OID we inserted with the same timestamp.
+ auto oid = OID::max();
+ oid.setTimestamp(i);
+ auto rid = RecordId(oid.view().view(), OID::kOIDSize);
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(rid);
+ ASSERT(rec);
+ ASSERT_EQ(records[i].id, rec->id);
+ }
+
+ for (int i = 0; i < numRecords; i++) {
+ // Generate an OID RecordId with only the timestamp part set to 'i' and zeroes elsewhere so
+ // that it compares less than or equal to any OID we inserted with the same timestamp.
+ auto oid = OID();
+ oid.setTimestamp(i);
+ auto rid = RecordId(oid.view().view(), OID::kOIDSize);
+ auto cur = rs->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(rid);
+ ASSERT(rec);
+ ASSERT_EQ(records[i].id, rec->id);
+ }
+}
} // namespace
} // namespace mongo
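The clustered test above relies on OID-keyed RecordIds sorting by their leading 4-byte timestamp
prefix. A minimal sketch of the per-second bounds it constructs (assumes the OID helpers already
used in the test; illustrative only, not part of this patch):

    // Upper bound for second 'secs': timestamp prefix set, all remaining bytes 0xFF, so it
    // compares >= every OID generated within that second.
    auto makeUpperBound = [](unsigned secs) {
        auto oid = OID::max();
        oid.setTimestamp(secs);
        return RecordId(oid.view().view(), OID::kOIDSize);
    };
    // Lower bound for second 'secs': timestamp prefix set, all remaining bytes zero, so it
    // compares <= every OID generated within that second.
    auto makeLowerBound = [](unsigned secs) {
        auto oid = OID();
        oid.setTimestamp(secs);
        return RecordId(oid.view().view(), OID::kOIDSize);
    };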
diff --git a/src/mongo/db/storage/record_store_test_oplog.cpp b/src/mongo/db/storage/record_store_test_oplog.cpp
index 34b68dbc7f9..a0889e5fffc 100644
--- a/src/mongo/db/storage/record_store_test_oplog.cpp
+++ b/src/mongo/db/storage/record_store_test_oplog.cpp
@@ -63,7 +63,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
return res.getValue();
}
-TEST(RecordStoreTestHarness, OplogHack) {
+TEST(RecordStoreTestHarness, SeekNearOplog) {
std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
// Use a large enough cappedMaxSize so that the limit is not reached by doing the inserts within
@@ -104,14 +104,78 @@ TEST(RecordStoreTestHarness, OplogHack) {
// Make sure all are visible.
rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+ // Forward cursor seeks
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
WriteUnitOfWork wuow(opCtx.get());
- // find start
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), RecordId()); // nothing <=
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 1)), RecordId(1, 2)); // between
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 2)), RecordId(2, 2)); // ==
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2)); // > highest
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(0, 1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(1, 1));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(1, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 2));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
+ }
+
+ // Reverse cursor seeks
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(0, 1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(1, 1));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(2, 1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(2, 2));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = rs->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
}
{
@@ -121,7 +185,10 @@ TEST(RecordStoreTestHarness, OplogHack) {
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2));
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2, 2));
}
{
@@ -131,7 +198,10 @@ TEST(RecordStoreTestHarness, OplogHack) {
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 2));
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(1, 2));
}
{
@@ -141,7 +211,10 @@ TEST(RecordStoreTestHarness, OplogHack) {
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 1));
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(1, 1));
}
{
@@ -153,7 +226,9 @@ TEST(RecordStoreTestHarness, OplogHack) {
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId());
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2, 3));
+ ASSERT_FALSE(rec);
}
}
@@ -182,7 +257,7 @@ TEST(RecordStoreTestHarness, OplogInsertOutOfOrder) {
}
}
-TEST(RecordStoreTestHarness, OplogHackOnNonOplog) {
+TEST(RecordStoreTestHarness, SeekNearOnNonOplog) {
std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
std::unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore("local.NOT_oplog.foo"));
@@ -195,7 +270,12 @@ TEST(RecordStoreTestHarness, OplogHackOnNonOplog) {
.getStatus());
wuow.commit();
}
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), boost::none);
+ auto cur = rs->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(0, 1));
+ ASSERT(rec);
+ // Regular record stores don't use timestamps for their RecordId, so expect the first
+ // auto-incrementing RecordId to be 1.
+ ASSERT_EQ(rec->id, RecordId(1));
}
@@ -228,6 +308,15 @@ TEST(RecordStoreTestHarness, OplogOrder) {
}
{
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekNear(RecordId(id1.asLong() + 1));
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
// now we insert 2 docs, but commit the 2nd one first.
// we make sure we can't find the 2nd until the first is committed.
ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext());
@@ -240,15 +329,16 @@ TEST(RecordStoreTestHarness, OplogOrder) {
auto client1 = harnessHelper->serviceContext()->makeClient("c1");
auto t1 = harnessHelper->newOperationContext(client1.get());
WriteUnitOfWork w1(t1.get());
- _oplogOrderInsertOplog(t1.get(), rs, 20);
+ RecordId id2 = _oplogOrderInsertOplog(t1.get(), rs, 20);
// do not commit yet
+ RecordId id3;
{ // create 2nd doc
auto client2 = harnessHelper->serviceContext()->makeClient("c2");
auto t2 = harnessHelper->newOperationContext(client2.get());
{
WriteUnitOfWork w2(t2.get());
- _oplogOrderInsertOplog(t2.get(), rs, 30);
+ id3 = _oplogOrderInsertOplog(t2.get(), rs, 30);
w2.commit();
}
}
@@ -261,6 +351,27 @@ TEST(RecordStoreTestHarness, OplogOrder) {
auto opCtx = harnessHelper->newOperationContext(client2.get());
auto cursor = rs->getCursor(opCtx.get());
auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekNear(id2);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekNear(id3);
+ ASSERT(record);
ASSERT_EQ(id1, record->id);
ASSERT(!cursor->next());
}
@@ -303,15 +414,17 @@ TEST(RecordStoreTestHarness, OplogOrder) {
auto client1 = harnessHelper->serviceContext()->makeClient("c1");
auto t1 = harnessHelper->newOperationContext(client1.get());
WriteUnitOfWork w1(t1.get());
- _oplogOrderInsertOplog(t1.get(), rs, 2);
+ RecordId id2 = _oplogOrderInsertOplog(t1.get(), rs, 2);
+
// do not commit yet
+ RecordId id3;
{ // create 2nd doc
auto client2 = harnessHelper->serviceContext()->makeClient("c2");
auto t2 = harnessHelper->newOperationContext(client2.get());
{
WriteUnitOfWork w2(t2.get());
- _oplogOrderInsertOplog(t2.get(), rs, 3);
+ id3 = _oplogOrderInsertOplog(t2.get(), rs, 3);
w2.commit();
}
}
@@ -324,6 +437,27 @@ TEST(RecordStoreTestHarness, OplogOrder) {
auto opCtx = harnessHelper->newOperationContext(client2.get());
auto cursor = rs->getCursor(opCtx.get());
auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekNear(id2);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekNear(id3);
+ ASSERT(record);
ASSERT_EQ(id1, record->id);
ASSERT(!cursor->next());
}
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index e0a3c8e4ad5..4da815c266e 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -58,13 +58,13 @@ wtEnv.Library(
'$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/prepare_conflict_tracker',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/repl_settings',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/index_entry_comparison',
'$BUILD_DIR/mongo/db/storage/key_string',
- '$BUILD_DIR/mongo/db/storage/oplog_hack',
'$BUILD_DIR/mongo/db/storage/recovery_unit_base',
'$BUILD_DIR/mongo/db/storage/storage_file_util',
'$BUILD_DIR/mongo/db/storage/storage_options',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 1a8814c009d..4785eda902f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -49,12 +49,12 @@
#include "mongo/db/global_settings.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_recovery.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage/wiredtiger/oplog_stone_parameters_gen.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_cursor_helpers.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h"
@@ -1576,7 +1576,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
auto& record = records[i];
if (_isOplog) {
StatusWith<RecordId> status =
- oploghack::extractKey(record.data.data(), record.data.size());
+ record_id_helpers::extractKey(record.data.data(), record.data.size());
if (!status.isOK())
return status.getStatus();
record.id = status.getValue();
@@ -2006,40 +2006,6 @@ void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationCon
}
}
-boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack(
- OperationContext* opCtx, const RecordId& startingPosition) const {
- dassert(opCtx->lockState()->isReadLocked());
-
- if (!_isOplog)
- return boost::none;
-
- auto wtRu = WiredTigerRecoveryUnit::get(opCtx);
- wtRu->setIsOplogReader();
-
- RecordId searchFor = startingPosition;
- auto visibilityTs = wtRu->getOplogVisibilityTs();
- if (visibilityTs && searchFor.asLong() > *visibilityTs) {
- searchFor = RecordId(*visibilityTs);
- }
-
- WiredTigerCursor cursor(_uri, _tableId, true, opCtx);
- WT_CURSOR* c = cursor.get();
-
- int cmp;
- CursorKey key = makeCursorKey(searchFor);
- setKey(c, key);
- int ret = c->search_near(c, &cmp);
- if (ret == 0 && cmp > 0)
- ret = c->prev(c); // landed one higher than startingPosition
- if (ret == WT_NOTFOUND)
- return RecordId(); // nothing <= startingPosition
- // It's illegal for oplog documents to be in a prepare state.
- invariant(ret != WT_PREPARE_CONFLICT);
- invariantWTOK(ret);
-
- return getKey(c);
-}
-
void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
long long dataSize) {
@@ -2418,6 +2384,74 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::seekExact(const RecordI
return {{id, {static_cast<const char*>(value.data), static_cast<int>(value.size)}}};
}
+boost::optional<Record> WiredTigerRecordStoreCursorBase::seekNear(const RecordId& id) {
+ dassert(_opCtx->lockState()->isReadLocked());
+
+ // Forward scans on the oplog must round down to the oplog visibility timestamp.
+ RecordId start = id;
+ if (_forward && _oplogVisibleTs && start.asLong() > *_oplogVisibleTs) {
+ start = RecordId(*_oplogVisibleTs);
+ }
+
+ _skipNextAdvance = false;
+ WiredTigerRecoveryUnit::get(_opCtx)->getSession();
+ WT_CURSOR* c = _cursor->get();
+
+ WiredTigerRecordStore::CursorKey key = makeCursorKey(start);
+ setKey(c, key);
+
+ int cmp;
+ int ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->search_near(c, &cmp); });
+ if (ret == WT_NOTFOUND) {
+ _eof = true;
+ return boost::none;
+ }
+ invariantWTOK(ret);
+
+ auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
+ metricsCollector.incrementOneCursorSeek();
+
+ RecordId curId = getKey(c);
+
+ // Per the requirement of the API, return the lower (for forward) or higher (for reverse)
+ // record.
+ if (_forward && cmp > 0) {
+ ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->prev(c); });
+ } else if (!_forward && cmp < 0) {
+ ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->next(c); });
+ }
+
+    // If we tried to return an adjacent record but walked off the beginning (for forward) or the
+    // end (for reverse) of the table, go back to our original position so that we have something
+    // to return.
+ if (ret == WT_NOTFOUND) {
+ if (_forward) {
+ invariant(cmp > 0);
+ ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->next(c); });
+ } else if (!_forward) {
+ invariant(cmp < 0);
+ ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->prev(c); });
+ }
+ }
+ invariantWTOK(ret);
+
+ curId = getKey(c);
+
+ // For forward cursors on the oplog, the oplog visible timestamp is treated as the end of the
+ // record store. So if we are positioned past this point, then there are no visible records.
+ if (_forward && _oplogVisibleTs && curId.asLong() > *_oplogVisibleTs) {
+ _eof = true;
+ return boost::none;
+ }
+
+ WT_ITEM value;
+ invariantWTOK(c->get_value(c, &value));
+
+ metricsCollector.incrementOneDocRead(value.size);
+
+ _lastReturnedId = curId;
+ _eof = false;
+ return {{curId, {static_cast<const char*>(value.data), static_cast<int>(value.size)}}};
+}
void WiredTigerRecordStoreCursorBase::save() {
try {
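For reference, a simplified sketch of the search_near() adjustment implemented above (a
hypothetical standalone helper; prepare-conflict retries, metrics, and oplog visibility handling
are elided):

    // WT reports in 'cmp' whether search_near() landed before (<0), on (==0), or after (>0) the
    // search key. Forward scans prefer the record at or before the key; reverse scans prefer the
    // record at or after it.
    int seekNearAdjust(WT_CURSOR* c, bool forward) {
        int cmp;
        int ret = c->search_near(c, &cmp);
        if (ret != 0)
            return ret;  // e.g. WT_NOTFOUND when the table is empty
        if (forward && cmp > 0)
            ret = c->prev(c);  // landed after the key; step back to the record at or before it
        else if (!forward && cmp < 0)
            ret = c->next(c);  // landed before the key; step forward to the record at or after it
        if (ret == WT_NOTFOUND)
            ret = forward ? c->next(c) : c->prev(c);  // walked off the table; return to the other side
        return ret;
    }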
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index dc8a44dca29..2e1016b625c 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -197,9 +197,6 @@ public:
virtual void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive);
- virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
- const RecordId& startingPosition) const;
-
virtual Status oplogDiskLocRegister(OperationContext* opCtx,
const Timestamp& opTime,
bool orderedCommit);
@@ -426,6 +423,8 @@ public:
boost::optional<Record> seekExact(const RecordId& id);
+ boost::optional<Record> seekNear(const RecordId& start);
+
void save();
void saveUnpositioned();