diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-12-16 19:41:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-16 19:56:07 +0000 |
commit | ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83 (patch) | |
tree | d23bdb3a165dd17eed672bbfb54bc7f3548f34e0 /src | |
parent | 32d00c545bfaa34509547cb68919d29ede9ed5fa (diff) | |
download | mongo-ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83.tar.gz |
SERVER-65259 fix cursor leak in aggregation that requires merging on shard
(cherry-picked from commit 7f40c171562cb8de1dfae04368ca54e44a193f96)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index c57d252d8d2..2f962bf771d 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -64,6 +64,7 @@ namespace cluster_aggregation_planner { MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline); MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor); +MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline); using sharded_agg_helpers::DispatchShardPipelineResults; using sharded_agg_helpers::SplitPipeline; @@ -177,9 +178,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const PrivilegeVector& privileges, bool hasChangeStream) { // We should never be in a situation where we call this function on a non-merge pipeline. - invariant(shardDispatchResults.splitPipeline); + uassert(6525900, + "tried to dispatch merge pipeline but the pipeline was not split", + shardDispatchResults.splitPipeline); auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); - invariant(mergePipeline); + uassert(6525901, + "tried to dispatch merge pipeline but there was no merge portion of the split pipeline", + mergePipeline); auto* opCtx = expCtx->opCtx; std::vector<ShardId> targetedShards; @@ -244,9 +249,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex privileges, expCtx->tailableMode)); - // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the - // ownership in the current merging pipeline such that when it goes out of scope it does not - // attempt to kill the cursors. + // If the mergingShard returned an error and did not accept ownership it is our responsibility + // to kill the cursors. + uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data)); + + // If we didn't get an error from the merging shard, ownership for the shard cursors has been + // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such + // that when it goes out of scope it does not attempt to kill the cursors. auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront()); mergeCursors->dismissCursorOwnership(); @@ -433,8 +442,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none}; - // Relinquish ownership of the local consumer pipelines' cursors as each shard is now - // responsible for its own producer cursors. + // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be + // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now + // that we've established those pipelines on the shards, we are no longer responsible for + // ensuring they are cleaned up. If there was a problem establishing the cursors then + // establishCursors() would have thrown and mongos would kill all the consumer cursors itself. for (const auto& pipeline : consumerPipelines) { const auto& mergeCursors = static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); @@ -697,6 +709,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults); } + shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet(); + // If we reach here, we have a merge pipeline to dispatch. return dispatchMergingPipeline(expCtx, namespaces, |