summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp18
1 files changed, 13 insertions, 5 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 4fe8a736ad7..b7bd221ec61 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx,
nss = NamespaceString::kRsOplogNamespace;
// In case of serverless the change stream will be opened on the change collection.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(origNss.tenantId());
+
+ uassert(ErrorCodes::BadValue,
+ "Change streams cannot be used without tenant id",
+ tenantId);
+
uassert(ErrorCodes::ChangeStreamNotEnabled,
"Change streams must be enabled before being used.",
- changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId()));
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId));
+
- nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId());
+ nss = NamespaceString::makeChangeCollectionNSS(tenantId);
}
// Assert that a change stream on the config server is always opened on the oplog.
@@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx,
LOGV2_INFO(6689200,
"Opening change stream on the namespace: {nss}",
"Opening change stream",
- "nss"_attr = nss.toString());
+ "nss"_attr = nss.toStringWithTenantId());
}
// Upgrade and wait for read concern if necessary.