author    Mickey. J Winters <mickey.winters@mongodb.com>    2022-12-16 19:41:56 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-12-16 19:56:07 +0000
commit    ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83 (patch)
tree      d23bdb3a165dd17eed672bbfb54bc7f3548f34e0 /src
parent    32d00c545bfaa34509547cb68919d29ede9ed5fa (diff)
download  mongo-ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83.tar.gz
SERVER-65259 fix cursor leak in aggregation that requires merging on shard
(cherry-picked from commit 7f40c171562cb8de1dfae04368ca54e44a193f96)
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp | 28
1 file changed, 21 insertions(+), 7 deletions(-)
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index c57d252d8d2..2f962bf771d 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -64,6 +64,7 @@ namespace cluster_aggregation_planner {
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline);
using sharded_agg_helpers::DispatchShardPipelineResults;
using sharded_agg_helpers::SplitPipeline;
@@ -177,9 +178,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const PrivilegeVector& privileges,
bool hasChangeStream) {
// We should never be in a situation where we call this function on a non-merge pipeline.
- invariant(shardDispatchResults.splitPipeline);
+ uassert(6525900,
+ "tried to dispatch merge pipeline but the pipeline was not split",
+ shardDispatchResults.splitPipeline);
auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
- invariant(mergePipeline);
+ uassert(6525901,
+ "tried to dispatch merge pipeline but there was no merge portion of the split pipeline",
+ mergePipeline);
auto* opCtx = expCtx->opCtx;
std::vector<ShardId> targetedShards;
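
Context for the hunk above: swapping invariant for uassert turns what was a process-fatal check into errors 6525900/6525901 that are returned to the client, treating an unsplit pipeline here as a failed request rather than unrecoverable server corruption. A rough standalone sketch of the difference between the two assertion styles, using made-up names rather than MongoDB's real assert_util helpers:

    #include <cstdlib>
    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <utility>

    // Illustrative stand-ins only; the server's real invariant()/uassert()
    // macros live in MongoDB's assert_util headers.
    struct AssertionError : std::runtime_error {
        int code;
        AssertionError(int c, std::string msg)
            : std::runtime_error(std::move(msg)), code(c) {}
    };

    // Process-fatal check: a violated invariant means internal state is broken,
    // so the only safe response is to abort the process.
    inline void invariantLike(bool ok) {
        if (!ok) std::abort();
    }

    // User assertion: the condition can fail for reasons outside the server's
    // control, so throw an error carrying a unique numeric code and a message
    // that can be reported to the client instead of taking the process down.
    inline void uassertLike(int code, std::string msg, bool ok) {
        if (!ok) throw AssertionError(code, std::move(msg));
    }

    int main() {
        try {
            uassertLike(6525900, "pipeline was not split", false);
        } catch (const AssertionError& e) {
            std::cout << "request fails with code " << e.code << ": " << e.what() << "\n";
        }
    }
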
@@ -244,9 +249,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
privileges,
expCtx->tailableMode));
- // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
- // ownership in the current merging pipeline such that when it goes out of scope it does not
- // attempt to kill the cursors.
+ // If the merging shard returned an error and did not accept ownership, it is our responsibility
+ // to kill the cursors.
+ uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data));
+
+ // If we didn't get an error from the merging shard, ownership for the shard cursors has been
+ // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such
+ // that when it goes out of scope it does not attempt to kill the cursors.
auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
mergeCursors->dismissCursorOwnership();
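
The heart of the leak fix is ordering: the merging shard's reply is checked for an error (uassertStatusOK over getStatusFromCommandResult) before cursor ownership is dismissed, so on failure the merge pipeline still kills the shard cursors when it goes out of scope. A simplified, self-contained sketch of that pattern; RemoteReply and CursorOwner below are placeholders for the real command reply and DocumentSourceMergeCursors:

    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    // Placeholder for a remote command reply (the real code inspects a BSON
    // reply via getStatusFromCommandResult).
    struct RemoteReply {
        bool ok;
        std::string errmsg;
    };

    // Rough analogue of uassertStatusOK(getStatusFromCommandResult(...)):
    // surface a remote failure as an exception.
    void throwIfCommandFailed(const RemoteReply& reply) {
        if (!reply.ok) throw std::runtime_error(reply.errmsg);
    }

    // Owns a set of remote cursor ids until ownership is explicitly dismissed.
    class CursorOwner {
    public:
        explicit CursorOwner(std::vector<long long> ids) : _ids(std::move(ids)) {}
        ~CursorOwner() {
            if (!_dismissed)
                for (long long id : _ids)
                    std::cout << "killCursors(" << id << ")\n";
        }
        void dismissCursorOwnership() { _dismissed = true; }

    private:
        std::vector<long long> _ids;
        bool _dismissed = false;
    };

    int main() {
        CursorOwner cursors({101, 102});
        RemoteReply mergeReply{false, "merging shard rejected the pipeline"};
        try {
            // The ordering that matters: confirm the merging shard accepted the
            // cursors *before* dismissing ownership; on error, ~CursorOwner
            // still kills them rather than leaking them on the shards.
            throwIfCommandFailed(mergeReply);
            cursors.dismissCursorOwnership();
        } catch (const std::exception& e) {
            std::cout << "merge dispatch failed: " << e.what() << "\n";
        }
    }
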
@@ -433,8 +442,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
- // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
- // responsible for its own producer cursors.
+ // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be
+ // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now
+ // that we've established those pipelines on the shards, we are no longer responsible for
+ // ensuring they are cleaned up. If there was a problem establishing the cursors then
+ // establishCursors() would have thrown and mongos would kill all the consumer cursors itself.
for (const auto& pipeline : consumerPipelines) {
const auto& mergeCursors =
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
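
The same hand-off discipline shows up for the exchange consumer pipelines: ownership of the consumer cursors is relinquished only after the shard-side $mergeCursors pipelines have been established, and an establishment failure propagates as an exception so mongos still does the cleanup itself. A small hypothetical sketch of that ordering (the names below are illustrative, not functions from this file):

    #include <stdexcept>
    #include <vector>

    // Hypothetical handle for one exchange consumer pipeline; dismiss() mirrors
    // DocumentSourceMergeCursors::dismissCursorOwnership().
    struct ConsumerPipeline {
        bool ownsCursors = true;
        void dismiss() { ownsCursors = false; }
    };

    // Stand-in for dispatching the shard-side pipelines; like establishCursors(),
    // it throws on failure, at which point every consumer still owns its cursors
    // and the caller remains responsible for killing them.
    void establishConsumersOnShards(std::vector<ConsumerPipeline>& pipelines,
                                    bool simulateFailure) {
        if (simulateFailure)
            throw std::runtime_error("failed to establish consumer pipelines");
    }

    void handOffConsumerCursors(std::vector<ConsumerPipeline>& pipelines) {
        establishConsumersOnShards(pipelines, /*simulateFailure=*/false);
        // Only after the shards have accepted responsibility do we stop tracking
        // the consumer cursors locally.
        for (auto& p : pipelines)
            p.dismiss();
    }

    int main() {
        std::vector<ConsumerPipeline> consumers(3);
        handOffConsumerCursors(consumers);
    }
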
@@ -697,6 +709,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
}
+ shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
+
// If we reach here, we have a merge pipeline to dispatch.
return dispatchMergingPipeline(expCtx,
namespaces,
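
The new shardedAggregateHangBeforeDispatchMergingPipeline fail point gives tests a deterministic place to hold an aggregation just before the merging pipeline is dispatched, which is how a regression test can sit inside the former leak window and then check that no cursors are left behind. Below is a minimal sketch of the pause-while-set idea; it is not the server's actual FailPoint class:

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Toy fail point: a named flag that a test can turn on (the server exposes
    // this through the configureFailPoint test command) to make code pause at a
    // well-known spot.
    class SimpleFailPoint {
    public:
        void enable() { _enabled.store(true); }
        void disable() { _enabled.store(false); }

        // Blocks while the fail point is active, in the spirit of
        // FailPoint::pauseWhileSet() used above.
        void pauseWhileSet() const {
            while (_enabled.load())
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

    private:
        std::atomic<bool> _enabled{false};
    };

    SimpleFailPoint shardedAggregateHangBeforeDispatchMergingPipeline;

    int main() {
        // Nothing enables the fail point here, so this returns immediately; a
        // test would enable it, start the aggregation, then disable it.
        shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
    }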