diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-06-09 07:25:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-09 08:23:30 +0000 |
commit | 4bed6550bf55fc9e1780093431665d536bc6cb57 (patch) | |
tree | 9cf2935c7d2b5526f0738b79ca5400d287a21678 /src/mongo | |
parent | 824b9b7e608687ba0db7af2d5ccc5b6811a46720 (diff) | |
download | mongo-4bed6550bf55fc9e1780093431665d536bc6cb57.tar.gz |
SERVER-65210 Basic code to read change streams using the change collection.
Diffstat (limited to 'src/mongo')
23 files changed, 164 insertions, 81 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 523c52065cf..9d02993c5bf 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -490,6 +490,7 @@ error_codes: - {code: 374, name: TransactionAPIMustRetryTransaction, categories: [InternalOnly]} - {code: 375, name: TransactionAPIMustRetryCommit, categories: [InternalOnly]} + - {code: 376, name: ChangeStreamNotEnabled} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index cd743dcac46..2983935e564 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -450,6 +450,16 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx, invariant(nss.db() == _name.db()); + // Returns true if the supplied namespace 'nss' is a system collection that can be dropped, + // false otherwise. + auto isDroppableSystemCollection = [](const auto& nss) { + return nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace || + nss == NamespaceString::kKeysCollectionNamespace || + nss.isTemporaryReshardingCollection() || nss.isTimeseriesBucketsCollection() || + nss.isChangeStreamPreImagesCollection() || + nss == NamespaceString::kConfigsvrRestoreNamespace || nss.isChangeCollection(); + }; + if (nss.isSystem()) { if (nss.isSystemDotProfile()) { if (catalog->getDatabaseProfileLevel(_name) != 0) @@ -463,11 +473,7 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx, << " when time-series collections are present.", viewStats && viewStats->userTimeseries == 0); } - } else if (!(nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace || - nss == NamespaceString::kKeysCollectionNamespace || - nss.isTemporaryReshardingCollection() || nss.isTimeseriesBucketsCollection() || - nss.isChangeStreamPreImagesCollection() || - nss == NamespaceString::kConfigsvrRestoreNamespace)) { + } else if (!isDroppableSystemCollection(nss)) { return Status(ErrorCodes::IllegalOperation, str::stream() << "can't drop system collection " << nss); } diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 3311c7be092..840e045ebd9 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -48,11 +48,6 @@ namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>(); -// TODO: SERVER-65950 create or update the change collection for a particular tenant. -NamespaceString getTenantChangeCollectionNamespace(boost::optional<TenantId> tenantId) { - return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName}; -} - } // namespace ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get( @@ -69,12 +64,19 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) { getChangeCollectionManager(service).emplace(service); } -bool ChangeStreamChangeCollectionManager::isChangeCollectionEnabled() { +bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() { return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( serverGlobalParams.featureCompatibility) && gMultitenancySupport; } +bool ChangeStreamChangeCollectionManager::hasChangeCollection( + OperationContext* opCtx, boost::optional<TenantId> tenantId) const { + auto catalog = CollectionCatalog::get(opCtx); + return static_cast<bool>(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))); +} + Status ChangeStreamChangeCollectionManager::createChangeCollection( OperationContext* opCtx, boost::optional<TenantId> tenantId) { // Make the change collection clustered by '_id'. The '_id' field will have the same value as @@ -83,8 +85,10 @@ Status ChangeStreamChangeCollectionManager::createChangeCollection( changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex()); changeCollectionOptions.capped = true; - auto status = createCollection( - opCtx, getTenantChangeCollectionNamespace(tenantId), changeCollectionOptions, BSONObj()); + auto status = createCollection(opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + changeCollectionOptions, + BSONObj()); if (status.code() == ErrorCodes::NamespaceExists) { return Status::OK(); } @@ -96,7 +100,7 @@ Status ChangeStreamChangeCollectionManager::dropChangeCollection( OperationContext* opCtx, boost::optional<TenantId> tenantId) { DropReply dropReply; return dropCollection(opCtx, - getTenantChangeCollectionNamespace(tenantId), + NamespaceString::makeChangeCollectionNSS(tenantId), &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); } @@ -137,7 +141,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( // 'AllowLockAcquisitionOnTimestampedUnitOfWork'. AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); AutoGetCollection tenantChangeCollection( - opCtx, getTenantChangeCollectionNamespace(tenantId), LockMode::MODE_IX); + opCtx, NamespaceString::makeChangeCollectionNSS(tenantId), LockMode::MODE_IX); // The change collection does not exist for a particular tenant because either the change // collection is not enabled or is in the process of enablement. Ignore this insert for now. diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 8ecc48b9a5c..2006b8c9426 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -63,7 +63,12 @@ public: * Returns true if change collections are enabled for recording oplog entries, false * otherwise. */ - static bool isChangeCollectionEnabled(); + static bool isChangeCollectionsModeActive(); + + /** + * Returns true if the change collection is present for the specified tenant, false otherwise. + */ + bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const; /** * Creates a change collection for the specified tenant, if it doesn't exist. Returns Status::OK diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index cf6f79671bc..248b76aa534 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -369,6 +369,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/catalog/index_key_validate', '$BUILD_DIR/mongo/db/catalog/multi_index_block', + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/command_can_run_here', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/concurrency/exception_util', diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 6cb0760d139..252e1823c99 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/collection_uuid_mismatch.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/commands/cqf/cqf_aggregate.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" @@ -752,9 +753,21 @@ Status runAggregate(OperationContext* opCtx, << " is not supported for a change stream", !request.getCollectionUUID()); - // Replace the execution namespace with that of the oplog. + // Replace the execution namespace with the oplog. nss = NamespaceString::kRsOplogNamespace; + // In case of serverless the change stream will be opened on the change collection. We + // should first check if the change collection for the particular tenant exists and then + // replace the namespace with the change collection. + if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + uassert(ErrorCodes::ChangeStreamNotEnabled, + "Change streams must be enabled before being used.", + changeCollectionManager.hasChangeCollection(opCtx, origNss.tenantId())); + + nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId()); + } + // Upgrade and wait for read concern if necessary. _adjustChangeStreamReadConcern(opCtx); diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index b8f0df82dcd..19d8033d3f1 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -80,7 +80,7 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, // The 'minRecord' and 'maxRecord' parameters are used for a special optimization that // applies only to forwards scans of the oplog and scans on clustered collections. invariant(!params.resumeAfterRecordId); - if (collection->ns().isOplog()) { + if (collection->ns().isOplogOrChangeCollection()) { invariant(params.direction == CollectionScanParams::FORWARD); } else { invariant(collection->isClustered()); @@ -109,17 +109,26 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, "collection scan bounds", "min"_attr = (!_params.minRecord) ? "none" : _params.minRecord->toString(), "max"_attr = (!_params.maxRecord) ? "none" : _params.maxRecord->toString()); - invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); - - if (params.assertTsHasNotFallenOffOplog) { - invariant(params.shouldTrackLatestOplogTimestamp); - invariant(params.direction == CollectionScanParams::FORWARD); + tassert(6521000, + "Expected an oplog or a change collection with 'shouldTrackLatestOplogTimestamp'", + !_params.shouldTrackLatestOplogTimestamp || + collection->ns().isOplogOrChangeCollection()); + + if (params.assertTsHasNotFallenOff) { + tassert(6521001, + "Expected 'shouldTrackLatestOplogTimestamp' with 'assertTsHasNotFallenOff'", + params.shouldTrackLatestOplogTimestamp); + tassert(6521002, + "Expected forward collection scan with 'assertTsHasNotFallenOff'", + params.direction == CollectionScanParams::FORWARD); } if (params.resumeAfterRecordId) { // The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we // only support in the forward direction. - invariant(params.direction == CollectionScanParams::FORWARD); + tassert(6521003, + "Expected forward collection scan with 'resumeAfterRecordId'", + params.direction == CollectionScanParams::FORWARD); } } @@ -227,8 +236,8 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } _lastSeenId = record->id; - if (_params.assertTsHasNotFallenOffOplog) { - assertTsHasNotFallenOffOplog(*record); + if (_params.assertTsHasNotFallenOff) { + assertTsHasNotFallenOff(*record); } if (_params.shouldTrackLatestOplogTimestamp) { setLatestOplogEntryTimestamp(*record); @@ -259,22 +268,28 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp()); } -void CollectionScan::assertTsHasNotFallenOffOplog(const Record& record) { - // If the first entry we see in the oplog is the replset initialization, then it doesn't matter - // if its timestamp is later than the timestamp that should not have fallen off the oplog; no - // events earlier can have fallen off this oplog. Otherwise, verify that the timestamp of the - // first observed oplog entry is earlier than or equal to timestamp that should not have fallen - // off the oplog. +void CollectionScan::assertTsHasNotFallenOff(const Record& record) { auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(record.data.toBson())); invariant(_specificStats.docsTested == 0); + + // If the first entry we see in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the timestamp that should not have fallen off the oplog; no + // events earlier can have fallen off this oplog. + // NOTE: A change collection can be created at any moment as such it might not have replset + // initialization message, as such this case is not fully applicable for the change collection. const bool isNewRS = oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; + + // Verify that the timestamp of the first observed oplog entry is earlier than or equal to + // timestamp that should not have fallen off the oplog. + const bool tsHasNotFallenOff = oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOff; + uassert(ErrorCodes::OplogQueryMinTsMissing, "Specified timestamp has already fallen off the oplog", - isNewRS || oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOffOplog); + isNewRS || tsHasNotFallenOff); // We don't need to check this assertion again after we've confirmed the first oplog event. - _params.assertTsHasNotFallenOffOplog = boost::none; + _params.assertTsHasNotFallenOff = boost::none; } namespace { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index f9ce637dbad..a3737635ad6 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -117,7 +117,7 @@ private: /** * Asserts that the minimum timestamp in the query filter has not already fallen off the oplog. */ - void assertTsHasNotFallenOffOplog(const Record& record); + void assertTsHasNotFallenOff(const Record& record); // WorkingSet is not owned by us. WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index ba5559a4491..a0e550a904d 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -98,7 +98,7 @@ struct CollectionScanParams { bool tailable = false; // Assert that the specified timestamp has not fallen off the oplog on a forward scan. - boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none; + boost::optional<Timestamp> assertTsHasNotFallenOff = boost::none; // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 3d74f7a507a..9d29b806e5d 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -286,6 +286,12 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa return nss; } +NamespaceString NamespaceString::makeChangeCollectionNSS( + const boost::optional<TenantId>& tenantId) { + // TODO: SERVER-65950 create namespace for a particular tenant. + return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName}; +} + std::string NamespaceString::getSisterNS(StringData local) const { verify(local.size() && local[0] != '.'); return db().toString() + "." + local.toString(); @@ -422,6 +428,10 @@ bool NamespaceString::isFLE2StateCollection() const { coll().endsWith(fle2EcocSuffix)); } +bool NamespaceString::isOplogOrChangeCollection() const { + return isOplog() || isChangeCollection(); +} + NamespaceString NamespaceString::makeTimeseriesBucketsNamespace() const { return {db(), kTimeseriesBucketsCollectionPrefix.toString() + coll()}; } diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 730a2859b91..face16ac3b5 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -314,6 +314,11 @@ public: static NamespaceString makeCollectionlessAggregateNSS(const DatabaseName& dbName); /** + * Constructs the change collection namespace for the specified tenant. + */ + static NamespaceString makeChangeCollectionNSS(const boost::optional<TenantId>& tenantId); + + /** * Constructs a NamespaceString representing a listCollections namespace. The format for this * namespace is "<dbName>.$cmd.listCollections". */ @@ -482,6 +487,11 @@ public: bool isFLE2StateCollection() const; /** + * Returns true if the namespace is an oplog or a change collection, false otherwise. + */ + bool isOplogOrChangeCollection() const; + + /** * Returns the time-series buckets namespace for this view. */ NamespaceString makeTimeseriesBucketsNamespace() const; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 701c5b495a7..5d250101fa2 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -167,7 +167,7 @@ public: : DocumentSourceMock({}, expCtx), _collectionPtr(&_collection) { _filterExpr = BSON("ns" << kTestNs); _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); - _params.assertTsHasNotFallenOffOplog = Timestamp(0); + _params.assertTsHasNotFallenOff = Timestamp(0); _params.shouldTrackLatestOplogTimestamp = true; _params.minRecord = RecordIdBound(RecordId(0)); _params.tailable = true; @@ -178,7 +178,7 @@ public: _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime)); _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); _params.minRecord = RecordIdBound(RecordId(resumeToken.clusterTime.asLL())); - _params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime; + _params.assertTsHasNotFallenOff = resumeToken.clusterTime; } void push_back(GetNextResult&& result) { diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp index cc1915510c1..4404e2ab6da 100644 --- a/src/mongo/db/query/classic_stage_builder.cpp +++ b/src/mongo/db/query/classic_stage_builder.cpp @@ -79,7 +79,7 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r CollectionScanParams params; params.tailable = csn->tailable; params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; - params.assertTsHasNotFallenOffOplog = csn->assertTsHasNotFallenOffOplog; + params.assertTsHasNotFallenOff = csn->assertTsHasNotFallenOff; params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility; diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index f283979cc5c..7de2c18ca96 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -384,8 +384,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( // the collection scan to return timestamp-based tokens. Otherwise, we should // return generic RecordId-based tokens. if (query.getFindCommandRequest().getRequestResumeToken()) { - csn->shouldTrackLatestOplogTimestamp = query.nss().isOplog(); - csn->requestResumeToken = !query.nss().isOplog(); + csn->shouldTrackLatestOplogTimestamp = query.nss().isOplogOrChangeCollection(); + csn->requestResumeToken = !query.nss().isOplogOrChangeCollection(); } // Extract and assign the RecordId from the 'resumeAfter' token, if present. @@ -397,26 +397,31 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( const bool assertMinTsHasNotFallenOffOplog = params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; - if (query.nss().isOplog() && csn->direction == 1) { + if (query.nss().isOplogOrChangeCollection() && csn->direction == 1) { + // Takes Timestamp 'ts' as input, transforms it to the RecordIdBound and assigns it to the + // output parameter 'recordId'. The RecordId format for the change collection is a string, + // where as the RecordId format for the oplog is a long integer. The timestamp should be + // converted to the required format before assigning it to the 'recordId'. + auto assignRecordIdFromTimestamp = [&](auto& ts, auto* recordId) { + auto keyFormat = query.nss().isChangeCollection() ? KeyFormat::String : KeyFormat::Long; + auto status = record_id_helpers::keyForOptime(ts, keyFormat); + if (status.isOK()) { + *recordId = RecordIdBound(status.getValue()); + } + }; + // Optimizes the start and end location parameters for a collection scan for an oplog // collection. Not compatible with $_resumeAfter so we do not optimize in that case. if (resumeAfterObj.isEmpty()) { auto [minTs, maxTs] = extractTsRange(query.root()); if (minTs) { - StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs); - if (goal.isOK()) { - csn->minRecord = RecordIdBound(goal.getValue()); - } - + assignRecordIdFromTimestamp(*minTs, &csn->minRecord); if (assertMinTsHasNotFallenOffOplog) { - csn->assertTsHasNotFallenOffOplog = *minTs; + csn->assertTsHasNotFallenOff = *minTs; } } if (maxTs) { - StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs); - if (goal.isOK()) { - csn->maxRecord = RecordIdBound(goal.getValue()); - } + assignRecordIdFromTimestamp(*maxTs, &csn->maxRecord); } } @@ -433,9 +438,9 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( // specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions. if (assertMinTsHasNotFallenOffOplog) { uassert(ErrorCodes::InvalidOptions, - str::stream() << "assertTsHasNotFallenOffOplog cannot be applied to a query " + str::stream() << "assertTsHasNotFallenOff cannot be applied to a query " "which does not imply a minimum 'ts' value ", - csn->assertTsHasNotFallenOffOplog); + csn->assertTsHasNotFallenOff); } auto queryCollator = query.getCollator(); diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index 893fef833e0..b62b54c386c 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -332,7 +332,7 @@ std::unique_ptr<QuerySolutionNode> CollectionScanNode::clone() const { copy->tailable = this->tailable; copy->direction = this->direction; copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp; - copy->assertTsHasNotFallenOffOplog = this->assertTsHasNotFallenOffOplog; + copy->assertTsHasNotFallenOff = this->assertTsHasNotFallenOff; copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility; copy->clusteredIndex = this->clusteredIndex; copy->hasCompatibleCollation = this->hasCompatibleCollation; diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 455c5aabcaa..27a4ff33977 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -489,7 +489,7 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet { bool shouldTrackLatestOplogTimestamp = false; // Assert that the specified timestamp has not fallen off the oplog. - boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none; + boost::optional<Timestamp> assertTsHasNotFallenOff = boost::none; int direction{1}; diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index f3011ec2bac..35c752f7dcb 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -328,7 +328,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // replica set initialization message. If this fails, then we throw // ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch of a // tailable scan; it only needs to be done once, when the initial branch is run. - if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) { + if (csn->assertTsHasNotFallenOff && !isTailableResumeBranch) { invariant(csn->shouldTrackLatestOplogTimestamp); // There should always be a 'tsSlot' already allocated on the RuntimeEnvironment for the @@ -388,7 +388,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo makeBinaryOp(sbe::EPrimBinary::lessEq, makeVariable(minTsSlot), makeConstant(sbe::value::TypeTags::Timestamp, - csn->assertTsHasNotFallenOffOplog->asULL())), + csn->assertTsHasNotFallenOff->asULL())), makeBinaryOp( sbe::EPrimBinary::logicAnd, makeBinaryOp(sbe::EPrimBinary::eq, diff --git a/src/mongo/db/query/sbe_utils.cpp b/src/mongo/db/query/sbe_utils.cpp index 34d7c8f6545..4284646b510 100644 --- a/src/mongo/db/query/sbe_utils.cpp +++ b/src/mongo/db/query/sbe_utils.cpp @@ -238,6 +238,7 @@ bool isQuerySbeCompatible(const CollectionPtr* collection, const bool allExpressionsSupported = expCtx && expCtx->sbeCompatible; const bool isNotCount = !(plannerOptions & QueryPlannerParams::IS_COUNT); const bool isNotOplog = !cq->nss().isOplog(); + const bool isNotChangeCollection = !cq->nss().isChangeCollection(); const bool doesNotContainMetadataRequirements = cq->metadataDeps().none(); const bool doesNotSortOnMetaOrPathWithNumericComponents = !sortPattern || std::all_of(sortPattern->begin(), sortPattern->end(), [](auto&& part) { @@ -261,7 +262,7 @@ bool isQuerySbeCompatible(const CollectionPtr* collection, return allExpressionsSupported && isNotCount && doesNotContainMetadataRequirements && isQueryNotAgainstTimeseriesCollection && isQueryNotAgainstClusteredCollection && doesNotSortOnMetaOrPathWithNumericComponents && isNotOplog && doesNotRequireMatchDetails && - doesNotHaveElemMatchProject; + doesNotHaveElemMatchProject && isNotChangeCollection; } bool validateInputParamsBindings( diff --git a/src/mongo/db/record_id_helpers.cpp b/src/mongo/db/record_id_helpers.cpp index e9147666da8..bf313976a3b 100644 --- a/src/mongo/db/record_id_helpers.cpp +++ b/src/mongo/db/record_id_helpers.cpp @@ -48,23 +48,35 @@ namespace mongo { namespace record_id_helpers { -StatusWith<RecordId> keyForOptime(const Timestamp& opTime) { - // Make sure secs and inc wouldn't be negative if treated as signed. This ensures that they - // don't sort differently when put in a RecordId. It also avoids issues with Null/Invalid - // RecordIds - if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max())) - return {ErrorCodes::BadValue, "ts secs too high"}; - - if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max())) - return {ErrorCodes::BadValue, "ts inc too high"}; - - const auto out = RecordId(opTime.getSecs(), opTime.getInc()); - if (out <= RecordId::minLong()) - return {ErrorCodes::BadValue, "ts too low"}; - if (out >= RecordId::maxLong()) - return {ErrorCodes::BadValue, "ts too high"}; - - return out; +StatusWith<RecordId> keyForOptime(const Timestamp& opTime, const KeyFormat keyFormat) { + switch (keyFormat) { + case KeyFormat::Long: { + // Make sure secs and inc wouldn't be negative if treated as signed. This ensures that + // they don't sort differently when put in a RecordId. It also avoids issues with + // Null/Invalid RecordIds + if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max())) + return {ErrorCodes::BadValue, "ts secs too high"}; + + if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max())) + return {ErrorCodes::BadValue, "ts inc too high"}; + + const auto out = RecordId(opTime.getSecs(), opTime.getInc()); + if (out <= RecordId::minLong()) + return {ErrorCodes::BadValue, "ts too low"}; + if (out >= RecordId::maxLong()) + return {ErrorCodes::BadValue, "ts too high"}; + + return out; + } + case KeyFormat::String: { + KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion); + keyBuilder.appendTimestamp(opTime); + return RecordId(keyBuilder.getBuffer(), keyBuilder.getSize()); + } + default: { MONGO_UNREACHABLE_TASSERT(6521004); } + } + + MONGO_UNREACHABLE_TASSERT(6521005); } @@ -84,7 +96,7 @@ StatusWith<RecordId> extractKeyOptime(const char* data, int len) { if (elem.type() != bsonTimestamp) return {ErrorCodes::BadValue, "ts must be a Timestamp"}; - return keyForOptime(elem.timestamp()); + return keyForOptime(elem.timestamp(), KeyFormat::Long); } StatusWith<RecordId> keyForDoc(const BSONObj& doc, diff --git a/src/mongo/db/record_id_helpers.h b/src/mongo/db/record_id_helpers.h index 378466df45a..b957b30cce6 100644 --- a/src/mongo/db/record_id_helpers.h +++ b/src/mongo/db/record_id_helpers.h @@ -46,7 +46,7 @@ namespace record_id_helpers { * Converts Timestamp to a RecordId in an unspecified manor that is safe to use as the key to * in a RecordStore. */ -StatusWith<RecordId> keyForOptime(const Timestamp& opTime); +StatusWith<RecordId> keyForOptime(const Timestamp& opTime, KeyFormat keyFormat); /** * For clustered collections, converts various values into a RecordId. diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0908b06213a..d6ae765a1fc 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -389,7 +389,7 @@ void _logOpsInner(OperationContext* opCtx, } // Insert the oplog records to the respective tenants change collections. - if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) { + if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, *records, timestamps); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 7cbc79f9aed..5d450af12d7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -560,7 +560,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC // TODO: SERVER-65948 move the change collection creation logic from here to the PM-2502 hooks. // The change collection will be created when the change stream is enabled. - if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) { + if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection( opCtx, boost::none); if (!status.isOK()) { diff --git a/src/mongo/db/storage/record_store_test_oplog.cpp b/src/mongo/db/storage/record_store_test_oplog.cpp index 6c61f93ee76..cc014de1681 100644 --- a/src/mongo/db/storage/record_store_test_oplog.cpp +++ b/src/mongo/db/storage/record_store_test_oplog.cpp @@ -530,7 +530,7 @@ TEST(RecordStoreTestHarness, OplogVisibilityStandalone) { rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), Timestamp()); ASSERT_OK(res.getStatus()); id1 = res.getValue(); - StatusWith<RecordId> expectedId = record_id_helpers::keyForOptime(ts); + StatusWith<RecordId> expectedId = record_id_helpers::keyForOptime(ts, KeyFormat::Long); ASSERT_OK(expectedId.getStatus()); // RecordId should be extracted from 'ts' field when inserting into oplog namespace ASSERT(expectedId.getValue().compare(id1) == 0); |