summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Rishab Joshi <rishab.joshi@mongodb.com> 2022-06-09 07:25:06 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-06-09 08:23:30 +0000
commit: 4bed6550bf55fc9e1780093431665d536bc6cb57 (patch)
tree: 9cf2935c7d2b5526f0738b79ca5400d287a21678 /src/mongo
parent: 824b9b7e608687ba0db7af2d5ccc5b6811a46720 (diff)
download: mongo-4bed6550bf55fc9e1780093431665d536bc6cb57.tar.gz
SERVER-65210 Basic code to read change streams using the change collection.
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/base/error_codes.yml | 1
-rw-r--r-- src/mongo/db/catalog/database_impl.cpp | 16
-rw-r--r-- src/mongo/db/change_stream_change_collection_manager.cpp | 24
-rw-r--r-- src/mongo/db/change_stream_change_collection_manager.h | 7
-rw-r--r-- src/mongo/db/commands/SConscript | 1
-rw-r--r-- src/mongo/db/commands/run_aggregate.cpp | 15
-rw-r--r-- src/mongo/db/exec/collection_scan.cpp | 49
-rw-r--r-- src/mongo/db/exec/collection_scan.h | 2
-rw-r--r-- src/mongo/db/exec/collection_scan_common.h | 2
-rw-r--r-- src/mongo/db/namespace_string.cpp | 10
-rw-r--r-- src/mongo/db/namespace_string.h | 10
-rw-r--r-- src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 4
-rw-r--r-- src/mongo/db/query/classic_stage_builder.cpp | 2
-rw-r--r-- src/mongo/db/query/planner_access.cpp | 35
-rw-r--r-- src/mongo/db/query/query_solution.cpp | 2
-rw-r--r-- src/mongo/db/query/query_solution.h | 2
-rw-r--r-- src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 4
-rw-r--r-- src/mongo/db/query/sbe_utils.cpp | 3
-rw-r--r-- src/mongo/db/record_id_helpers.cpp | 48
-rw-r--r-- src/mongo/db/record_id_helpers.h | 2
-rw-r--r-- src/mongo/db/repl/oplog.cpp | 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2
-rw-r--r-- src/mongo/db/storage/record_store_test_oplog.cpp | 2
23 files changed, 164 insertions, 81 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 523c52065cf..9d02993c5bf 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -490,6 +490,7 @@ error_codes:
- {code: 374, name: TransactionAPIMustRetryTransaction, categories: [InternalOnly]}
- {code: 375, name: TransactionAPIMustRetryCommit, categories: [InternalOnly]}
+ - {code: 376, name: ChangeStreamNotEnabled}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index cd743dcac46..2983935e564 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -450,6 +450,16 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx,
invariant(nss.db() == _name.db());
+ // Returns true if the supplied namespace 'nss' is a system collection that can be dropped,
+ // false otherwise.
+ auto isDroppableSystemCollection = [](const auto& nss) {
+ return nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace ||
+ nss == NamespaceString::kKeysCollectionNamespace ||
+ nss.isTemporaryReshardingCollection() || nss.isTimeseriesBucketsCollection() ||
+ nss.isChangeStreamPreImagesCollection() ||
+ nss == NamespaceString::kConfigsvrRestoreNamespace || nss.isChangeCollection();
+ };
+
if (nss.isSystem()) {
if (nss.isSystemDotProfile()) {
if (catalog->getDatabaseProfileLevel(_name) != 0)
@@ -463,11 +473,7 @@ Status DatabaseImpl::dropCollection(OperationContext* opCtx,
<< " when time-series collections are present.",
viewStats && viewStats->userTimeseries == 0);
}
- } else if (!(nss.isHealthlog() || nss == NamespaceString::kLogicalSessionsNamespace ||
- nss == NamespaceString::kKeysCollectionNamespace ||
- nss.isTemporaryReshardingCollection() || nss.isTimeseriesBucketsCollection() ||
- nss.isChangeStreamPreImagesCollection() ||
- nss == NamespaceString::kConfigsvrRestoreNamespace)) {
+ } else if (!isDroppableSystemCollection(nss)) {
return Status(ErrorCodes::IllegalOperation,
str::stream() << "can't drop system collection " << nss);
}
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 3311c7be092..840e045ebd9 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -48,11 +48,6 @@ namespace {
const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
-// TODO: SERVER-65950 create or update the change collection for a particular tenant.
-NamespaceString getTenantChangeCollectionNamespace(boost::optional<TenantId> tenantId) {
- return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
-}
-
} // namespace
ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
@@ -69,12 +64,19 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) {
getChangeCollectionManager(service).emplace(service);
}
-bool ChangeStreamChangeCollectionManager::isChangeCollectionEnabled() {
+bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() {
return feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
serverGlobalParams.featureCompatibility) &&
gMultitenancySupport;
}
+bool ChangeStreamChangeCollectionManager::hasChangeCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
+ auto catalog = CollectionCatalog::get(opCtx);
+ return static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makeChangeCollectionNSS(tenantId)));
+}
+
Status ChangeStreamChangeCollectionManager::createChangeCollection(
OperationContext* opCtx, boost::optional<TenantId> tenantId) {
// Make the change collection clustered by '_id'. The '_id' field will have the same value as
@@ -83,8 +85,10 @@ Status ChangeStreamChangeCollectionManager::createChangeCollection(
changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex());
changeCollectionOptions.capped = true;
- auto status = createCollection(
- opCtx, getTenantChangeCollectionNamespace(tenantId), changeCollectionOptions, BSONObj());
+ auto status = createCollection(opCtx,
+ NamespaceString::makeChangeCollectionNSS(tenantId),
+ changeCollectionOptions,
+ BSONObj());
if (status.code() == ErrorCodes::NamespaceExists) {
return Status::OK();
}
@@ -96,7 +100,7 @@ Status ChangeStreamChangeCollectionManager::dropChangeCollection(
OperationContext* opCtx, boost::optional<TenantId> tenantId) {
DropReply dropReply;
return dropCollection(opCtx,
- getTenantChangeCollectionNamespace(tenantId),
+ NamespaceString::makeChangeCollectionNSS(tenantId),
&dropReply,
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
}
@@ -137,7 +141,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
AutoGetCollection tenantChangeCollection(
- opCtx, getTenantChangeCollectionNamespace(tenantId), LockMode::MODE_IX);
+ opCtx, NamespaceString::makeChangeCollectionNSS(tenantId), LockMode::MODE_IX);
// The change collection does not exist for a particular tenant because either the change
// collection is not enabled or is in the process of enablement. Ignore this insert for now.
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 8ecc48b9a5c..2006b8c9426 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -63,7 +63,12 @@ public:
* Returns true if change collections are enabled for recording oplog entries, false
* otherwise.
*/
- static bool isChangeCollectionEnabled();
+ static bool isChangeCollectionsModeActive();
+
+ /**
+ * Returns true if the change collection is present for the specified tenant, false otherwise.
+ */
+ bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
/**
* Creates a change collection for the specified tenant, if it doesn't exist. Returns Status::OK
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index cf6f79671bc..248b76aa534 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -369,6 +369,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/catalog/index_key_validate',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/command_can_run_here',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 6cb0760d139..252e1823c99 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/collection_uuid_mismatch.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/commands/cqf/cqf_aggregate.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
@@ -752,9 +753,21 @@ Status runAggregate(OperationContext* opCtx,
<< " is not supported for a change stream",
!request.getCollectionUUID());
- // Replace the execution namespace with that of the oplog.
+ // Replace the execution namespace with the oplog.
nss = NamespaceString::kRsOplogNamespace;
+ // In serverless mode the change stream is opened on the change collection instead of the
+ // oplog. First verify that the change collection exists for this tenant, then replace the
+ // execution namespace with the change collection namespace.
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ uassert(ErrorCodes::ChangeStreamNotEnabled,
+ "Change streams must be enabled before being used.",
+ changeCollectionManager.hasChangeCollection(opCtx, origNss.tenantId()));
+
+ nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId());
+ }
+
// Upgrade and wait for read concern if necessary.
_adjustChangeStreamReadConcern(opCtx);
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index b8f0df82dcd..19d8033d3f1 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -80,7 +80,7 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
// The 'minRecord' and 'maxRecord' parameters are used for a special optimization that
// applies only to forwards scans of the oplog and scans on clustered collections.
invariant(!params.resumeAfterRecordId);
- if (collection->ns().isOplog()) {
+ if (collection->ns().isOplogOrChangeCollection()) {
invariant(params.direction == CollectionScanParams::FORWARD);
} else {
invariant(collection->isClustered());
@@ -109,17 +109,26 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
"collection scan bounds",
"min"_attr = (!_params.minRecord) ? "none" : _params.minRecord->toString(),
"max"_attr = (!_params.maxRecord) ? "none" : _params.maxRecord->toString());
- invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
-
- if (params.assertTsHasNotFallenOffOplog) {
- invariant(params.shouldTrackLatestOplogTimestamp);
- invariant(params.direction == CollectionScanParams::FORWARD);
+ tassert(6521000,
+ "Expected an oplog or a change collection with 'shouldTrackLatestOplogTimestamp'",
+ !_params.shouldTrackLatestOplogTimestamp ||
+ collection->ns().isOplogOrChangeCollection());
+
+ if (params.assertTsHasNotFallenOff) {
+ tassert(6521001,
+ "Expected 'shouldTrackLatestOplogTimestamp' with 'assertTsHasNotFallenOff'",
+ params.shouldTrackLatestOplogTimestamp);
+ tassert(6521002,
+ "Expected forward collection scan with 'assertTsHasNotFallenOff'",
+ params.direction == CollectionScanParams::FORWARD);
}
if (params.resumeAfterRecordId) {
// The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we
// only support in the forward direction.
- invariant(params.direction == CollectionScanParams::FORWARD);
+ tassert(6521003,
+ "Expected forward collection scan with 'resumeAfterRecordId'",
+ params.direction == CollectionScanParams::FORWARD);
}
}
@@ -227,8 +236,8 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
_lastSeenId = record->id;
- if (_params.assertTsHasNotFallenOffOplog) {
- assertTsHasNotFallenOffOplog(*record);
+ if (_params.assertTsHasNotFallenOff) {
+ assertTsHasNotFallenOff(*record);
}
if (_params.shouldTrackLatestOplogTimestamp) {
setLatestOplogEntryTimestamp(*record);
@@ -259,22 +268,28 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
_latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
}
-void CollectionScan::assertTsHasNotFallenOffOplog(const Record& record) {
- // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the timestamp that should not have fallen off the oplog; no
- // events earlier can have fallen off this oplog. Otherwise, verify that the timestamp of the
- // first observed oplog entry is earlier than or equal to timestamp that should not have fallen
- // off the oplog.
+void CollectionScan::assertTsHasNotFallenOff(const Record& record) {
auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(record.data.toBson()));
invariant(_specificStats.docsTested == 0);
+
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the timestamp that should not have fallen off the oplog; no
+ // events earlier can have fallen off this oplog.
+ // NOTE: A change collection can be created at any moment, so it might not contain the replset
+ // initialization message; this case is therefore not fully applicable to a change collection.
const bool isNewRS =
oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
+
+ // Verify that the timestamp of the first observed oplog entry is earlier than or equal to
+ // the timestamp that should not have fallen off the oplog.
+ const bool tsHasNotFallenOff = oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOff;
+
uassert(ErrorCodes::OplogQueryMinTsMissing,
"Specified timestamp has already fallen off the oplog",
- isNewRS || oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOffOplog);
+ isNewRS || tsHasNotFallenOff);
// We don't need to check this assertion again after we've confirmed the first oplog event.
- _params.assertTsHasNotFallenOffOplog = boost::none;
+ _params.assertTsHasNotFallenOff = boost::none;
}
namespace {
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index f9ce637dbad..a3737635ad6 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -117,7 +117,7 @@ private:
/**
* Asserts that the minimum timestamp in the query filter has not already fallen off the oplog.
*/
- void assertTsHasNotFallenOffOplog(const Record& record);
+ void assertTsHasNotFallenOff(const Record& record);
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index ba5559a4491..a0e550a904d 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -98,7 +98,7 @@ struct CollectionScanParams {
bool tailable = false;
// Assert that the specified timestamp has not fallen off the oplog on a forward scan.
- boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none;
+ boost::optional<Timestamp> assertTsHasNotFallenOff = boost::none;
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 3d74f7a507a..9d29b806e5d 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -286,6 +286,12 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa
return nss;
}
+NamespaceString NamespaceString::makeChangeCollectionNSS(
+ const boost::optional<TenantId>& tenantId) {
+ // TODO: SERVER-65950 create namespace for a particular tenant.
+ return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
+}
+
std::string NamespaceString::getSisterNS(StringData local) const {
verify(local.size() && local[0] != '.');
return db().toString() + "." + local.toString();
@@ -422,6 +428,10 @@ bool NamespaceString::isFLE2StateCollection() const {
coll().endsWith(fle2EcocSuffix));
}
+bool NamespaceString::isOplogOrChangeCollection() const {
+ return isOplog() || isChangeCollection();
+}
+
NamespaceString NamespaceString::makeTimeseriesBucketsNamespace() const {
return {db(), kTimeseriesBucketsCollectionPrefix.toString() + coll()};
}
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 730a2859b91..face16ac3b5 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -314,6 +314,11 @@ public:
static NamespaceString makeCollectionlessAggregateNSS(const DatabaseName& dbName);
/**
+ * Constructs the change collection namespace for the specified tenant.
+ */
+ static NamespaceString makeChangeCollectionNSS(const boost::optional<TenantId>& tenantId);
+
+ /**
* Constructs a NamespaceString representing a listCollections namespace. The format for this
* namespace is "<dbName>.$cmd.listCollections".
*/
@@ -482,6 +487,11 @@ public:
bool isFLE2StateCollection() const;
/**
+ * Returns true if the namespace is an oplog or a change collection, false otherwise.
+ */
+ bool isOplogOrChangeCollection() const;
+
+ /**
* Returns the time-series buckets namespace for this view.
*/
NamespaceString makeTimeseriesBucketsNamespace() const;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 701c5b495a7..5d250101fa2 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -167,7 +167,7 @@ public:
: DocumentSourceMock({}, expCtx), _collectionPtr(&_collection) {
_filterExpr = BSON("ns" << kTestNs);
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
- _params.assertTsHasNotFallenOffOplog = Timestamp(0);
+ _params.assertTsHasNotFallenOff = Timestamp(0);
_params.shouldTrackLatestOplogTimestamp = true;
_params.minRecord = RecordIdBound(RecordId(0));
_params.tailable = true;
@@ -178,7 +178,7 @@ public:
_filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
_params.minRecord = RecordIdBound(RecordId(resumeToken.clusterTime.asLL()));
- _params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime;
+ _params.assertTsHasNotFallenOff = resumeToken.clusterTime;
}
void push_back(GetNextResult&& result) {
diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index cc1915510c1..4404e2ab6da 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -79,7 +79,7 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
CollectionScanParams params;
params.tailable = csn->tailable;
params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
- params.assertTsHasNotFallenOffOplog = csn->assertTsHasNotFallenOffOplog;
+ params.assertTsHasNotFallenOff = csn->assertTsHasNotFallenOff;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index f283979cc5c..7de2c18ca96 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -384,8 +384,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
// the collection scan to return timestamp-based tokens. Otherwise, we should
// return generic RecordId-based tokens.
if (query.getFindCommandRequest().getRequestResumeToken()) {
- csn->shouldTrackLatestOplogTimestamp = query.nss().isOplog();
- csn->requestResumeToken = !query.nss().isOplog();
+ csn->shouldTrackLatestOplogTimestamp = query.nss().isOplogOrChangeCollection();
+ csn->requestResumeToken = !query.nss().isOplogOrChangeCollection();
}
// Extract and assign the RecordId from the 'resumeAfter' token, if present.
@@ -397,26 +397,31 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
const bool assertMinTsHasNotFallenOffOplog =
params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
- if (query.nss().isOplog() && csn->direction == 1) {
+ if (query.nss().isOplogOrChangeCollection() && csn->direction == 1) {
+ // Takes Timestamp 'ts' as input, transforms it to a RecordIdBound and assigns it to the
+ // output parameter 'recordId'. The RecordId format for the change collection is a string,
+ // whereas the RecordId format for the oplog is a long integer, so the timestamp is converted
+ // to the required format first. If the conversion fails, 'recordId' is left unchanged.
+ auto assignRecordIdFromTimestamp = [&](auto& ts, auto* recordId) {
+ auto keyFormat = query.nss().isChangeCollection() ? KeyFormat::String : KeyFormat::Long;
+ auto status = record_id_helpers::keyForOptime(ts, keyFormat);
+ if (status.isOK()) {
+ *recordId = RecordIdBound(status.getValue());
+ }
+ };
+
// Optimizes the start and end location parameters for a collection scan for an oplog
// collection. Not compatible with $_resumeAfter so we do not optimize in that case.
if (resumeAfterObj.isEmpty()) {
auto [minTs, maxTs] = extractTsRange(query.root());
if (minTs) {
- StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs);
- if (goal.isOK()) {
- csn->minRecord = RecordIdBound(goal.getValue());
- }
-
+ assignRecordIdFromTimestamp(*minTs, &csn->minRecord);
if (assertMinTsHasNotFallenOffOplog) {
- csn->assertTsHasNotFallenOffOplog = *minTs;
+ csn->assertTsHasNotFallenOff = *minTs;
}
}
if (maxTs) {
- StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs);
- if (goal.isOK()) {
- csn->maxRecord = RecordIdBound(goal.getValue());
- }
+ assignRecordIdFromTimestamp(*maxTs, &csn->maxRecord);
}
}
@@ -433,9 +438,9 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
// specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions.
if (assertMinTsHasNotFallenOffOplog) {
uassert(ErrorCodes::InvalidOptions,
- str::stream() << "assertTsHasNotFallenOffOplog cannot be applied to a query "
+ str::stream() << "assertTsHasNotFallenOff cannot be applied to a query "
"which does not imply a minimum 'ts' value ",
- csn->assertTsHasNotFallenOffOplog);
+ csn->assertTsHasNotFallenOff);
}
auto queryCollator = query.getCollator();
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 893fef833e0..b62b54c386c 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -332,7 +332,7 @@ std::unique_ptr<QuerySolutionNode> CollectionScanNode::clone() const {
copy->tailable = this->tailable;
copy->direction = this->direction;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
- copy->assertTsHasNotFallenOffOplog = this->assertTsHasNotFallenOffOplog;
+ copy->assertTsHasNotFallenOff = this->assertTsHasNotFallenOff;
copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
copy->clusteredIndex = this->clusteredIndex;
copy->hasCompatibleCollation = this->hasCompatibleCollation;
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index 455c5aabcaa..27a4ff33977 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -489,7 +489,7 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
bool shouldTrackLatestOplogTimestamp = false;
// Assert that the specified timestamp has not fallen off the oplog.
- boost::optional<Timestamp> assertTsHasNotFallenOffOplog = boost::none;
+ boost::optional<Timestamp> assertTsHasNotFallenOff = boost::none;
int direction{1};
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index f3011ec2bac..35c752f7dcb 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -328,7 +328,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// replica set initialization message. If this fails, then we throw
// ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch of a
// tailable scan; it only needs to be done once, when the initial branch is run.
- if (csn->assertTsHasNotFallenOffOplog && !isTailableResumeBranch) {
+ if (csn->assertTsHasNotFallenOff && !isTailableResumeBranch) {
invariant(csn->shouldTrackLatestOplogTimestamp);
// There should always be a 'tsSlot' already allocated on the RuntimeEnvironment for the
@@ -388,7 +388,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
makeBinaryOp(sbe::EPrimBinary::lessEq,
makeVariable(minTsSlot),
makeConstant(sbe::value::TypeTags::Timestamp,
- csn->assertTsHasNotFallenOffOplog->asULL())),
+ csn->assertTsHasNotFallenOff->asULL())),
makeBinaryOp(
sbe::EPrimBinary::logicAnd,
makeBinaryOp(sbe::EPrimBinary::eq,
diff --git a/src/mongo/db/query/sbe_utils.cpp b/src/mongo/db/query/sbe_utils.cpp
index 34d7c8f6545..4284646b510 100644
--- a/src/mongo/db/query/sbe_utils.cpp
+++ b/src/mongo/db/query/sbe_utils.cpp
@@ -238,6 +238,7 @@ bool isQuerySbeCompatible(const CollectionPtr* collection,
const bool allExpressionsSupported = expCtx && expCtx->sbeCompatible;
const bool isNotCount = !(plannerOptions & QueryPlannerParams::IS_COUNT);
const bool isNotOplog = !cq->nss().isOplog();
+ const bool isNotChangeCollection = !cq->nss().isChangeCollection();
const bool doesNotContainMetadataRequirements = cq->metadataDeps().none();
const bool doesNotSortOnMetaOrPathWithNumericComponents =
!sortPattern || std::all_of(sortPattern->begin(), sortPattern->end(), [](auto&& part) {
@@ -261,7 +262,7 @@ bool isQuerySbeCompatible(const CollectionPtr* collection,
return allExpressionsSupported && isNotCount && doesNotContainMetadataRequirements &&
isQueryNotAgainstTimeseriesCollection && isQueryNotAgainstClusteredCollection &&
doesNotSortOnMetaOrPathWithNumericComponents && isNotOplog && doesNotRequireMatchDetails &&
- doesNotHaveElemMatchProject;
+ doesNotHaveElemMatchProject && isNotChangeCollection;
}
bool validateInputParamsBindings(
diff --git a/src/mongo/db/record_id_helpers.cpp b/src/mongo/db/record_id_helpers.cpp
index e9147666da8..bf313976a3b 100644
--- a/src/mongo/db/record_id_helpers.cpp
+++ b/src/mongo/db/record_id_helpers.cpp
@@ -48,23 +48,35 @@
namespace mongo {
namespace record_id_helpers {
-StatusWith<RecordId> keyForOptime(const Timestamp& opTime) {
- // Make sure secs and inc wouldn't be negative if treated as signed. This ensures that they
- // don't sort differently when put in a RecordId. It also avoids issues with Null/Invalid
- // RecordIds
- if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max()))
- return {ErrorCodes::BadValue, "ts secs too high"};
-
- if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max()))
- return {ErrorCodes::BadValue, "ts inc too high"};
-
- const auto out = RecordId(opTime.getSecs(), opTime.getInc());
- if (out <= RecordId::minLong())
- return {ErrorCodes::BadValue, "ts too low"};
- if (out >= RecordId::maxLong())
- return {ErrorCodes::BadValue, "ts too high"};
-
- return out;
+StatusWith<RecordId> keyForOptime(const Timestamp& opTime, const KeyFormat keyFormat) {
+ switch (keyFormat) {
+ case KeyFormat::Long: {
+ // Make sure secs and inc wouldn't be negative if treated as signed. This ensures that
+ // they don't sort differently when put in a RecordId. It also avoids issues with
+ // Null/Invalid RecordIds
+ if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max()))
+ return {ErrorCodes::BadValue, "ts secs too high"};
+
+ if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max()))
+ return {ErrorCodes::BadValue, "ts inc too high"};
+
+ const auto out = RecordId(opTime.getSecs(), opTime.getInc());
+ if (out <= RecordId::minLong())
+ return {ErrorCodes::BadValue, "ts too low"};
+ if (out >= RecordId::maxLong())
+ return {ErrorCodes::BadValue, "ts too high"};
+
+ return out;
+ }
+ case KeyFormat::String: {
+ KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion);
+ keyBuilder.appendTimestamp(opTime);
+ return RecordId(keyBuilder.getBuffer(), keyBuilder.getSize());
+ }
+ default: { MONGO_UNREACHABLE_TASSERT(6521004); }
+ }
+
+ MONGO_UNREACHABLE_TASSERT(6521005);
}
@@ -84,7 +96,7 @@ StatusWith<RecordId> extractKeyOptime(const char* data, int len) {
if (elem.type() != bsonTimestamp)
return {ErrorCodes::BadValue, "ts must be a Timestamp"};
- return keyForOptime(elem.timestamp());
+ return keyForOptime(elem.timestamp(), KeyFormat::Long);
}
StatusWith<RecordId> keyForDoc(const BSONObj& doc,
diff --git a/src/mongo/db/record_id_helpers.h b/src/mongo/db/record_id_helpers.h
index 378466df45a..b957b30cce6 100644
--- a/src/mongo/db/record_id_helpers.h
+++ b/src/mongo/db/record_id_helpers.h
@@ -46,7 +46,7 @@ namespace record_id_helpers {
* Converts Timestamp to a RecordId in an unspecified manor that is safe to use as the key to
* in a RecordStore.
*/
-StatusWith<RecordId> keyForOptime(const Timestamp& opTime);
+StatusWith<RecordId> keyForOptime(const Timestamp& opTime, KeyFormat keyFormat);
/**
* For clustered collections, converts various values into a RecordId.
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0908b06213a..d6ae765a1fc 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -389,7 +389,7 @@ void _logOpsInner(OperationContext* opCtx,
}
// Insert the oplog records to the respective tenants change collections.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) {
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx, *records, timestamps);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 7cbc79f9aed..5d450af12d7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -560,7 +560,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
// TODO: SERVER-65948 move the change collection creation logic from here to the PM-2502 hooks.
// The change collection will be created when the change stream is enabled.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionEnabled()) {
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection(
opCtx, boost::none);
if (!status.isOK()) {
diff --git a/src/mongo/db/storage/record_store_test_oplog.cpp b/src/mongo/db/storage/record_store_test_oplog.cpp
index 6c61f93ee76..cc014de1681 100644
--- a/src/mongo/db/storage/record_store_test_oplog.cpp
+++ b/src/mongo/db/storage/record_store_test_oplog.cpp
@@ -530,7 +530,7 @@ TEST(RecordStoreTestHarness, OplogVisibilityStandalone) {
rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), Timestamp());
ASSERT_OK(res.getStatus());
id1 = res.getValue();
- StatusWith<RecordId> expectedId = record_id_helpers::keyForOptime(ts);
+ StatusWith<RecordId> expectedId = record_id_helpers::keyForOptime(ts, KeyFormat::Long);
ASSERT_OK(expectedId.getStatus());
// RecordId should be extracted from 'ts' field when inserting into oplog namespace
ASSERT(expectedId.getValue().compare(id1) == 0);