author     Mickey. J Winters <mickey.winters@mongodb.com>  2022-12-16 19:41:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-12-16 20:29:15 +0000
commit     6530690040dca4c52b78b2f268e31bf11a015131 (patch)
tree       37f5d5e51be1f0480e22fa85c5fc9754e2633c24
parent     0f6dd5be6ac02d498f4b4e4cb07d20a22200aa63 (diff)
SERVER-65259 fix cursor leak in aggregation that requires merging on shard
(cherry-picked from commit 7f40c171562cb8de1dfae04368ca54e44a193f96)
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml         |  4
-rw-r--r--  jstests/sharding/query/shard_refuses_cursor_ownership.js  | 83
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp          | 28
3 files changed, 108 insertions(+), 7 deletions(-)
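
At a high level, the leak came from an unconditional ownership hand-off: mongos opened cursors on the shards, dispatched a merging pipeline to one shard, and then always dismissed its own responsibility for the cursors, even when the merging shard rejected the command. Below is a minimal, self-contained C++ sketch of the pattern the patch enforces. ShardCursor, MergeResponse, CursorOwner, and dispatchMergingPipelineSketch are illustrative stand-ins, not the real mongo/s types; in the actual code the guard role is played by DocumentSourceMergeCursors and its dismissCursorOwnership().

```cpp
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Handle to a cursor that was opened on a remote shard (illustrative).
struct ShardCursor {};

// What the merging shard sent back for the dispatched merge command.
struct MergeResponse {
    bool ok;            // did the shard accept the merge pipeline (and the cursors)?
    std::string error;  // error message when it did not
};

// Guards the shard cursors until ownership is explicitly dismissed; kills
// them on destruction otherwise (loosely mirrors DocumentSourceMergeCursors).
class CursorOwner {
public:
    explicit CursorOwner(std::vector<ShardCursor> cursors) : _cursors(std::move(cursors)) {}
    ~CursorOwner() {
        if (_owns)
            killCursors(_cursors);  // this is what plugs the leak
    }
    void dismissOwnership() {
        _owns = false;
    }

private:
    static void killCursors(const std::vector<ShardCursor>&) { /* send killCursors to shards */ }
    std::vector<ShardCursor> _cursors;
    bool _owns = true;
};

void dispatchMergingPipelineSketch(CursorOwner& owner, const MergeResponse& response) {
    // Before the fix, ownership was dismissed unconditionally, so a shard that
    // refused the merge command left the cursors with no owner at all.
    if (!response.ok)
        throw std::runtime_error(response.error);  // owner's destructor kills the cursors
    // Only once the merging shard has accepted the cursors do we stop
    // guarding them locally.
    owner.dismissOwnership();
}
```
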
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index dff8fb855fe..1cefceb09e4 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -88,6 +88,8 @@ last-continuous:
ticket: SERVER-68728
- test_file: jstests/sharding/prepare_transaction_then_migrate.js
ticket: SERVER-68361
+ - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+ ticket: SERVER-65259
suites:
change_streams_multiversion_passthrough: null
change_streams_sharded_collections_multiversion_passthrough: null
@@ -273,6 +275,8 @@ last-lts:
ticket: SERVER-68728
- test_file: jstests/sharding/prepare_transaction_then_migrate.js
ticket: SERVER-68361
+ - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+ ticket: SERVER-65259
suites:
change_streams_multiversion_passthrough: null
change_streams_sharded_collections_multiversion_passthrough: null
diff --git a/jstests/sharding/query/shard_refuses_cursor_ownership.js b/jstests/sharding/query/shard_refuses_cursor_ownership.js
new file mode 100644
index 00000000000..30e6e88d896
--- /dev/null
+++ b/jstests/sharding/query/shard_refuses_cursor_ownership.js
@@ -0,0 +1,83 @@
+/**
+ * This test runs an unsharded $out query which results in mongos setting up a cursor and giving
+ * it to a shard to complete. mongos assumes the shard will kill the cursor, but if the shard
+ * does not accept ownership of the cursor, previously no one would kill it. This test ensures
+ * that mongos kills the cursor if a shard does not accept ownership.
+ * @tags: [
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const st = new ShardingTest({shards: 2});
+
+const dbName = jsTestName();
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let db = st.s.getDB(dbName);
+assert.commandWorked(db.dropDatabase());
+let coll = db[collName];
+
+st.shardColl(collName, {x: 1}, {x: 0}, {x: 1}, dbName, true);
+
+assert.commandWorked(coll.insert([{x: -2}, {x: -1}, {x: 1}, {x: 2}]));
+
+const primary = st.getPrimaryShard(dbName);
+const other = st.getOther(st.getPrimaryShard(dbName));
+
+// Start an aggregation that requires merging on a shard. Let it run until the shard cursors have
+// been established but make it hang right before opening the merge cursor.
+let shardedAggregateHangBeforeDispatchMergingPipelineFP =
+ configureFailPoint(st.s, "shardedAggregateHangBeforeDispatchMergingPipeline");
+let awaitAggregationShell = startParallelShell(
+ funWithArgs((dbName, collName) => {
+ assert.eq(
+ 0, db.getSiblingDB(dbName)[collName].aggregate([{$out: collName + ".out"}]).itcount());
+ }, dbName, collName), st.s.port);
+shardedAggregateHangBeforeDispatchMergingPipelineFP.wait();
+
+// Start a chunk migration, let it run until it enters the critical section.
+let hangBeforePostMigrationCommitRefresh =
+ configureFailPoint(primary, "hangBeforePostMigrationCommitRefresh");
+let awaitMoveChunkShell = startParallelShell(
+ funWithArgs((recipientShard, ns) => {
+ assert.commandWorked(db.adminCommand({moveChunk: ns, find: {x: -1}, to: recipientShard}));
+ }, other.shardName, ns), st.s.port);
+hangBeforePostMigrationCommitRefresh.wait();
+
+// Let the aggregation continue and try to establish the merge cursor (it will first fail
+// because the shard is in the critical section; mongos will transparently retry).
+shardedAggregateHangBeforeDispatchMergingPipelineFP.off();
+
+// Let the migration exit the critical section and complete.
+hangBeforePostMigrationCommitRefresh.off();
+
+// The aggregation will be able to complete now.
+awaitAggregationShell();
+
+awaitMoveChunkShell();
+
+// Did any cursor leak?
+const idleCursors = primary.getDB("admin")
+ .aggregate([
+ {$currentOp: {idleCursors: true, allUsers: true}},
+ {$match: {type: "idleCursor", ns: ns}}
+ ])
+ .toArray();
+assert.eq(0, idleCursors.length, "Found idle cursors: " + tojson(idleCursors));
+
+// Check that range deletions can be completed (if a cursor was left open, the range deletion would
+// not finish).
+assert.soon(
+ () => {
+ return primary.getDB("config")["rangeDeletions"].find().itcount() === 0;
+ },
+ "Range deletion tasks did not finish: + " +
+ tojson(primary.getDB("config")["rangeDeletions"].find().toArray()));
+
+st.stop();
+})();
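
The test's interleaving works because a shard inside a migration critical section rejects the merge command with a stale-routing-information error, and mongos transparently retries after refreshing its routing table (see the comment in the test just before the fail point is released). A rough sketch of that retry shape, using illustrative names (StaleConfigError, refreshRoutingTable) rather than the real mongo/s interfaces:

```cpp
#include <stdexcept>

// Stand-in for the "shard is in a critical section / routing info is stale"
// class of errors; the real server uses dedicated error codes for this.
struct StaleConfigError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void refreshRoutingTable() { /* re-fetch chunk placement from the config servers */ }

// Run `attempt` (e.g. dispatching the merging pipeline to a shard), retrying
// a bounded number of times when the target shard reports stale routing info.
template <typename Fn>
auto runWithStaleConfigRetry(Fn&& attempt, int maxRetries = 10) {
    for (int tries = 0;; ++tries) {
        try {
            return attempt();
        } catch (const StaleConfigError&) {
            if (tries == maxRetries)
                throw;
            refreshRoutingTable();  // then retry the dispatch
        }
    }
}
```
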
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 2ccebf5870b..1837e581fbf 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -65,6 +65,7 @@ namespace cluster_aggregation_planner {
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline);
using sharded_agg_helpers::DispatchShardPipelineResults;
using sharded_agg_helpers::SplitPipeline;
@@ -171,9 +172,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const PrivilegeVector& privileges,
bool hasChangeStream) {
// We should never be in a situation where we call this function on a non-merge pipeline.
- invariant(shardDispatchResults.splitPipeline);
+ tassert(6525900,
+ "tried to dispatch merge pipeline but the pipeline was not split",
+ shardDispatchResults.splitPipeline);
auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
- invariant(mergePipeline);
+ tassert(6525901,
+ "tried to dispatch merge pipeline but there was no merge portion of the split pipeline",
+ mergePipeline);
auto* opCtx = expCtx->opCtx;
std::vector<ShardId> targetedShards;
@@ -235,9 +240,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
privileges,
expCtx->tailableMode));
- // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
- // ownership in the current merging pipeline such that when it goes out of scope it does not
- // attempt to kill the cursors.
+// If the merging shard returned an error and did not accept ownership, it is our
+// responsibility to kill the cursors.
+ uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data));
+
+ // If we didn't get an error from the merging shard, ownership for the shard cursors has been
+ // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such
+ // that when it goes out of scope it does not attempt to kill the cursors.
auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
mergeCursors->dismissCursorOwnership();
@@ -442,8 +451,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
- // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
- // responsible for its own producer cursors.
+ // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be
+ // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now
+ // that we've established those pipelines on the shards, we are no longer responsible for
+ // ensuring they are cleaned up. If there was a problem establishing the cursors then
+ // establishCursors() would have thrown and mongos would kill all the consumer cursors itself.
for (const auto& pipeline : consumerPipelines) {
const auto& mergeCursors =
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
@@ -709,6 +721,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
}
+ shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
+
// If we reach here, we have a merge pipeline to dispatch.
return dispatchMergingPipeline(expCtx,
namespaces,
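
The new shardedAggregateHangBeforeDispatchMergingPipeline fail point added above is what lets the test freeze the aggregation at exactly this step: after the shard cursors exist but before the merging pipeline is dispatched. As a rough sketch of the pauseWhileSet() idea, assuming a simple flag-based implementation (the real FailPoint class also supports activation modes and hit counters, and is toggled via the configureFailPoint command):

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Deliberately simplified stand-in for the server's FailPoint class.
class SimpleFailPoint {
public:
    void enable() { _enabled.store(true); }
    void disable() { _enabled.store(false); }

    // Blocks the calling thread for as long as a test keeps the point
    // enabled, freezing the operation at a precise step so the test can
    // interleave other work (here, a chunk migration) before releasing it.
    void pauseWhileSet() const {
        while (_enabled.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

private:
    std::atomic<bool> _enabled{false};
};
```
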