diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/change_stream_options_manager.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/change_stream_pre_image_util.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/change_stream_serverless_helpers.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/change_stream_serverless_helpers.h | 2 | ||||
-rw-r--r-- | src/mongo/db/change_streams_cluster_parameter.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 24 |
8 files changed, 14 insertions, 61 deletions
diff --git a/src/mongo/db/change_stream_options_manager.cpp b/src/mongo/db/change_stream_options_manager.cpp index 37e82659886..9d3300e58b7 100644 --- a/src/mongo/db/change_stream_options_manager.cpp +++ b/src/mongo/db/change_stream_options_manager.cpp @@ -136,7 +136,7 @@ Status ChangeStreamOptionsParameter::validate(const BSONElement& newValueElement } }, [&](const std::int64_t& expireAfterSeconds) { - if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { + if (change_stream_serverless_helpers::isServerlessEnvironment()) { validateStatus = { ErrorCodes::CommandNotSupported, "The 'changeStreamOptions.preAndPostImages.expireAfterSeconds' is " diff --git a/src/mongo/db/change_stream_pre_image_util.cpp b/src/mongo/db/change_stream_pre_image_util.cpp index 56291553149..c66f32d8a96 100644 --- a/src/mongo/db/change_stream_pre_image_util.cpp +++ b/src/mongo/db/change_stream_pre_image_util.cpp @@ -57,11 +57,7 @@ boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { // Non-serverless and serverless environments expire pre-images according to different logic and - // parameters. - // - // This method retrieves the 'expireAfterSeconds' for a single-tenant environment. Prohibit - // callers from using this in a serverless setting. - invariant(!change_stream_serverless_helpers::isChangeCollectionsModeActive()); + // parameters. This method retrieves the 'expireAfterSeconds' for a single-tenant environment. boost::optional<std::int64_t> expireAfterSeconds = boost::none; // Get the expiration time directly from the change stream manager. diff --git a/src/mongo/db/change_stream_serverless_helpers.cpp b/src/mongo/db/change_stream_serverless_helpers.cpp index f8406d07a99..b48bd36667d 100644 --- a/src/mongo/db/change_stream_serverless_helpers.cpp +++ b/src/mongo/db/change_stream_serverless_helpers.cpp @@ -74,10 +74,10 @@ bool canInitializeServices() { return false; } - return canRunInTargetEnvironment(); + return isServerlessEnvironment(); } -bool canRunInTargetEnvironment() { +bool isServerlessEnvironment() { // A change stream services are enabled only in the multitenant serverless settings. For the // sharded cluster, 'internalChangeStreamUseTenantIdForTesting' maybe provided for the testing // purposes until the support is available. diff --git a/src/mongo/db/change_stream_serverless_helpers.h b/src/mongo/db/change_stream_serverless_helpers.h index 9298603982f..5d088c65be2 100644 --- a/src/mongo/db/change_stream_serverless_helpers.h +++ b/src/mongo/db/change_stream_serverless_helpers.h @@ -60,7 +60,7 @@ bool canInitializeServices(); * Returns true if the target environment (replica-set or sharded-cluster) supports running change * stream in the serverless, false otherwise. */ -bool canRunInTargetEnvironment(); +bool isServerlessEnvironment(); /** * Returns an internal tenant id that will be used for testing purposes. This tenant id will not diff --git a/src/mongo/db/change_streams_cluster_parameter.cpp b/src/mongo/db/change_streams_cluster_parameter.cpp index f3636b6e41c..036a27fd631 100644 --- a/src/mongo/db/change_streams_cluster_parameter.cpp +++ b/src/mongo/db/change_streams_cluster_parameter.cpp @@ -49,7 +49,7 @@ Status validateChangeStreamsClusterParameter( return {ErrorCodes::IllegalOperation, "The 'changeStreams' parameter is unsupported in standalone."}; } - if (!change_stream_serverless_helpers::canRunInTargetEnvironment()) { + if (!change_stream_serverless_helpers::isServerlessEnvironment()) { return Status( ErrorCodes::CommandNotSupported, "The 'changeStreams' cluster-wide parameter is only available in serverless."); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4a95b56eb13..a290ef56713 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -874,9 +874,8 @@ Status runAggregate(OperationContext* opCtx, nss = NamespaceString::kRsOplogNamespace; // In case of serverless the change stream will be opened on the change collection. - const bool changeCollectionsMode = - change_stream_serverless_helpers::isChangeCollectionsModeActive(); - if (changeCollectionsMode) { + const bool isServerless = change_stream_serverless_helpers::isServerlessEnvironment(); + if (isServerless) { const auto tenantId = change_stream_serverless_helpers::resolveTenantId(origNss.tenantId()); @@ -923,7 +922,7 @@ Status runAggregate(OperationContext* opCtx, registerTelemetry(); uassert(ErrorCodes::ChangeStreamNotEnabled, "Change streams must be enabled before being used", - !changeCollectionsMode || + !isServerless || change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *nss.tenantId())); } else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) { diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index e3ed6a0159c..a4f14b9a6c2 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -380,30 +380,6 @@ void logGlobalIndexDDLOperation(OperationContext* opCtx, onWriteOpCompleted(opCtx, {stmtId}, sessionTxnRecord); } -/** - * See isTenantChangeStreamEnabled() in oplog.cpp. - */ -bool isTenantChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) { - const auto& settings = repl::ReplicationCoordinator::get(opCtx)->getSettings(); - if (!settings.isServerless()) { - return false; - } - - if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { - return false; - } - - if (!tenantId) { - return false; - } - - if (!change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, tenantId.get())) { - return false; - } - - return true; -} - } // namespace OpObserverImpl::OpObserverImpl(std::unique_ptr<OplogWriter> oplogWriter) @@ -629,7 +605,9 @@ std::vector<repl::OpTime> _logInsertOps(OperationContext* opCtx, // UUID and optional donor timeline metadata. if (const auto& recipientInfo = repl::tenantMigrationInfo(opCtx)) { oplogEntryTemplate->setFromTenantMigration(recipientInfo->uuid); - if (isTenantChangeStreamEnabled(opCtx, oplogEntryTemplate->getTid()) && + if (oplogEntryTemplate->getTid() && + change_stream_serverless_helpers::isChangeStreamEnabled( + opCtx, *oplogEntryTemplate->getTid()) && recipientInfo->donorOplogEntryData) { oplogEntryTemplate->setDonorOpTime(recipientInfo->donorOplogEntryData->donorOpTime); oplogEntryTemplate->setDonorApplyOpsIndex( diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 908e64d878c..c6448ea3f62 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -174,27 +174,6 @@ StringData getInvalidatingReason(const OplogApplication::Mode mode, const bool i return ""_sd; } -bool isTenantChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) { - const auto& settings = ReplicationCoordinator::get(opCtx)->getSettings(); - if (!settings.isServerless()) { - return false; - } - - if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { - return false; - } - - if (!tenantId) { - return false; - } - - if (!change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, tenantId.get())) { - return false; - } - - return true; -} - Status insertDocumentsForOplog(OperationContext* opCtx, const CollectionPtr& oplogCollection, std::vector<Record>* records, @@ -518,7 +497,8 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { // UUID and optional donor timeline metadata. if (const auto& recipientInfo = tenantMigrationInfo(opCtx)) { oplogEntry->setFromTenantMigration(recipientInfo->uuid); - if (isTenantChangeStreamEnabled(opCtx, oplogEntry->getTid()) && + if (oplogEntry->getTid() && + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *oplogEntry->getTid()) && recipientInfo->donorOplogEntryData) { oplogEntry->setDonorOpTime(recipientInfo->donorOplogEntryData->donorOpTime); oplogEntry->setDonorApplyOpsIndex(recipientInfo->donorOplogEntryData->applyOpsIndex); |