diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-12-16 19:41:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-16 19:56:07 +0000 |
commit | ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83 (patch) | |
tree | d23bdb3a165dd17eed672bbfb54bc7f3548f34e0 | |
parent | 32d00c545bfaa34509547cb68919d29ede9ed5fa (diff) | |
download | mongo-ca6b6dc8eb0a0bd527f812f4fb1196794ccd1a83.tar.gz |
SERVER-65259 fix cursor leak in aggregation that requires merging on shard
(cherry-picked from commit 7f40c171562cb8de1dfae04368ca54e44a193f96)
-rw-r--r-- | jstests/sharding/shard_refuses_cursor_ownership.js | 84 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 28 |
2 files changed, 105 insertions, 7 deletions
diff --git a/jstests/sharding/shard_refuses_cursor_ownership.js b/jstests/sharding/shard_refuses_cursor_ownership.js new file mode 100644 index 00000000000..99d609516cf --- /dev/null +++ b/jstests/sharding/shard_refuses_cursor_ownership.js @@ -0,0 +1,84 @@ +/** + * This test runs an unsharded $out query which results in mongos setting up a cursor and giving + * it to a shard to complete. mongos assumes the shard will kill the cursor, but if a shard doesn't + * accept ownership of the cursor then previously no one would kill it. This test ensures mongos + * will kill the cursor if a shard doesn't accept ownership. + * @tags: [ + * requires_fcv_44 + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallel_shell_helpers.js"); + +const st = new ShardingTest({shards: 2}); + +const dbName = jsTestName(); +const collName = "foo"; +const ns = dbName + "." + collName; + +let db = st.s.getDB(dbName); +assert.commandWorked(db.dropDatabase()); +let coll = db[collName]; + +st.shardColl(collName, {x: 1}, {x: 0}, {x: 1}, dbName, true); + +assert.commandWorked(coll.insert([{x: -2}, {x: -1}, {x: 1}, {x: 2}])); + +const primary = st.getPrimaryShard(dbName); +const other = st.getOther(st.getPrimaryShard(dbName)); + +// Start an aggregation that requires merging on a shard. Let it run until the shard cursors have +// been established but make it hang right before opening the merge cursor. +let shardedAggregateHangBeforeDispatchMergingPipelineFP = + configureFailPoint(st.s, "shardedAggregateHangBeforeDispatchMergingPipeline"); +let awaitAggregationShell = startParallelShell( + funWithArgs((dbName, collName) => { + assert.eq( + 0, db.getSiblingDB(dbName)[collName].aggregate([{$out: collName + ".out"}]).itcount()); + }, dbName, collName), st.s.port); +shardedAggregateHangBeforeDispatchMergingPipelineFP.wait(); + +// Start a chunk migration, let it run until it enters the critical section. 
+let hangBeforePostMigrationCommitRefresh = + configureFailPoint(primary, "hangBeforePostMigrationCommitRefresh"); +let awaitMoveChunkShell = startParallelShell( + funWithArgs((recipientShard, ns) => { + assert.commandWorked(db.adminCommand({moveChunk: ns, find: {x: -1}, to: recipientShard})); + }, other.shardName, ns), st.s.port); +hangBeforePostMigrationCommitRefresh.wait(); + +// Let the aggregation continue and try to establish the merge cursor (it will first fail because +// the shard is in the critical section. Mongos will transparently retry). +shardedAggregateHangBeforeDispatchMergingPipelineFP.off(); + +// Let the migration exit the critical section and complete. +hangBeforePostMigrationCommitRefresh.off(); + +// The aggregation will be able to complete now. +awaitAggregationShell(); + +awaitMoveChunkShell(); + +// Did any cursor leak? +const idleCursors = primary.getDB("admin") + .aggregate([ + {$currentOp: {idleCursors: true, allUsers: true}}, + {$match: {type: "idleCursor", ns: ns}} + ]) + .toArray(); +assert.eq(0, idleCursors.length, "Found idle cursors: " + tojson(idleCursors)); + +// Check that range deletions can be completed (if a cursor was left open, the range deletion would +// not finish). 
+assert.soon( + () => { + return primary.getDB("config")["rangeDeletions"].find().itcount() === 0; + }, + "Range deletion tasks did not finish: " + + tojson(primary.getDB("config")["rangeDeletions"].find().toArray())); + +st.stop(); +})(); diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index c57d252d8d2..2f962bf771d 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -64,6 +64,7 @@ namespace cluster_aggregation_planner { MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline); MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor); +MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline); using sharded_agg_helpers::DispatchShardPipelineResults; using sharded_agg_helpers::SplitPipeline; @@ -177,9 +178,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const PrivilegeVector& privileges, bool hasChangeStream) { // We should never be in a situation where we call this function on a non-merge pipeline. - invariant(shardDispatchResults.splitPipeline); + uassert(6525900, + "tried to dispatch merge pipeline but the pipeline was not split", + shardDispatchResults.splitPipeline); auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); - invariant(mergePipeline); + uassert(6525901, + "tried to dispatch merge pipeline but there was no merge portion of the split pipeline", + mergePipeline); auto* opCtx = expCtx->opCtx; std::vector<ShardId> targetedShards; @@ -244,9 +249,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex privileges, expCtx->tailableMode)); - // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the - // ownership in the current merging pipeline such that when it goes out of scope it does not - // attempt to kill the cursors. 
+ // If the mergingShard returned an error and did not accept ownership it is our responsibility + // to kill the cursors. + uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data)); + + // If we didn't get an error from the merging shard, ownership for the shard cursors has been + // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such + // that when it goes out of scope it does not attempt to kill the cursors. auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront()); mergeCursors->dismissCursorOwnership(); @@ -433,8 +442,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none}; - // Relinquish ownership of the local consumer pipelines' cursors as each shard is now - // responsible for its own producer cursors. + // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be + // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now + // that we've established those pipelines on the shards, we are no longer responsible for + // ensuring they are cleaned up. If there was a problem establishing the cursors then + // establishCursors() would have thrown and mongos would kill all the consumer cursors itself. for (const auto& pipeline : consumerPipelines) { const auto& mergeCursors = static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); @@ -697,6 +709,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults); } + shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet(); + // If we reach here, we have a merge pipeline to dispatch. return dispatchMergingPipeline(expCtx, namespaces, |