diff options
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4fe8a736ad7..b7bd221ec61 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx, nss = NamespaceString::kRsOplogNamespace; // In case of serverless the change stream will be opened on the change collection. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(origNss.tenantId()); + + uassert(ErrorCodes::BadValue, + "Change streams cannot be used without tenant id", + tenantId); + uassert(ErrorCodes::ChangeStreamNotEnabled, "Change streams must be enabled before being used.", - changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId())); + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)); + - nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId()); + nss = NamespaceString::makeChangeCollectionNSS(tenantId); } // Assert that a change stream on the config server is always opened on the oplog. @@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx, LOGV2_INFO(6689200, "Opening change stream on the namespace: {nss}", "Opening change stream", - "nss"_attr = nss.toString()); + "nss"_attr = nss.toStringWithTenantId()); } // Upgrade and wait for read concern if necessary. |