diff options
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 22 |
1 files changed, 20 insertions, 2 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index eb32a6c220b..cb15cf82247 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -42,6 +42,7 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_serverless_helpers.h" +#include "mongo/db/commands/external_data_source_scope_guard.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -81,6 +82,7 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/resource_consumption_metrics.h" @@ -659,7 +661,7 @@ Status runAggregate(OperationContext* opCtx, const BSONObj& cmdObj, const PrivilegeVector& privileges, rpc::ReplyBuilderInterface* result) { - return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result); + return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result, {}); } Status runAggregate(OperationContext* opCtx, @@ -668,7 +670,8 @@ Status runAggregate(OperationContext* opCtx, const LiteParsedPipeline& liteParsedPipeline, const BSONObj& cmdObj, const PrivilegeVector& privileges, - rpc::ReplyBuilderInterface* result) { + rpc::ReplyBuilderInterface* result, + ExternalDataSourceScopeGuard externalDataSourceGuard) { // Perform some validations on the LiteParsedPipeline and request before continuing with the // aggregation command. @@ -945,6 +948,15 @@ Status runAggregate(OperationContext* opCtx, } } + if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addAggregateQuery(*request.getSampleId(), + expCtx->ns, + pipeline->getInitialQuery(), + expCtx->getCollatorBSON()) + .getAsync([](auto) {}); + } + // If the aggregate command supports encrypted collections, do rewrites of the pipeline to // support querying against encrypted fields. if (shouldDoFLERewrite(request)) { @@ -1021,6 +1033,8 @@ Status runAggregate(OperationContext* opCtx, p.deleteUnderlying(); } }); + auto extDataSrcGuard = + std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard)); for (auto&& exec : execs) { ClientCursorParams cursorParams( std::move(exec), @@ -1038,6 +1052,10 @@ Status runAggregate(OperationContext* opCtx, pin->incNBatches(); cursors.emplace_back(pin.getCursor()); + // All cursors share the ownership to 'extDataSrcGuard' and if the last cursor is destroyed, + // 'extDataSrcGuard' is also destroyed and created virtual collections are dropped by the + // destructor of ExternalDataSourceScopeGuard. + ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard; pins.emplace_back(std::move(pin)); } |