author     Louis Williams <louis.williams@mongodb.com>        2021-02-23 16:03:49 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-02-24 00:42:55 +0000
commit     dd395723139d3d04cb05c66136a670102be210f1
tree       fc64859029e3884270f522e2ae29f24be5b512c3 /src/mongo/db
parent     24a71228f4bf3ca9051b72f0777bae2286e7182a
SERVER-54008 Generalize CollectionScan to perform queries over RecordId ranges
Diffstat (limited to 'src/mongo/db')
35 files changed, 630 insertions, 300 deletions
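The patch below replaces the oplog-only 'minTs'/'maxTs' collection-scan bounds with generic RecordId bounds ('minRecord'/'maxRecord'), so forward oplog scans and scans of collections clustered by _id share one bounded-scan path. The following sketch, with assumed setup and error handling trimmed, mirrors the translation the planner now performs in QueryPlannerAccess::makeCollectionScan():

```cpp
// Sketch: turning a 'ts' range into inclusive RecordId scan bounds.
// 'minTs'/'maxTs' stand in for the output of extractTsRange() on the query.
boost::optional<Timestamp> minTs = Timestamp(100, 0);  // hypothetical lower 'ts' bound
boost::optional<Timestamp> maxTs = Timestamp(200, 0);  // hypothetical upper 'ts' bound

CollectionScanParams params;
params.direction = CollectionScanParams::FORWARD;
if (minTs) {
    StatusWith<RecordId> start = record_id_helpers::keyForOptime(*minTs);
    if (start.isOK())
        params.minRecord = start.getValue();  // scan seeks near this RecordId
}
if (maxTs) {
    StatusWith<RecordId> end = record_id_helpers::keyForOptime(*maxTs);
    if (end.isOK())
        params.maxRecord = end.getValue();  // scan returns EOF once past this RecordId
}
```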
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 174e8d70de4..da599c0ce21 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1136,6 +1136,16 @@ env.Library( ) env.Library( + target='record_id_helpers', + source=[ + 'record_id_helpers.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + +env.Library( target='query_exec', source=[ 'clientcursor.cpp', @@ -1268,7 +1278,6 @@ env.Library( 's/sharding_api_d', 'shared_request_handling', 'stats/serveronly_stats', - 'storage/oplog_hack', 'storage/remove_saver', 'storage/storage_options', 'update/update_driver', diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 237da29c8a2..7b63afcbd3d 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -42,12 +42,9 @@ #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/repl/optime.h" -#include "mongo/db/storage/oplog_hack.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" -#include "mongo/db/client.h" // XXX-ERH - namespace mongo { using std::unique_ptr; @@ -67,23 +64,29 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, _params(params) { // Explain reports the direction of the collection scan. _specificStats.direction = params.direction; - _specificStats.minTs = params.minTs; - _specificStats.maxTs = params.maxTs; + _specificStats.minRecord = params.minRecord; + _specificStats.maxRecord = params.maxRecord; _specificStats.tailable = params.tailable; - if (params.minTs || params.maxTs) { - // The 'minTs' and 'maxTs' parameters are used for a special optimization that - // applies only to forwards scans of the oplog. - invariant(params.direction == CollectionScanParams::FORWARD); - invariant(collection->ns().isOplog()); + if (params.minRecord || params.maxRecord) { + // The 'minRecord' and 'maxRecord' parameters are used for a special optimization that + // applies only to forwards scans of the oplog and scans on collections clustered by _id. invariant(!params.resumeAfterRecordId); + if (collection->ns().isOplog()) { + invariant(params.direction == CollectionScanParams::FORWARD); + } else { + invariant(collection->isClustered()); + } } + LOGV2_DEBUG(5400802, + 5, + "collection scan bounds", + "min"_attr = (!_params.minRecord) ? "none" : _params.minRecord->toString(), + "max"_attr = (!_params.maxRecord) ? "none" : _params.maxRecord->toString()); invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); - // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can be - // incorrectly requested by the user, but in that case we should already have uasserted by now. - if (params.assertMinTsHasNotFallenOffOplog) { + if (params.assertTsHasNotFallenOffOplog) { invariant(params.shouldTrackLatestOplogTimestamp); - invariant(params.minTs); + invariant(params.direction == CollectionScanParams::FORWARD); } if (params.resumeAfterRecordId) { @@ -91,13 +94,6 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, // only support in the forward direction. invariant(params.direction == CollectionScanParams::FORWARD); } - - // Set early stop condition. 
- if (params.maxTs) { - _endConditionBSON = BSON("$gte"_sd << *(params.maxTs)); - _endCondition = std::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName, - _endConditionBSON.firstElement()); - } } PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { @@ -169,17 +165,16 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } - if (_lastSeenId.isNull() && _params.minTs) { - // See if the RecordStore supports the oplogStartHack. - StatusWith<RecordId> goal = oploghack::keyForOptime(*_params.minTs); - if (goal.isOK()) { - boost::optional<RecordId> startLoc = - collection()->getRecordStore()->oplogStartHack(opCtx(), goal.getValue()); - if (startLoc && !startLoc->isNull()) { - LOGV2_DEBUG(20584, 3, "Using direct oplog seek"); - record = _cursor->seekExact(*startLoc); - } - } + if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::FORWARD && + _params.minRecord) { + // Seek to the approximate start location. + record = _cursor->seekNear(*_params.minRecord); + } + + if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::BACKWARD && + _params.maxRecord) { + // Seek to the approximate start location (at the end). + record = _cursor->seekNear(*_params.maxRecord); } if (!record) { @@ -205,8 +200,8 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } _lastSeenId = record->id; - if (_params.assertMinTsHasNotFallenOffOplog) { - assertMinTsHasNotFallenOffOplog(*record); + if (_params.assertTsHasNotFallenOffOplog) { + assertTsHasNotFallenOffOplog(*record); } if (_params.shouldTrackLatestOplogTimestamp) { setLatestOplogEntryTimestamp(*record); @@ -231,37 +226,54 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp()); } -void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) { +void CollectionScan::assertTsHasNotFallenOffOplog(const Record& record) { // If the first entry we see in the oplog is the replset initialization, then it doesn't matter - // if its timestamp is later than the specified minTs; no events earlier than the minTs can have - // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry - // is earlier than or equal to the minTs time. + // if its timestamp is later than the timestamp that should not have fallen off the oplog; no + // events earlier can have fallen off this oplog. Otherwise, verify that the timestamp of the + // first observed oplog entry is earlier than or equal to timestamp that should not have fallen + // off the oplog. auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson())); invariant(_specificStats.docsTested == 0); const bool isNewRS = oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; uassert(ErrorCodes::OplogQueryMinTsMissing, - "Specified minTs has already fallen off the oplog", - isNewRS || oplogEntry.getTimestamp() <= *_params.minTs); + "Specified timestamp has already fallen off the oplog", + isNewRS || oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOffOplog); // We don't need to check this assertion again after we've confirmed the first oplog event. 
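As the comments in the surrounding hunks spell out, 'minRecord' and 'maxRecord' are always inclusive; an exclusive predicate such as {ts: {$gt: T}} still sets the bound to the RecordId for T and leaves endpoint exclusion to the stage's MatchExpression filter. A minimal caller-side sketch of that division of labor (assumed code, not part of the patch):

```cpp
// Sketch: an exclusive 'ts' predicate keeps an inclusive RecordId bound and
// relies on the filter to drop the endpoint document itself.
Timestamp bound(50, 0);  // hypothetical $gt bound
CollectionScanParams params;
params.direction = CollectionScanParams::FORWARD;
if (auto rid = record_id_helpers::keyForOptime(bound); rid.isOK())
    params.minRecord = rid.getValue();  // inclusive start point for the seek

// The predicate itself, e.g. {ts: {$gt: bound}}, is parsed into the
// MatchExpression that CollectionScan receives as '_filter'.
BSONObj predicate = BSON("ts" << BSON("$gt" << bound));
```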
- _params.assertMinTsHasNotFallenOffOplog = false; + _params.assertTsHasNotFallenOffOplog = boost::none; +} + +namespace { +bool atEndOfRangeInclusive(const CollectionScanParams& params, const WorkingSetMember& member) { + if (params.direction == CollectionScanParams::FORWARD) { + return params.maxRecord && member.recordId > *params.maxRecord; + } else { + return params.minRecord && member.recordId < *params.minRecord; + } } +} // namespace PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { ++_specificStats.docsTested; + + // The 'minRecord' and 'maxRecord' bounds are always inclusive, even if the query predicate is + // an exclusive inequality like $gt or $lt. In such cases, we rely on '_filter' to either + // exclude or include the endpoints as required by the user's query. + if (atEndOfRangeInclusive(_params, *member)) { + _workingSet->free(memberID); + _commonStats.isEOF = true; + return PlanStage::IS_EOF; + } + if (Filter::passes(member, _filter)) { if (_params.stopApplyingFilterAfterFirstMatch) { _filter = nullptr; } *out = memberID; return PlanStage::ADVANCED; - } else if (_endCondition && Filter::passes(member, _endCondition.get())) { - _workingSet->free(memberID); - _commonStats.isEOF = true; - return PlanStage::IS_EOF; } else { _workingSet->free(memberID); return PlanStage::NEED_TIME; diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index b8030e367be..8fdbb70a1e2 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -114,9 +114,9 @@ private: void setLatestOplogEntryTimestamp(const Record& record); /** - * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog. + * Asserts that the minimum timestamp in the query filter has not already fallen off the oplog. */ - void assertMinTsHasNotFallenOffOplog(const Record& record); + void assertTsHasNotFallenOffOplog(const Record& record); // WorkingSet is not owned by us. WorkingSet* _workingSet; @@ -124,11 +124,6 @@ private: // The filter is not owned by us. const MatchExpression* _filter; - // If a document does not pass '_filter' but passes '_endCondition', stop scanning and return - // IS_EOF. - BSONObj _endConditionBSON; - std::unique_ptr<GTEMatchExpression> _endCondition; - std::unique_ptr<SeekableRecordCursor> _cursor; CollectionScanParams _params; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index fb847553b36..39254279e09 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -40,23 +40,30 @@ struct CollectionScanParams { BACKWARD = -1, }; - // If present, the collection scan will seek directly to the RecordId of an oplog entry as - // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans. - // This field cannot be used in conjunction with 'resumeAfterRecordId'. - boost::optional<Timestamp> minTs; + // If present, this parameter sets the start point of a forward scan or the end point of a + // reverse scan. A forward scan will start scanning at the document with the lowest RecordId + // greater than or equal to minRecord. A reverse scan will stop and return EOF on the first + // document with a RecordId less than minRecord, or a higher record if none exists. May only + // be used for scans on collections clustered by _id and forward oplog scans. 
If exclusive + // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field + // cannot be used in conjunction with 'resumeAfterRecordId' + boost::optional<RecordId> minRecord; - // If present, the collection scan will stop and return EOF the first time it sees a document - // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward - // oplog scans. - // This field cannot be used in conjunction with 'resumeAfterRecordId'. - boost::optional<Timestamp> maxTs; + // If present, this parameter sets the start point of a reverse scan or the end point of a + // forward scan. A forward scan will stop and return EOF on the first document with a RecordId + // greater than maxRecord. A reverse scan will start scanning at the document with the + // highest RecordId less than or equal to maxRecord, or a lower record if none exists. May + // only be used for scans on collections clustered by _id and forward oplog scans. If exclusive + // bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field + // cannot be used in conjunction with 'resumeAfterRecordId'. + boost::optional<RecordId> maxRecord; // If true, the collection scan will return a token that can be used to resume the scan. bool requestResumeToken = false; // If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it // does not exist. Must only be set on forward collection scans. - // This field cannot be used in conjunction with 'minTs' or 'maxTs'. + // This field cannot be used in conjunction with 'minRecord' or 'maxRecord'. boost::optional<RecordId> resumeAfterRecordId; Direction direction = FORWARD; @@ -64,8 +71,8 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; - // Should we assert that the specified minTS has not fallen off the oplog? - bool assertMinTsHasNotFallenOffOplog = false; + // Assert that the specified timestamp has not fallen off the oplog on a forward scan. + boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none; // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index a40685f03aa..aad7e562913 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -38,6 +38,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/stage_types.h" +#include "mongo/db/record_id.h" #include "mongo/util/container_size_helper.h" #include "mongo/util/time_support.h" @@ -246,13 +247,11 @@ struct CollectionScanStats : public SpecificStats { bool tailable{false}; - // The start location of the scan. Must only be set on forward oplog scans. - boost::optional<Timestamp> minTs; + // The start location of a forward scan and end location for a reverse scan. + boost::optional<RecordId> minRecord; - // Indicates that the collection scan will stop and return EOF the first time it sees a - // document that does not pass the filter and has a "ts" Timestamp field greater than 'maxTs'. - // Must only be set on forward oplog scans. - boost::optional<Timestamp> maxTs; + // The end location of a reverse scan and start location for a forward scan. 
+ boost::optional<RecordId> maxRecord; }; struct CountStats : public SpecificStats { diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index fbea1ea1076..668596500a0 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -77,6 +77,9 @@ public: boost::optional<Record> seekExact(const RecordId& id) override { return Record{}; } + boost::optional<Record> seekNear(const RecordId& id) override { + return boost::none; + } void save() override {} bool restore() override { return true; @@ -115,7 +118,7 @@ public: _data.push_back(mutableDoc.freeze().toBson()); Record record; record.data = {_data.back().objdata(), _data.back().objsize()}; - record.id = RecordId{static_cast<int64_t>(_data.size())}; + record.id = RecordId{doc["ts"].getTimestamp().asLL()}; _records.push_back(std::move(record)); } @@ -162,9 +165,9 @@ public: : DocumentSourceMock({}, expCtx), _collectionPtr(&_collection) { _filterExpr = BSON("ns" << kTestNs); _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); - _params.assertMinTsHasNotFallenOffOplog = true; + _params.assertTsHasNotFallenOffOplog = Timestamp(0); _params.shouldTrackLatestOplogTimestamp = true; - _params.minTs = Timestamp(0, 0); + _params.minRecord = RecordId(0); _params.tailable = true; } @@ -172,7 +175,8 @@ public: invariant(!_collScan); _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime)); _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); - _params.minTs = resumeToken.clusterTime; + _params.minRecord = RecordId(resumeToken.clusterTime.asLL()); + _params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime; } void push_back(GetNextResult&& result) { diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index c12394340b0..da30db42262 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -70,6 +70,7 @@ env.Library( "query_knobs", ], LIBDEPS_PRIVATE=[ + "$BUILD_DIR/mongo/db/record_id_helpers", "$BUILD_DIR/mongo/idl/server_parameter", ], ) diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp index 5a5d2f311dd..3d52f8b3a9f 100644 --- a/src/mongo/db/query/classic_stage_builder.cpp +++ b/src/mongo/db/query/classic_stage_builder.cpp @@ -62,8 +62,8 @@ #include "mongo/db/exec/text.h" #include "mongo/db/index/fts_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/storage/oplog_hack.h" #include "mongo/logv2/log.h" namespace mongo::stage_builder { @@ -78,12 +78,12 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r CollectionScanParams params; params.tailable = csn->tailable; params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; - params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog; + params.assertTsHasNotFallenOffOplog = csn->assertTsHasNotFallenOffOplog; params.direction = (csn->direction == 1) ? 
CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility; - params.minTs = csn->minTs; - params.maxTs = csn->maxTs; + params.minRecord = csn->minRecord; + params.maxRecord = csn->maxRecord; params.requestResumeToken = csn->requestResumeToken; params.resumeAfterRecordId = csn->resumeAfterRecordId; params.stopApplyingFilterAfterFirstMatch = csn->stopApplyingFilterAfterFirstMatch; diff --git a/src/mongo/db/query/plan_explainer_impl.cpp b/src/mongo/db/query/plan_explainer_impl.cpp index 480d8c663a2..b5ffc341545 100644 --- a/src/mongo/db/query/plan_explainer_impl.cpp +++ b/src/mongo/db/query/plan_explainer_impl.cpp @@ -235,11 +235,17 @@ void statsToBSON(const PlanStageStats& stats, } else if (STAGE_COLLSCAN == stats.stageType) { CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get()); bob->append("direction", spec->direction > 0 ? "forward" : "backward"); - if (spec->minTs) { - bob->append("minTs", *(spec->minTs)); - } - if (spec->maxTs) { - bob->append("maxTs", *(spec->maxTs)); + if (spec->minRecord) { + spec->minRecord->withFormat( + [&](RecordId::Null n) { bob->appendNull("minRecord"); }, + [&](int64_t rid) { bob->append("minRecord", rid); }, + [&](const char* str, int size) { bob->append("minRecord", OID::from(str)); }); + } + if (spec->maxRecord) { + spec->maxRecord->withFormat( + [&](RecordId::Null n) { bob->appendNull("maxRecord"); }, + [&](int64_t rid) { bob->append("maxRecord", rid); }, + [&](const char* str, int size) { bob->append("maxRecord", OID::from(str)); }); } if (verbosity >= ExplainOptions::Verbosity::kExecStats) { bob->appendNumber("docsExamined", static_cast<long long>(spec->docsTested)); diff --git a/src/mongo/db/query/plan_explainer_sbe.cpp b/src/mongo/db/query/plan_explainer_sbe.cpp index 61106a9674e..0f9df195196 100644 --- a/src/mongo/db/query/plan_explainer_sbe.cpp +++ b/src/mongo/db/query/plan_explainer_sbe.cpp @@ -65,11 +65,17 @@ void statsToBSON(const QuerySolutionNode* node, case STAGE_COLLSCAN: { auto csn = static_cast<const CollectionScanNode*>(node); bob->append("direction", csn->direction > 0 ? 
"forward" : "backward"); - if (csn->minTs) { - bob->append("minTs", *csn->minTs); + if (csn->minRecord) { + csn->minRecord->withFormat( + [&](RecordId::Null n) { bob->appendNull("minRecord"); }, + [&](int64_t rid) { bob->append("minRecord", rid); }, + [&](const char* str, int size) { bob->append("minRecord", OID::from(str)); }); } - if (csn->maxTs) { - bob->append("maxTs", *csn->maxTs); + if (csn->maxRecord) { + csn->maxRecord->withFormat( + [&](RecordId::Null n) { bob->appendNull("maxRecord"); }, + [&](int64_t rid) { bob->append("maxRecord", rid); }, + [&](const char* str, int size) { bob->append("maxRecord", OID::from(str)); }); } break; } diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index 194110865fd..56893e7ab11 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/query_planner_common.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/logv2/log.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" @@ -217,8 +218,6 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( csn->tailable = tailable; csn->shouldTrackLatestOplogTimestamp = params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; - csn->assertMinTsHasNotFallenOffOplog = - params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; csn->shouldWaitForOplogVisibility = params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE; @@ -257,11 +256,29 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( } } + const bool assertMinTsHasNotFallenOffOplog = + params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; if (query.nss().isOplog() && csn->direction == 1) { // Optimizes the start and end location parameters for a collection scan for an oplog // collection. Not compatible with $_resumeAfter so we do not optimize in that case. if (resumeAfterObj.isEmpty()) { - std::tie(csn->minTs, csn->maxTs) = extractTsRange(query.root()); + auto [minTs, maxTs] = extractTsRange(query.root()); + if (minTs) { + StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs); + if (goal.isOK()) { + csn->minRecord = goal.getValue(); + } + + if (assertMinTsHasNotFallenOffOplog) { + csn->assertTsHasNotFallenOffOplog = *minTs; + } + } + if (maxTs) { + StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs); + if (goal.isOK()) { + csn->maxRecord = goal.getValue(); + } + } } // If the query is just a lower bound on "ts" on a forward scan, every document in the @@ -275,11 +292,12 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( // The user may have requested 'assertMinTsHasNotFallenOffOplog' for a query that does not // specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions. 
- uassert(ErrorCodes::InvalidOptions, - str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query " - "which does not imply a minimum 'ts' value ", - !(csn->assertMinTsHasNotFallenOffOplog && !csn->minTs)); - + if (assertMinTsHasNotFallenOffOplog) { + uassert(ErrorCodes::InvalidOptions, + str::stream() << "assertTsHasNotFallenOffOplog cannot be applied to a query " + "which does not imply a minimum 'ts' value ", + csn->assertTsHasNotFallenOffOplog); + } return csn; } diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index 35b16f72588..d86f6c5865f 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -231,7 +231,7 @@ QuerySolutionNode* CollectionScanNode::clone() const { copy->tailable = this->tailable; copy->direction = this->direction; copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp; - copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog; + copy->assertTsHasNotFallenOffOplog = this->assertTsHasNotFallenOffOplog; copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility; return copy; diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 7e5d6632a74..5a7f6a0db09 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -470,23 +470,20 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet { // Name of the namespace. std::string name; - // If present, the collection scan will seek directly to the RecordId of an oplog entry as - // close to 'minTs' as possible without going higher. Should only be set on forward oplog scans. - // This field cannot be used in conjunction with 'resumeAfterRecordId'. - boost::optional<Timestamp> minTs; + // If present, this parameter sets the start point of a forward scan or the end point of a + // reverse scan. + boost::optional<RecordId> minRecord; - // If present the collection scan will stop and return EOF the first time it sees a document - // that does not pass the filter and has 'ts' greater than 'maxTs'. Should only be set on - // forward oplog scans. - // This field cannot be used in conjunction with 'resumeAfterRecordId'. - boost::optional<Timestamp> maxTs; + // If present, this parameter sets the start point of a reverse scan or the end point of a + // forward scan. + boost::optional<RecordId> maxRecord; // If true, the collection scan will return a token that can be used to resume the scan. bool requestResumeToken = false; // If present, the collection scan will seek to the exact RecordId, or return KeyNotFound if it // does not exist. Must only be set on forward collection scans. - // This field cannot be used in conjunction with 'minTs' or 'maxTs'. + // This field cannot be used in conjunction with 'minRecord' or 'maxRecord'. boost::optional<RecordId> resumeAfterRecordId; // Should we make a tailable cursor? @@ -497,8 +494,8 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet { // across a sharded cluster. bool shouldTrackLatestOplogTimestamp = false; - // Should we assert that the specified minTS has not fallen off the oplog? - bool assertMinTsHasNotFallenOffOplog = false; + // Assert that the specified timestamp has not fallen off the oplog. 
+ boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none; int direction{1}; diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 252a5be9ccf..f8e8d038c63 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -46,7 +46,7 @@ #include "mongo/db/query/sbe_stage_builder_filter.h" #include "mongo/db/query/sbe_stage_builder_helpers.h" #include "mongo/db/query/util/make_data_structure.h" -#include "mongo/db/storage/oplog_hack.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/logv2/log.h" #include "mongo/util/str.h" @@ -126,16 +126,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo bool isTailableResumeBranch, sbe::LockAcquisitionCallback lockAcquisitionCallback) { invariant(collection->ns().isOplog()); - // The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only - // be done for a forward scan. + // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can + // only be done for a forward scan. invariant(!csn->resumeAfterRecordId); invariant(csn->direction == CollectionScanParams::FORWARD); auto resultSlot = slotIdGenerator->generate(); auto recordIdSlot = slotIdGenerator->generate(); - // See if the RecordStore supports the oplogStartHack. If so, the scan will start from the - // RecordId stored in seekRecordId. + // Start the scan from the RecordId stored in seekRecordId. // Otherwise, if we're building a collection scan for a resume branch of a special union // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly // to access the recordId to resume the scan from. @@ -144,25 +143,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo if (isTailableResumeBranch) { auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd); return {{}, resumeRecordIdSlot}; - } else if (csn->minTs) { - auto goal = oploghack::keyForOptime(*csn->minTs); - if (goal.isOK()) { - auto startLoc = - collection->getRecordStore()->oplogStartHack(opCtx, goal.getValue()); - if (startLoc && !startLoc->isNull()) { - LOGV2_DEBUG(205841, 3, "Using direct oplog seek"); - return {startLoc, slotIdGenerator->generate()}; - } + } else if (csn->minRecord) { + auto cursor = collection->getRecordStore()->getCursor(opCtx); + auto startRec = cursor->seekNear(*csn->minRecord); + if (startRec) { + LOGV2_DEBUG(205841, 3, "Using direct oplog seek"); + return {startRec->id, slotIdGenerator->generate()}; } } return {}; }(); // Check if we need to project out an oplog 'ts' field as part of the collection scan. We will - // need it either when 'maxTs' bound has been provided, so that we can apply an EOF filter, of - // if we need to track the latest oplog timestamp. + // need it either when 'maxRecord' bound has been provided, so that we can apply an EOF filter, + // of if we need to track the latest oplog timestamp. 
const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch && - (csn->maxTs || csn->shouldTrackLatestOplogTimestamp); + (csn->maxRecord || csn->shouldTrackLatestOplogTimestamp); auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( collection, slotIdGenerator, shouldTrackLatestOplogTimestamp); @@ -179,7 +175,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo lockAcquisitionCallback, makeOpenCallbackIfNeeded(collection, csn)); - // Start the scan from the seekRecordId if we can use the oplogStartHack. + // Start the scan from the seekRecordId. if (seekRecordId) { invariant(seekRecordIdSlot); @@ -199,14 +195,12 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo } // Create a filter which checks the first document to ensure either that its 'ts' is less than - // or equal to minTs, or that it is a replica set initialization message. If this fails, then we - // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch - // of a tailable scan; it only needs to be done once, when the initial branch is run. - if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) { - // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can - // be incorrectly requested by the user, but in that case we should already have uasserted. + // or equal the minimum timestamp that should not have rolled off the oplog, or that it is a + // replica set initialization message. If this fails, then we throw + // ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch of a + // tailable scan; it only needs to be done once, when the initial branch is run. + if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) { invariant(csn->shouldTrackLatestOplogTimestamp); - invariant(csn->minTs); // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot' // here so that it does not shadow the 'tsSlot' which we allocated earlier. @@ -261,10 +255,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo sbe::makeE<sbe::EIf>( makeBinaryOp( sbe::EPrimBinary::logicOr, - makeBinaryOp( - sbe::EPrimBinary::lessEq, - makeVariable(*minTsSlot), - makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())), + makeBinaryOp(sbe::EPrimBinary::lessEq, + makeVariable(*minTsSlot), + makeConstant(sbe::value::TypeTags::Timestamp, + csn->assertTsHasNotFallenOffOplog->asULL())), makeBinaryOp( sbe::EPrimBinary::logicAnd, makeBinaryOp( @@ -306,8 +300,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater // than the upper bound. - if (csn->maxTs) { - // The 'maxTs' optimization is not compatible with 'stopApplyingFilterAfterFirstMatch'. + if (csn->maxRecord) { + // The 'maxRecord' optimization is not compatible with 'stopApplyingFilterAfterFirstMatch'. 
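The EOF filter constructed in the remainder of this hunk compares the scanned 'ts' slot against csn->maxRecord->asLong() as a Timestamp constant. That works because, for the oplog, record_id_helpers::keyForOptime() packs the timestamp's (secs, inc) pair into the RecordId's 64-bit value, the same layout Timestamp::asULL() produces. A test-style sketch of that correspondence (assumed code, not part of the patch):

```cpp
// Sketch: an oplog RecordId produced by keyForOptime() carries the Timestamp
// bits verbatim, so its integer value can be reused as a Timestamp constant.
Timestamp ts(2, 3);  // hypothetical oplog time
StatusWith<RecordId> rid = record_id_helpers::keyForOptime(ts);
ASSERT_OK(rid);
ASSERT_EQ(rid.getValue(), RecordId(2, 3));  // same (secs, inc) packing as the test RecordIds
ASSERT_EQ(static_cast<unsigned long long>(rid.getValue().asLong()), ts.asULL());
```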
invariant(!csn->stopApplyingFilterAfterFirstMatch); invariant(tsSlot); @@ -315,7 +309,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo std::move(stage), makeBinaryOp(sbe::EPrimBinary::lessEq, makeVariable(*tsSlot), - makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())), + makeConstant(sbe::value::TypeTags::Timestamp, csn->maxRecord->asLong())), csn->nodeId()); } @@ -361,7 +355,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // inner branch, and the execution will continue from this point further on, without // applying the filter. if (csn->stopApplyingFilterAfterFirstMatch) { - invariant(csn->minTs); + invariant(csn->minRecord); invariant(csn->direction == CollectionScanParams::FORWARD); std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded( @@ -569,7 +563,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateCollScan( sbe::RuntimeEnvironment* env, bool isTailableResumeBranch, sbe::LockAcquisitionCallback lockAcquisitionCallback) { - if (csn->minTs || csn->maxTs) { + if (csn->minRecord || csn->maxRecord) { return generateOptimizedOplogScan(opCtx, collection, csn, diff --git a/src/mongo/db/record_id.h b/src/mongo/db/record_id.h index 370a0d5b350..9189e77f1ab 100644 --- a/src/mongo/db/record_id.h +++ b/src/mongo/db/record_id.h @@ -139,12 +139,12 @@ public: } int compare(const RecordId& rhs) const { - // Null always compares less than every other RecordId. - if (isNull() && rhs.isNull()) { + // Null always compares less than every other RecordId format. + if (_format == Format::kNull && rhs._format == Format::kNull) { return 0; - } else if (isNull()) { + } else if (_format == Format::kNull) { return -1; - } else if (rhs.isNull()) { + } else if (rhs._format == Format::kNull) { return 1; } invariant(_format == rhs._format); diff --git a/src/mongo/db/storage/oplog_hack.cpp b/src/mongo/db/record_id_helpers.cpp index dc3ac3e2f5f..2cc3bb43353 100644 --- a/src/mongo/db/storage/oplog_hack.cpp +++ b/src/mongo/db/record_id_helpers.cpp @@ -29,7 +29,7 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage -#include "mongo/db/storage/oplog_hack.h" +#include "mongo/db/record_id_helpers.h" #include <limits> @@ -40,7 +40,7 @@ #include "mongo/util/debug_util.h" namespace mongo { -namespace oploghack { +namespace record_id_helpers { StatusWith<RecordId> keyForOptime(const Timestamp& opTime) { // Make sure secs and inc wouldn't be negative if treated as signed. 
This ensures that they @@ -80,5 +80,5 @@ StatusWith<RecordId> extractKey(const char* data, int len) { return keyForOptime(elem.timestamp()); } -} // namespace oploghack +} // namespace record_id_helpers } // namespace mongo diff --git a/src/mongo/db/storage/oplog_hack.h b/src/mongo/db/record_id_helpers.h index 2aac4b60556..ed9016822f3 100644 --- a/src/mongo/db/storage/oplog_hack.h +++ b/src/mongo/db/record_id_helpers.h @@ -36,7 +36,7 @@ namespace mongo { class RecordId; class Timestamp; -namespace oploghack { +namespace record_id_helpers { /** * Converts Timestamp to a RecordId in an unspecified manor that is safe to use as the key to @@ -49,5 +49,5 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime); */ StatusWith<RecordId> extractKey(const char* data, int len); -} // namespace oploghack +} // namespace record_id_helpers } // namespace mongo diff --git a/src/mongo/db/record_id_test.cpp b/src/mongo/db/record_id_test.cpp index e606762a251..63f36c07029 100644 --- a/src/mongo/db/record_id_test.cpp +++ b/src/mongo/db/record_id_test.cpp @@ -112,13 +112,13 @@ TEST(RecordId, OidTest) { TEST(RecordId, NullTest) { // The int64 format should be considered null if its value is 0. Likewise, the value should be // interpreted as int64_t(0) if it is null. - RecordId nullRid(0); - ASSERT(nullRid.isNull()); - - RecordId rid0; + RecordId rid0(0); ASSERT(rid0.isNull()); - ASSERT_EQ(0, rid0.asLong()); - ASSERT_EQ(nullRid, rid0); + + RecordId nullRid; + ASSERT(nullRid.isNull()); + ASSERT_EQ(0, nullRid.asLong()); + ASSERT_NE(rid0, nullRid); } TEST(RecordId, OidTestCompare) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index dba72ba9275..2bd275c6a8d 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -407,7 +407,7 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { auto bsonOplogEntry = oplogEntry->toBSON(); // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, see - // oploghack::extractKey. + // record_id_helpers::extractKey. std::vector<Record> records{ {RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())}}; std::vector<Timestamp> timestamps{slot.getTimestamp()}; @@ -477,7 +477,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, timestamps[i] = insertStatementOplogSlot.getTimestamp(); bsonOplogEntries[i] = oplogEntry.toBSON(); // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, - // see oploghack::extractKey. + // see record_id_helpers::extractKey. 
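The RecordId::compare and record_id_test.cpp changes a few hunks above tighten null handling: only the default-constructed (kNull format) RecordId is the true null, and it sorts before every other format, so it no longer compares equal to an int64 RecordId holding 0. A brief sketch in the style of record_id_test.cpp (assumed code, not part of the patch):

```cpp
// Sketch: the kNull format is distinct from an int64 RecordId whose value is 0.
RecordId nullRid;   // default-constructed: Format::kNull
RecordId rid0(0);   // int64 format holding 0
ASSERT(nullRid.isNull());
ASSERT(rid0.isNull());     // both still report isNull() ...
ASSERT_NE(nullRid, rid0);  // ... but they are different formats and no longer equal
ASSERT_LT(nullRid, rid0);  // kNull sorts before every other RecordId format
```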
records[i] = Record{ RecordId(), RecordData(bsonOplogEntries[i].objdata(), bsonOplogEntries[i].objsize())}; } diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 28f61230234..2891e015e3f 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -105,16 +105,6 @@ env.Library( ) env.Library( - target='oplog_hack', - source=[ - 'oplog_hack.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - ], -) - -env.Library( target='storage_control', source=[ 'control/storage_control.cpp', diff --git a/src/mongo/db/storage/devnull/SConscript b/src/mongo/db/storage/devnull/SConscript index d1cf4757d68..d5f1a5c30c8 100644 --- a/src/mongo/db/storage/devnull/SConscript +++ b/src/mongo/db/storage/devnull/SConscript @@ -11,7 +11,7 @@ env.Library( 'ephemeral_catalog_record_store.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/db/storage/oplog_hack', + '$BUILD_DIR/mongo/db/record_id_helpers', '$BUILD_DIR/mongo/db/storage/recovery_unit_base', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 7f380c8b980..ce2f1fe5e28 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -47,6 +47,9 @@ public: boost::optional<Record> seekExact(const RecordId& id) final { return {}; } + boost::optional<Record> seekNear(const RecordId& id) final { + return {}; + } void save() final {} bool restore() final { return true; diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp index fef0b5cb35a..f39c325863d 100644 --- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp +++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.cpp @@ -36,7 +36,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/oplog_hack.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/logv2/log.h" #include "mongo/util/str.h" @@ -150,6 +150,11 @@ public: return {{_it->first, _it->second.toRecordData()}}; } + boost::optional<Record> seekNear(const RecordId& id) final { + // not implemented + return boost::none; + } + void save() final { if (!_needFirstSeek && !_lastMoveWasRestore) _savedId = _it == _records.end() ? RecordId() : _it->first; @@ -223,6 +228,11 @@ public: return {{_it->first, _it->second.toRecordData()}}; } + boost::optional<Record> seekNear(const RecordId& id) final { + // not implemented + return boost::none; + } + void save() final { if (!_needFirstSeek && !_lastMoveWasRestore) _savedId = _it == _records.rend() ? 
RecordId() : _it->first; @@ -386,7 +396,7 @@ void EphemeralForTestRecordStore::cappedDeleteAsNeeded(WithLock lk, OperationCon StatusWith<RecordId> EphemeralForTestRecordStore::extractAndCheckLocForOplog(WithLock, const char* data, int len) const { - StatusWith<RecordId> status = oploghack::extractKey(data, len); + StatusWith<RecordId> status = record_id_helpers::extractKey(data, len); if (!status.isOK()) return status; @@ -566,29 +576,4 @@ RecordId EphemeralForTestRecordStore::allocateLoc(WithLock) { invariant(out.isValid()); return out; } - -boost::optional<RecordId> EphemeralForTestRecordStore::oplogStartHack( - OperationContext* opCtx, const RecordId& startingPosition) const { - if (!_data->isOplog) - return boost::none; - - stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); - const Records& records = _data->records; - - if (records.empty()) - return RecordId(); - - Records::const_iterator it = records.lower_bound(startingPosition); - if (it == records.end() || it->first > startingPosition) { - // If the startingPosition is before the oldest oplog entry, this ensures that we return - // RecordId() as specified in record_store.h. - if (it == records.begin()) { - return RecordId(); - } - --it; - } - - return it->first; -} - } // namespace mongo diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h index 2a405eddb93..b56fc826d3c 100644 --- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h +++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h @@ -108,9 +108,6 @@ public: return _data->records.size(); } - virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, - const RecordId& startingPosition) const; - void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override {} virtual void updateStatsAfterRepair(OperationContext* opCtx, diff --git a/src/mongo/db/storage/ephemeral_for_test/SConscript b/src/mongo/db/storage/ephemeral_for_test/SConscript index 939f5bf1b80..d876aa0b96d 100644 --- a/src/mongo/db/storage/ephemeral_for_test/SConscript +++ b/src/mongo/db/storage/ephemeral_for_test/SConscript @@ -22,8 +22,8 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/collection_options', '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/record_id_helpers', '$BUILD_DIR/mongo/db/storage/key_string', - '$BUILD_DIR/mongo/db/storage/oplog_hack', '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/db/storage/write_unit_of_work', ], diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp index 3b3b4308c68..4206a42ed3a 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.cpp @@ -264,6 +264,9 @@ public: boost::optional<Record> seekExact(const RecordId& id) final { return {}; } + boost::optional<Record> seekNear(const RecordId& id) final { + return {}; + } void save() final {} bool restore() final { return true; diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index b5148c8aff4..a58e83c90e8 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -39,12 +39,13 @@ 
#include "mongo/bson/bsonobj.h" #include "mongo/db/operation_context.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_radix_store.h" #include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h" #include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_visibility_manager.h" #include "mongo/db/storage/key_string.h" -#include "mongo/db/storage/oplog_hack.h" #include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/logv2/log.h" #include "mongo/util/hex.h" namespace mongo { @@ -162,7 +163,7 @@ Status RecordStore::insertRecords(OperationContext* opCtx, int64_t thisRecordId = 0; if (_isOplog) { StatusWith<RecordId> status = - oploghack::extractKey(record.data.data(), record.data.size()); + record_id_helpers::extractKey(record.data.data(), record.data.size()); if (!status.isOK()) return status.getStatus(); thisRecordId = status.getValue().asLong(); @@ -306,29 +307,6 @@ void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCt _visibilityManager->waitForAllEarlierOplogWritesToBeVisible(opCtx); } -boost::optional<RecordId> RecordStore::oplogStartHack(OperationContext* opCtx, - const RecordId& startingPosition) const { - if (!_isOplog) - return boost::none; - - if (numRecords(opCtx) == 0) - return RecordId(); - - StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()}; - - std::string key = createKey(_ident, startingPosition.asLong()); - StringStore::const_reverse_iterator it(workingCopy->upper_bound(key)); - - if (it == workingCopy->rend()) - return RecordId(); - - RecordId rid = RecordId(extractRecordId(it->first)); - if (rid > startingPosition) - return RecordId(); - - return rid; -} - Status RecordStore::oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime, bool orderedCommit) { @@ -472,6 +450,59 @@ boost::optional<Record> RecordStore::Cursor::seekExact(const RecordId& id) { return Record{id, RecordData(it->second.c_str(), it->second.length())}; } +boost::optional<Record> RecordStore::Cursor::seekNear(const RecordId& id) { + _savedPosition = boost::none; + _lastMoveWasRestore = false; + + RecordId search = id; + if (_rs._isOplog && id > _oplogVisibility) { + search = RecordId(_oplogVisibility); + } + + auto numRecords = _rs.numRecords(opCtx); + if (numRecords == 0) + return boost::none; + + StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()}; + std::string key = createKey(_rs._ident, search.asLong()); + // We may land higher and that is fine per the API contract. + it = workingCopy->lower_bound(key); + + // If we're at the end of this record store, we didn't find anything >= id. Position on the + // immediately previous record, which must exist. + if (it == workingCopy->end() || !inPrefix(it->first)) { + // The reverse iterator constructor positions on the next record automatically. + StringStore::const_reverse_iterator revIt(it); + invariant(revIt != workingCopy->rend()); + it = workingCopy->lower_bound(revIt->first); + invariant(it != workingCopy->end()); + invariant(inPrefix(it->first)); + } + + // If we landed one higher, then per the API contract, we need to return the previous record. + RecordId rid = extractRecordId(it->first); + if (rid > search) { + StringStore::const_reverse_iterator revIt(it); + // The reverse iterator constructor positions on the next record automatically. 
+ if (revIt != workingCopy->rend() && inPrefix(revIt->first)) { + it = workingCopy->lower_bound(revIt->first); + rid = RecordId(extractRecordId(it->first)); + } + // Otherwise, we hit the beginning of this record store, then there is only one record and + // we should return that. + } + + // For forward cursors on the oplog, the oplog visible timestamp is treated as the end of the + // record store. So if we are positioned past this point, then there are no visible records. + if (_rs._isOplog && rid > _oplogVisibility) { + return boost::none; + } + + _needFirstSeek = false; + _savedPosition = it->first; + return Record{rid, RecordData(it->second.c_str(), it->second.length())}; +} + // Positions are saved as we go. void RecordStore::Cursor::save() {} void RecordStore::Cursor::saveUnpositioned() {} @@ -553,6 +584,46 @@ boost::optional<Record> RecordStore::ReverseCursor::seekExact(const RecordId& id return Record{id, RecordData(it->second.c_str(), it->second.length())}; } +boost::optional<Record> RecordStore::ReverseCursor::seekNear(const RecordId& id) { + _savedPosition = boost::none; + _lastMoveWasRestore = false; + + auto numRecords = _rs.numRecords(opCtx); + if (numRecords == 0) + return boost::none; + + StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()}; + std::string key = createKey(_rs._ident, id.asLong()); + it = StringStore::const_reverse_iterator(workingCopy->upper_bound(key)); + + // Since there is at least 1 record, if we hit the beginning we need to return the only record. + if (it == workingCopy->rend() || !inPrefix(it->first)) { + // This lands on the next key. + auto fwdIt = workingCopy->upper_bound(key); + // reverse iterator increments one item before + it = StringStore::const_reverse_iterator(++fwdIt); + invariant(it != workingCopy->end()); + invariant(inPrefix(it->first)); + } + + // If we landed lower, then per the API contract, we need to return the previous record. + RecordId rid = extractRecordId(it->first); + if (rid < id) { + // This lands on the next key. + auto fwdIt = workingCopy->upper_bound(key); + if (fwdIt != workingCopy->end() && inPrefix(fwdIt->first)) { + it = StringStore::const_reverse_iterator(++fwdIt); + } + // Otherwise, we hit the beginning of this record store, then there is only one record and + // we should return that. 
+ } + + rid = RecordId(extractRecordId(it->first)); + _needFirstSeek = false; + _savedPosition = it->first; + return Record{rid, RecordData(it->second.c_str(), it->second.length())}; +} + void RecordStore::ReverseCursor::save() {} void RecordStore::ReverseCursor::saveUnpositioned() {} diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h index 1758343b211..ec3d86461f4 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h @@ -106,9 +106,6 @@ public: BSONObjBuilder* result, double scale) const; - virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, - const RecordId& startingPosition) const; - void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override; virtual void updateStatsAfterRepair(OperationContext* opCtx, @@ -191,6 +188,7 @@ private: VisibilityManager* visibilityManager); boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final override; + boost::optional<Record> seekNear(const RecordId& id) final override; void save() final; void saveUnpositioned() final override; bool restore() final; @@ -216,6 +214,7 @@ private: VisibilityManager* visibilityManager); boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final override; + boost::optional<Record> seekNear(const RecordId& id) final override; void save() final; void saveUnpositioned() final override; bool restore() final; diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp index 2ac9f9d55ce..f3726d1ace6 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp @@ -34,8 +34,8 @@ #include <mutex> #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h" -#include "mongo/db/storage/oplog_hack.h" namespace mongo { namespace ephemeral_for_test { @@ -146,7 +146,7 @@ bool RecoveryUnit::forkIfNeeded() { } Status RecoveryUnit::setTimestamp(Timestamp timestamp) { - auto key = oploghack::keyForOptime(timestamp); + auto key = record_id_helpers::keyForOptime(timestamp); if (!key.isOK()) return key.getStatus(); diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index 3d44a0de857..e7ac8dd5d9b 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -184,6 +184,20 @@ public: virtual boost::optional<Record> seekExact(const RecordId& id) = 0; /** + * Positions this cursor near 'start' or an adjacent record if 'start' does not exist. If there + * is not an exact match, the cursor is positioned on the directionally previous Record. If no + * earlier record exists, the cursor is positioned on the directionally following record. + * Returns boost::none if the RecordStore is empty. + * + * For forward cursors, returns the Record with the highest RecordId less than or equal to + * 'start'. If no such record exists, positions on the next highest RecordId after 'start'. + * + * For reverse cursors, returns the Record with the lowest RecordId greater than or equal to + * 'start'. 
If no such record exists, positions on the next lowest RecordId before 'start'. + */ + virtual boost::optional<Record> seekNear(const RecordId& start) = 0; + + /** * Prepares for state changes in underlying data without necessarily saving the current * state. * @@ -491,18 +505,6 @@ public: double scale) const = 0; /** - * Return the RecordId of an oplog entry as close to startingPosition as possible without - * being higher. If there are no entries <= startingPosition, return RecordId(). - * - * If you don't implement the oplogStartHack, just use the default implementation which - * returns boost::none. - */ - virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, - const RecordId& startingPosition) const { - return boost::none; - } - - /** * When we write to an oplog, we call this so that that the storage engine can manage the * visibility of oplog entries to ensure they are ordered. * diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp index 29db8562890..12e9a4bf912 100644 --- a/src/mongo/db/storage/record_store_test_harness.cpp +++ b/src/mongo/db/storage/record_store_test_harness.cpp @@ -513,5 +513,70 @@ TEST(RecordStoreTestHarness, ClusteredRecordStore) { } } +TEST(RecordStoreTestHarness, ClusteredRecordStoreSeekNear) { + const auto harnessHelper = newRecordStoreHarnessHelper(); + if (!harnessHelper->getEngine()->supportsClusteredIdIndex()) { + // Only WiredTiger supports clustered indexes on _id. + return; + } + + const std::string ns = "test.system.buckets.a"; + CollectionOptions options; + options.clusteredIndex = ClusteredIndexOptions{}; + std::unique_ptr<RecordStore> rs = harnessHelper->newNonCappedRecordStore(ns, options); + invariant(rs->keyFormat() == KeyFormat::String); + + auto opCtx = harnessHelper->newOperationContext(); + + const int numRecords = 100; + std::vector<Record> records; + std::vector<Timestamp> timestamps; + for (int i = 0; i < numRecords; i++) { + timestamps.push_back(Timestamp(i, 1)); + } + + // Insert RecordIds where the timestamp part of the OID correlates directly with the seconds in + // the Timestamp. + for (int i = 0; i < numRecords; i++) { + BSONObj doc = BSON("i" << i); + RecordData recordData = RecordData(doc.objdata(), doc.objsize()); + recordData.makeOwned(); + + auto oid = OID::gen(); + oid.setTimestamp(timestamps[i].getSecs()); + auto record = Record{RecordId(oid.view().view(), OID::kOIDSize), recordData}; + std::vector<Record> recVec = {record}; + + WriteUnitOfWork wuow(opCtx.get()); + ASSERT_OK(rs->insertRecords(opCtx.get(), &recVec, {timestamps[i]})); + wuow.commit(); + + records.push_back(record); + } + + for (int i = 0; i < numRecords; i++) { + // Generate an OID RecordId with a timestamp part and high bits elsewhere such that it + // always compares greater than or equal to the OIDs we inserted. + auto oid = OID::max(); + oid.setTimestamp(i); + auto rid = RecordId(oid.view().view(), OID::kOIDSize); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(rid); + ASSERT(rec); + ASSERT_EQ(records[i].id, rec->id); + } + + for (int i = 0; i < numRecords; i++) { + // Generate an OID RecordId with only a timestamp part and zeroes elsewhere such that it + // always compares less than or equal to the OIDs we inserted. 
+ auto oid = OID(); + oid.setTimestamp(i); + auto rid = RecordId(oid.view().view(), OID::kOIDSize); + auto cur = rs->getCursor(opCtx.get(), false /* forward */); + auto rec = cur->seekNear(rid); + ASSERT(rec); + ASSERT_EQ(records[i].id, rec->id); + } +} } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/record_store_test_oplog.cpp b/src/mongo/db/storage/record_store_test_oplog.cpp index 34b68dbc7f9..a0889e5fffc 100644 --- a/src/mongo/db/storage/record_store_test_oplog.cpp +++ b/src/mongo/db/storage/record_store_test_oplog.cpp @@ -63,7 +63,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx, return res.getValue(); } -TEST(RecordStoreTestHarness, OplogHack) { +TEST(RecordStoreTestHarness, SeekNearOplog) { std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper(); // Use a large enough cappedMaxSize so that the limit is not reached by doing the inserts within @@ -104,14 +104,78 @@ TEST(RecordStoreTestHarness, OplogHack) { // Make sure all are visible. rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get()); + // Forward cursor seeks { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); WriteUnitOfWork wuow(opCtx.get()); - // find start - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), RecordId()); // nothing <= - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 1)), RecordId(1, 2)); // between - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 2)), RecordId(2, 2)); // == - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2)); // > highest + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(0, 1)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(1, 1)); + } + + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 1)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(1, 2)); + } + + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 2)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); + } + + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); + } + + // Reverse cursor seeks + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get(), false /* forward */); + auto rec = cur->seekNear(RecordId(0, 1)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(1, 1)); + } + + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get(), false /* forward */); + auto rec = cur->seekNear(RecordId(2, 1)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); + } + + { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get(), false /* forward */); + auto rec = cur->seekNear(RecordId(2, 2)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); + } + + { + ServiceContext::UniqueOperationContext 
opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork wuow(opCtx.get()); + auto cur = rs->getCursor(opCtx.get(), false /* forward */); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); } { @@ -121,7 +185,10 @@ TEST(RecordStoreTestHarness, OplogHack) { { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2)); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(2, 2)); } { @@ -131,7 +198,10 @@ TEST(RecordStoreTestHarness, OplogHack) { { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 2)); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(1, 2)); } { @@ -141,7 +211,10 @@ TEST(RecordStoreTestHarness, OplogHack) { { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 1)); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT(rec); + ASSERT_EQ(rec->id, RecordId(1, 1)); } { @@ -153,7 +226,9 @@ TEST(RecordStoreTestHarness, OplogHack) { { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId()); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(2, 3)); + ASSERT_FALSE(rec); } } @@ -182,7 +257,7 @@ TEST(RecordStoreTestHarness, OplogInsertOutOfOrder) { } } -TEST(RecordStoreTestHarness, OplogHackOnNonOplog) { +TEST(RecordStoreTestHarness, SeekNearOnNonOplog) { std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper(); std::unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore("local.NOT_oplog.foo")); @@ -195,7 +270,12 @@ TEST(RecordStoreTestHarness, OplogHackOnNonOplog) { .getStatus()); wuow.commit(); } - ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), boost::none); + auto cur = rs->getCursor(opCtx.get()); + auto rec = cur->seekNear(RecordId(0, 1)); + ASSERT(rec); + // Regular record stores don't use timestamps for their RecordId, so expect the first + // auto-incrementing RecordId to be 1. + ASSERT_EQ(rec->id, RecordId(1)); } @@ -228,6 +308,15 @@ TEST(RecordStoreTestHarness, OplogOrder) { } { + ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekNear(RecordId(id1.asLong() + 1)); + ASSERT(record); + ASSERT_EQ(id1, record->id); + ASSERT(!cursor->next()); + } + + { // now we insert 2 docs, but commit the 2nd one first. // we make sure we can't find the 2nd until the first is committed. 
ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext()); @@ -240,15 +329,16 @@ TEST(RecordStoreTestHarness, OplogOrder) { auto client1 = harnessHelper->serviceContext()->makeClient("c1"); auto t1 = harnessHelper->newOperationContext(client1.get()); WriteUnitOfWork w1(t1.get()); - _oplogOrderInsertOplog(t1.get(), rs, 20); + RecordId id2 = _oplogOrderInsertOplog(t1.get(), rs, 20); // do not commit yet + RecordId id3; { // create 2nd doc auto client2 = harnessHelper->serviceContext()->makeClient("c2"); auto t2 = harnessHelper->newOperationContext(client2.get()); { WriteUnitOfWork w2(t2.get()); - _oplogOrderInsertOplog(t2.get(), rs, 30); + id3 = _oplogOrderInsertOplog(t2.get(), rs, 30); w2.commit(); } } @@ -261,6 +351,27 @@ TEST(RecordStoreTestHarness, OplogOrder) { auto opCtx = harnessHelper->newOperationContext(client2.get()); auto cursor = rs->getCursor(opCtx.get()); auto record = cursor->seekExact(id1); + ASSERT(record); + ASSERT_EQ(id1, record->id); + ASSERT(!cursor->next()); + } + + { + auto client2 = harnessHelper->serviceContext()->makeClient("c2"); + auto opCtx = harnessHelper->newOperationContext(client2.get()); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekNear(id2); + ASSERT(record); + ASSERT_EQ(id1, record->id); + ASSERT(!cursor->next()); + } + + { + auto client2 = harnessHelper->serviceContext()->makeClient("c2"); + auto opCtx = harnessHelper->newOperationContext(client2.get()); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekNear(id3); + ASSERT(record); ASSERT_EQ(id1, record->id); ASSERT(!cursor->next()); } @@ -303,15 +414,17 @@ TEST(RecordStoreTestHarness, OplogOrder) { auto client1 = harnessHelper->serviceContext()->makeClient("c1"); auto t1 = harnessHelper->newOperationContext(client1.get()); WriteUnitOfWork w1(t1.get()); - _oplogOrderInsertOplog(t1.get(), rs, 2); + RecordId id2 = _oplogOrderInsertOplog(t1.get(), rs, 2); + // do not commit yet + RecordId id3; { // create 2nd doc auto client2 = harnessHelper->serviceContext()->makeClient("c2"); auto t2 = harnessHelper->newOperationContext(client2.get()); { WriteUnitOfWork w2(t2.get()); - _oplogOrderInsertOplog(t2.get(), rs, 3); + id3 = _oplogOrderInsertOplog(t2.get(), rs, 3); w2.commit(); } } @@ -324,6 +437,27 @@ TEST(RecordStoreTestHarness, OplogOrder) { auto opCtx = harnessHelper->newOperationContext(client2.get()); auto cursor = rs->getCursor(opCtx.get()); auto record = cursor->seekExact(id1); + ASSERT(record); + ASSERT_EQ(id1, record->id); + ASSERT(!cursor->next()); + } + + { + auto client2 = harnessHelper->serviceContext()->makeClient("c2"); + auto opCtx = harnessHelper->newOperationContext(client2.get()); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekNear(id2); + ASSERT(record); + ASSERT_EQ(id1, record->id); + ASSERT(!cursor->next()); + } + + { + auto client2 = harnessHelper->serviceContext()->makeClient("c2"); + auto opCtx = harnessHelper->newOperationContext(client2.get()); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekNear(id3); + ASSERT(record); ASSERT_EQ(id1, record->id); ASSERT(!cursor->next()); } diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index e0a3c8e4ad5..4da815c266e 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -58,13 +58,13 @@ wtEnv.Library( '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/db/namespace_string', 
'$BUILD_DIR/mongo/db/prepare_conflict_tracker', + '$BUILD_DIR/mongo/db/record_id_helpers', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/repl_settings', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/storage/index_entry_comparison', '$BUILD_DIR/mongo/db/storage/key_string', - '$BUILD_DIR/mongo/db/storage/oplog_hack', '$BUILD_DIR/mongo/db/storage/recovery_unit_base', '$BUILD_DIR/mongo/db/storage/storage_file_util', '$BUILD_DIR/mongo/db/storage/storage_options', diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 1a8814c009d..4785eda902f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -49,12 +49,12 @@ #include "mongo/db/global_settings.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/record_id_helpers.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/server_recovery.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/resource_consumption_metrics.h" -#include "mongo/db/storage/oplog_hack.h" #include "mongo/db/storage/wiredtiger/oplog_stone_parameters_gen.h" #include "mongo/db/storage/wiredtiger/wiredtiger_cursor_helpers.h" #include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h" @@ -1576,7 +1576,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, auto& record = records[i]; if (_isOplog) { StatusWith<RecordId> status = - oploghack::extractKey(record.data.data(), record.data.size()); + record_id_helpers::extractKey(record.data.data(), record.data.size()); if (!status.isOK()) return status.getStatus(); record.id = status.getValue(); @@ -2006,40 +2006,6 @@ void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationCon } } -boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack( - OperationContext* opCtx, const RecordId& startingPosition) const { - dassert(opCtx->lockState()->isReadLocked()); - - if (!_isOplog) - return boost::none; - - auto wtRu = WiredTigerRecoveryUnit::get(opCtx); - wtRu->setIsOplogReader(); - - RecordId searchFor = startingPosition; - auto visibilityTs = wtRu->getOplogVisibilityTs(); - if (visibilityTs && searchFor.asLong() > *visibilityTs) { - searchFor = RecordId(*visibilityTs); - } - - WiredTigerCursor cursor(_uri, _tableId, true, opCtx); - WT_CURSOR* c = cursor.get(); - - int cmp; - CursorKey key = makeCursorKey(searchFor); - setKey(c, key); - int ret = c->search_near(c, &cmp); - if (ret == 0 && cmp > 0) - ret = c->prev(c); // landed one higher than startingPosition - if (ret == WT_NOTFOUND) - return RecordId(); // nothing <= startingPosition - // It's illegal for oplog documents to be in a prepare state. 
- invariant(ret != WT_PREPARE_CONFLICT); - invariantWTOK(ret); - - return getKey(c); -} - void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, long long dataSize) { @@ -2418,6 +2384,74 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::seekExact(const RecordI return {{id, {static_cast<const char*>(value.data), static_cast<int>(value.size)}}}; } +boost::optional<Record> WiredTigerRecordStoreCursorBase::seekNear(const RecordId& id) { + dassert(_opCtx->lockState()->isReadLocked()); + + // Forward scans on the oplog must round down to the oplog visibility timestamp. + RecordId start = id; + if (_forward && _oplogVisibleTs && start.asLong() > *_oplogVisibleTs) { + start = RecordId(*_oplogVisibleTs); + } + + _skipNextAdvance = false; + WiredTigerRecoveryUnit::get(_opCtx)->getSession(); + WT_CURSOR* c = _cursor->get(); + + WiredTigerRecordStore::CursorKey key = makeCursorKey(start); + setKey(c, key); + + int cmp; + int ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->search_near(c, &cmp); }); + if (ret == WT_NOTFOUND) { + _eof = true; + return boost::none; + } + invariantWTOK(ret); + + auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); + metricsCollector.incrementOneCursorSeek(); + + RecordId curId = getKey(c); + + // Per the requirement of the API, return the lower (for forward) or higher (for reverse) + // record. + if (_forward && cmp > 0) { + ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->prev(c); }); + } else if (!_forward && cmp < 0) { + ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->next(c); }); + } + + // If we tried to return an earlier record but we found the end (for forward) or beginning (for + // reverse), go back to our original location so that we have something to return. + if (ret == WT_NOTFOUND) { + if (_forward) { + invariant(cmp > 0); + ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->next(c); }); + } else if (!_forward) { + invariant(cmp < 0); + ret = wiredTigerPrepareConflictRetry(_opCtx, [&] { return c->prev(c); }); + } + } + invariantWTOK(ret); + + curId = getKey(c); + + // For forward cursors on the oplog, the oplog visible timestamp is treated as the end of the + // record store. So if we are positioned past this point, then there are no visible records. 
+ if (_forward && _oplogVisibleTs && curId.asLong() > *_oplogVisibleTs) { + _eof = true; + return boost::none; + } + + WT_ITEM value; + invariantWTOK(c->get_value(c, &value)); + + metricsCollector.incrementOneDocRead(value.size); + + _lastReturnedId = curId; + _eof = false; + return {{curId, {static_cast<const char*>(value.data), static_cast<int>(value.size)}}}; +} void WiredTigerRecordStoreCursorBase::save() { try { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index dc8a44dca29..2e1016b625c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -197,9 +197,6 @@ public: virtual void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive); - virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, - const RecordId& startingPosition) const; - virtual Status oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime, bool orderedCommit); @@ -426,6 +423,8 @@ public: boost::optional<Record> seekExact(const RecordId& id); + boost::optional<Record> seekNear(const RecordId& start); + void save(); void saveUnpositioned();
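For illustration only (not part of this patch): a minimal C++ sketch of how a call site that previously used the removed RecordStore::oplogStartHack() could be expressed against the new cursor-level seekNear() API. The helper name findNearestOplogEntry is hypothetical. One behavioral difference is visible in the updated SeekNearOplog expectations above: where oplogStartHack() returned a null RecordId when nothing was at or below the starting position, a forward seekNear() falls back to the next higher record, so callers must compare the returned id against their bound themselves.

#include "mongo/db/operation_context.h"
#include "mongo/db/storage/record_store.h"

namespace mongo {
// Hypothetical helper: position a forward cursor at the oplog entry closest to
// 'start', preferring the next lower entry when 'start' itself is absent.
boost::optional<RecordId> findNearestOplogEntry(OperationContext* opCtx,
                                                RecordStore* rs,
                                                const RecordId& start) {
    auto cursor = rs->getCursor(opCtx, true /* forward */);
    if (auto record = cursor->seekNear(start)) {
        // May be the next *higher* entry when nothing at or below 'start' exists;
        // the removed oplogStartHack() returned RecordId() in that case instead.
        return record->id;
    }
    return boost::none;  // No visible records in the record store.
}
}  // namespace mongo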
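Similarly, a hedged sketch of deriving RecordId bounds for a collection clustered by _id, mirroring the ClusteredRecordStoreSeekNear test above. The helper names are hypothetical and assume, as that test does, that a default-constructed OID has zeroed non-timestamp bytes while OID::max() sets them all, so the two values bracket every OID with the given timestamp for forward and reverse seekNear() calls respectively.

#include "mongo/bson/oid.h"
#include "mongo/db/record_id.h"

namespace mongo {
// Smallest RecordId whose OID timestamp equals 'secs' (non-timestamp bytes zero);
// usable as a lower bound for a forward seekNear() on a clustered collection.
RecordId minClusteredRecordIdForSecs(unsigned secs) {
    auto oid = OID();  // zero-initialized
    oid.setTimestamp(secs);
    return RecordId(oid.view().view(), OID::kOIDSize);
}

// Largest RecordId whose OID timestamp equals 'secs' (non-timestamp bytes maxed);
// usable as an upper bound for a reverse seekNear() on a clustered collection.
RecordId maxClusteredRecordIdForSecs(unsigned secs) {
    auto oid = OID::max();
    oid.setTimestamp(secs);
    return RecordId(oid.view().view(), OID::kOIDSize);
}
}  // namespace mongo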