summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp22
1 files changed, 20 insertions, 2 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index eb32a6c220b..cb15cf82247 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/change_stream_serverless_helpers.h"
+#include "mongo/db/commands/external_data_source_scope_guard.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -81,6 +82,7 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
@@ -659,7 +661,7 @@ Status runAggregate(OperationContext* opCtx,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
- return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result);
+ return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result, {});
}
Status runAggregate(OperationContext* opCtx,
@@ -668,7 +670,8 @@ Status runAggregate(OperationContext* opCtx,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
- rpc::ReplyBuilderInterface* result) {
+ rpc::ReplyBuilderInterface* result,
+ ExternalDataSourceScopeGuard externalDataSourceGuard) {
// Perform some validations on the LiteParsedPipeline and request before continuing with the
// aggregation command.
@@ -945,6 +948,15 @@ Status runAggregate(OperationContext* opCtx,
}
}
+ if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addAggregateQuery(*request.getSampleId(),
+ expCtx->ns,
+ pipeline->getInitialQuery(),
+ expCtx->getCollatorBSON())
+ .getAsync([](auto) {});
+ }
+
// If the aggregate command supports encrypted collections, do rewrites of the pipeline to
// support querying against encrypted fields.
if (shouldDoFLERewrite(request)) {
@@ -1021,6 +1033,8 @@ Status runAggregate(OperationContext* opCtx,
p.deleteUnderlying();
}
});
+ auto extDataSrcGuard =
+ std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard));
for (auto&& exec : execs) {
ClientCursorParams cursorParams(
std::move(exec),
@@ -1038,6 +1052,10 @@ Status runAggregate(OperationContext* opCtx,
pin->incNBatches();
cursors.emplace_back(pin.getCursor());
+ // All cursors share the ownership to 'extDataSrcGuard' and if the last cursor is destroyed,
+ // 'extDataSrcGuard' is also destroyed and created virtual collections are dropped by the
+ // destructor of ExternalDataSourceScopeGuard.
+ ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard;
pins.emplace_back(std::move(pin));
}