author     Mickey. J Winters <mickey.winters@mongodb.com>     2022-12-15 18:47:46 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-12-15 19:35:24 +0000
commit     b953dbf36cba018c98a407d362580ed9ed9b9d23 (patch)
tree       921f536dcd665e9162274004259d11567cb90204
parent     8084d22db543a236c0b39d209b98f52b7296cac1 (diff)
download   mongo-b953dbf36cba018c98a407d362580ed9ed9b9d23.tar.gz
SERVER-65259 fix cursor leak in aggregation that requires merging on shard
(cherry-picked from commit 7f40c171562cb8de1dfae04368ca54e44a193f96)
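The fix boils down to checking the merging shard's response before giving up ownership of the
shard cursors: if the shard refuses the merge pipeline, mongos keeps ownership and kills the
cursors itself. Below is a minimal, self-contained sketch of that guard pattern; the names
(CursorGuard, dispatchMerge) are hypothetical illustrations, not the actual classes touched by
this patch.

#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// Owns a set of remote cursor ids and kills them on destruction unless ownership
// has been explicitly handed off to another node.
class CursorGuard {
public:
    explicit CursorGuard(std::vector<int> cursorIds) : _cursorIds(std::move(cursorIds)) {}

    ~CursorGuard() {
        if (_owned) {
            for (int id : _cursorIds)
                std::cout << "killing cursor " << id << " (shard refused ownership)\n";
        }
    }

    // Call only after the merging shard has accepted the cursors.
    void dismissOwnership() { _owned = false; }

private:
    std::vector<int> _cursorIds;
    bool _owned = true;
};

// dispatch is a stand-in for sending the merge pipeline to the merging shard; it
// returns false when the shard rejects it (e.g. a stale shard version during a
// chunk migration's critical section).
bool dispatchMerge(const std::function<bool()>& dispatch) {
    CursorGuard guard({101, 102});
    if (!dispatch()) {
        // The shard did not take ownership, so returning while the guard still
        // owns the cursors lets its destructor clean them up. Dismissing
        // ownership unconditionally here is exactly the leak being fixed.
        return false;
    }
    guard.dismissOwnership();  // The shard accepted; it will kill the cursors itself.
    return true;
}

int main() {
    dispatchMerge([] { return false; });  // prints the "killing cursor" lines
    dispatchMerge([] { return true; });   // nothing to clean up locally
    return 0;
}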
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml          4
-rw-r--r--  jstests/sharding/query/shard_refuses_cursor_ownership.js  83
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp          28
3 files changed, 108 insertions(+), 7 deletions(-)
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index c6b3b849aa9..9b9a1a149c2 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -230,6 +230,8 @@ last-continuous:
ticket: SERVER-71477
- test_file: src/mongo/db/modules/enterprise/jstests/fcbis/fcbis_election_during_storage_change.js
ticket: SERVER-69861
+ - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+ ticket: SERVER-65259
suites: null
last-lts:
all:
@@ -537,4 +539,6 @@ last-lts:
ticket: SERVER-71477
- test_file: src/mongo/db/modules/enterprise/jstests/fcbis/fcbis_election_during_storage_change.js
ticket: SERVER-69861
+ - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+ ticket: SERVER-65259
suites: null
diff --git a/jstests/sharding/query/shard_refuses_cursor_ownership.js b/jstests/sharding/query/shard_refuses_cursor_ownership.js
new file mode 100644
index 00000000000..30e6e88d896
--- /dev/null
+++ b/jstests/sharding/query/shard_refuses_cursor_ownership.js
@@ -0,0 +1,83 @@
+/**
+ * This test runs an unsharded $out query which results in mongos setting up a cursor and giving
+ * it to a shard to complete. mongos assumes the shard will kill the cursor, but if a shard doesn't
+ * accept ownership of the cursor then previously no one would kill it. This test ensures mongos
+ * will kill the cursor if a shard doesn't accept ownership.
+ * @tags: [
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const st = new ShardingTest({shards: 2});
+
+const dbName = jsTestName();
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let db = st.s.getDB(dbName);
+assert.commandWorked(db.dropDatabase());
+let coll = db[collName];
+
+st.shardColl(collName, {x: 1}, {x: 0}, {x: 1}, dbName, true);
+
+assert.commandWorked(coll.insert([{x: -2}, {x: -1}, {x: 1}, {x: 2}]));
+
+const primary = st.getPrimaryShard(dbName);
+const other = st.getOther(st.getPrimaryShard(dbName));
+
+// Start an aggregation that requires merging on a shard. Let it run until the shard cursors have
+// been established but make it hang right before opening the merge cursor.
+let shardedAggregateHangBeforeDispatchMergingPipelineFP =
+ configureFailPoint(st.s, "shardedAggregateHangBeforeDispatchMergingPipeline");
+let awaitAggregationShell = startParallelShell(
+ funWithArgs((dbName, collName) => {
+ assert.eq(
+ 0, db.getSiblingDB(dbName)[collName].aggregate([{$out: collName + ".out"}]).itcount());
+ }, dbName, collName), st.s.port);
+shardedAggregateHangBeforeDispatchMergingPipelineFP.wait();
+
+// Start a chunk migration, let it run until it enters the critical section.
+let hangBeforePostMigrationCommitRefresh =
+ configureFailPoint(primary, "hangBeforePostMigrationCommitRefresh");
+let awaitMoveChunkShell = startParallelShell(
+ funWithArgs((recipientShard, ns) => {
+ assert.commandWorked(db.adminCommand({moveChunk: ns, find: {x: -1}, to: recipientShard}));
+ }, other.shardName, ns), st.s.port);
+hangBeforePostMigrationCommitRefresh.wait();
+
+// Let the aggregation continue and try to establish the merge cursor (it will first fail because
+// the shard is in the critical section; mongos will transparently retry).
+shardedAggregateHangBeforeDispatchMergingPipelineFP.off();
+
+// Let the migration exit the critical section and complete.
+hangBeforePostMigrationCommitRefresh.off();
+
+// The aggregation will be able to complete now.
+awaitAggregationShell();
+
+awaitMoveChunkShell();
+
+// Did any cursor leak?
+const idleCursors = primary.getDB("admin")
+ .aggregate([
+ {$currentOp: {idleCursors: true, allUsers: true}},
+ {$match: {type: "idleCursor", ns: ns}}
+ ])
+ .toArray();
+assert.eq(0, idleCursors.length, "Found idle cursors: " + tojson(idleCursors));
+
+// Check that range deletions can be completed (if a cursor was left open, the range deletion would
+// not finish).
+assert.soon(
+ () => {
+ return primary.getDB("config")["rangeDeletions"].find().itcount() === 0;
+    },
+    "Range deletion tasks did not finish: " +
+        tojson(primary.getDB("config")["rangeDeletions"].find().toArray()));
+
+st.stop();
+})();
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 5788b30a7d4..43e6b5ffb44 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -65,6 +65,7 @@ namespace cluster_aggregation_planner {
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline);
using sharded_agg_helpers::DispatchShardPipelineResults;
using sharded_agg_helpers::SplitPipeline;
@@ -171,9 +172,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const PrivilegeVector& privileges,
bool hasChangeStream) {
// We should never be in a situation where we call this function on a non-merge pipeline.
- invariant(shardDispatchResults.splitPipeline);
+ tassert(6525900,
+ "tried to dispatch merge pipeline but the pipeline was not split",
+ shardDispatchResults.splitPipeline);
auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
- invariant(mergePipeline);
+ tassert(6525901,
+ "tried to dispatch merge pipeline but there was no merge portion of the split pipeline",
+ mergePipeline);
auto* opCtx = expCtx->opCtx;
std::vector<ShardId> targetedShards;
@@ -232,9 +237,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
privileges,
expCtx->tailableMode));
- // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
- // ownership in the current merging pipeline such that when it goes out of scope it does not
- // attempt to kill the cursors.
+    // If the merging shard returned an error and did not accept ownership, it is our
+    // responsibility to kill the cursors.
+ uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data));
+
+ // If we didn't get an error from the merging shard, ownership for the shard cursors has been
+ // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such
+ // that when it goes out of scope it does not attempt to kill the cursors.
auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
mergeCursors->dismissCursorOwnership();
@@ -435,8 +444,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
- // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
- // responsible for its own producer cursors.
+ // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be
+ // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now
+ // that we've established those pipelines on the shards, we are no longer responsible for
+ // ensuring they are cleaned up. If there was a problem establishing the cursors then
+ // establishCursors() would have thrown and mongos would kill all the consumer cursors itself.
for (const auto& pipeline : consumerPipelines) {
const auto& mergeCursors =
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
@@ -704,6 +716,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
}
+ shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
+
// If we reach here, we have a merge pipeline to dispatch.
return dispatchMergingPipeline(expCtx,
namespaces,