-rw-r--r--  etc/backports_required_for_multiversion_tests.yml        |  4
-rw-r--r--  jstests/sharding/query/shard_refuses_cursor_ownership.js | 83
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp        | 28
3 files changed, 108 insertions, 7 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index c6b3b849aa9..9b9a1a149c2 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -230,6 +230,8 @@ last-continuous:
     ticket: SERVER-71477
   - test_file: src/mongo/db/modules/enterprise/jstests/fcbis/fcbis_election_during_storage_change.js
     ticket: SERVER-69861
+  - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+    ticket: SERVER-65259
   suites: null
 last-lts:
   all:
@@ -537,4 +539,6 @@ last-lts:
     ticket: SERVER-71477
   - test_file: src/mongo/db/modules/enterprise/jstests/fcbis/fcbis_election_during_storage_change.js
     ticket: SERVER-69861
+  - test_file: jstests/sharding/query/shard_refuses_cursor_ownership.js
+    ticket: SERVER-65259
   suites: null
diff --git a/jstests/sharding/query/shard_refuses_cursor_ownership.js b/jstests/sharding/query/shard_refuses_cursor_ownership.js
new file mode 100644
index 00000000000..30e6e88d896
--- /dev/null
+++ b/jstests/sharding/query/shard_refuses_cursor_ownership.js
@@ -0,0 +1,83 @@
+/**
+ * This test runs an unsharded $out query, which results in mongos setting up a cursor and giving
+ * it to a shard to complete. mongos assumes the shard will kill the cursor, but if a shard doesn't
+ * accept ownership of the cursor then previously no one would kill it. This test ensures mongos
+ * will kill the cursor if a shard doesn't accept ownership.
+ * @tags: [
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const st = new ShardingTest({shards: 2});
+
+const dbName = jsTestName();
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let db = st.s.getDB(dbName);
+assert.commandWorked(db.dropDatabase());
+let coll = db[collName];
+
+st.shardColl(collName, {x: 1}, {x: 0}, {x: 1}, dbName, true);
+
+assert.commandWorked(coll.insert([{x: -2}, {x: -1}, {x: 1}, {x: 2}]));
+
+const primary = st.getPrimaryShard(dbName);
+const other = st.getOther(st.getPrimaryShard(dbName));
+
+// Start an aggregation that requires merging on a shard. Let it run until the shard cursors have
+// been established, but make it hang right before opening the merge cursor.
+let shardedAggregateHangBeforeDispatchMergingPipelineFP =
+    configureFailPoint(st.s, "shardedAggregateHangBeforeDispatchMergingPipeline");
+let awaitAggregationShell = startParallelShell(
+    funWithArgs((dbName, collName) => {
+        assert.eq(
+            0, db.getSiblingDB(dbName)[collName].aggregate([{$out: collName + ".out"}]).itcount());
+    }, dbName, collName), st.s.port);
+shardedAggregateHangBeforeDispatchMergingPipelineFP.wait();
+
+// Start a chunk migration and let it run until it enters the critical section.
+let hangBeforePostMigrationCommitRefresh =
+    configureFailPoint(primary, "hangBeforePostMigrationCommitRefresh");
+let awaitMoveChunkShell = startParallelShell(
+    funWithArgs((recipientShard, ns) => {
+        assert.commandWorked(db.adminCommand({moveChunk: ns, find: {x: -1}, to: recipientShard}));
+    }, other.shardName, ns), st.s.port);
+hangBeforePostMigrationCommitRefresh.wait();
+
+// Let the aggregation continue and try to establish the merge cursor (it will first fail because
+// the shard is in the critical section; mongos will transparently retry).
+shardedAggregateHangBeforeDispatchMergingPipelineFP.off();
+
+// Let the migration exit the critical section and complete.
+hangBeforePostMigrationCommitRefresh.off();
+
+// The aggregation will be able to complete now.
+awaitAggregationShell();
+
+awaitMoveChunkShell();
+
+// Did any cursor leak?
+const idleCursors = primary.getDB("admin")
+                        .aggregate([
+                            {$currentOp: {idleCursors: true, allUsers: true}},
+                            {$match: {type: "idleCursor", ns: ns}}
+                        ])
+                        .toArray();
+assert.eq(0, idleCursors.length, "Found idle cursors: " + tojson(idleCursors));
+
+// Check that range deletions can be completed (if a cursor was left open, the range deletion would
+// not finish).
+assert.soon(
+    () => {
+        return primary.getDB("config")["rangeDeletions"].find().itcount() === 0;
+    },
+    "Range deletion tasks did not finish: " +
+        tojson(primary.getDB("config")["rangeDeletions"].find().toArray()));
+
+st.stop();
+})();
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 5788b30a7d4..43e6b5ffb44 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -65,6 +65,7 @@ namespace cluster_aggregation_planner {
 
 MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
 MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeDispatchMergingPipeline);
 
 using sharded_agg_helpers::DispatchShardPipelineResults;
 using sharded_agg_helpers::SplitPipeline;
@@ -171,9 +172,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
                                const PrivilegeVector& privileges,
                                bool hasChangeStream) {
     // We should never be in a situation where we call this function on a non-merge pipeline.
-    invariant(shardDispatchResults.splitPipeline);
+    tassert(6525900,
+            "tried to dispatch merge pipeline but the pipeline was not split",
+            shardDispatchResults.splitPipeline);
     auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
-    invariant(mergePipeline);
+    tassert(6525901,
+            "tried to dispatch merge pipeline but there was no merge portion of the split pipeline",
+            mergePipeline);
 
     auto* opCtx = expCtx->opCtx;
     std::vector<ShardId> targetedShards;
@@ -232,9 +237,13 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
                                                  privileges,
                                                  expCtx->tailableMode));
 
-    // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
-    // ownership in the current merging pipeline such that when it goes out of scope it does not
-    // attempt to kill the cursors.
+    // If the mergingShard returned an error and did not accept ownership it is our responsibility
+    // to kill the cursors.
+    uassertStatusOK(getStatusFromCommandResult(mergeResponse.swResponse.getValue().data));
+
+    // If we didn't get an error from the merging shard, ownership for the shard cursors has been
+    // transferred to the merging shard. Dismiss the ownership in the current merging pipeline such
+    // that when it goes out of scope it does not attempt to kill the cursors.
     auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
     mergeCursors->dismissCursorOwnership();
 
@@ -435,8 +444,11 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
 
     SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
 
-    // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
-    // responsible for its own producer cursors.
+    // Relinquish ownership of the consumer pipelines' cursors. These cursors are now set up to be
+    // merged by a set of $mergeCursors pipelines that we just dispatched to the shards above. Now
+    // that we've established those pipelines on the shards, we are no longer responsible for
+    // ensuring they are cleaned up. If there was a problem establishing the cursors then
+    // establishCursors() would have thrown and mongos would kill all the consumer cursors itself.
     for (const auto& pipeline : consumerPipelines) {
         const auto& mergeCursors =
             static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
@@ -704,6 +716,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
             expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
     }
 
+    shardedAggregateHangBeforeDispatchMergingPipeline.pauseWhileSet();
+
     // If we reach here, we have a merge pipeline to dispatch.
     return dispatchMergingPipeline(expCtx,
                                    namespaces,
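
A note on the core fix in cluster_aggregation_planner.cpp: before this change, mongos dismissed its ownership of the shard cursors unconditionally once the merging pipeline had been dispatched, so a merging shard that returned an error (for example, while inside a migration critical section) refused the hand-off and the cursors were never killed. The following is a minimal, self-contained C++ sketch of the corrected hand-off ordering; CursorSet, MergeResponse, and this simplified dispatchMergingPipeline are hypothetical stand-ins for illustration, not the real mongos types.

#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for the shard cursors mongos has opened. Like the
// real merging pipeline, it kills any cursors it still owns on destruction.
struct CursorSet {
    bool ownsCursors = true;

    // Called only once the merging shard has confirmed it took ownership.
    void dismissOwnership() {
        ownsCursors = false;
    }

    ~CursorSet() {
        if (ownsCursors) {
            std::cout << "killing shard cursors: merging shard never took ownership\n";
        }
    }
};

// Hypothetical stand-in for the merging shard's command response.
struct MergeResponse {
    bool ok;
};

void dispatchMergingPipeline(CursorSet& cursors, const MergeResponse& response) {
    // The fix: surface the merging shard's error *before* dismissing cursor
    // ownership, so a refusal unwinds with the cursors still owned by mongos.
    if (!response.ok) {
        throw std::runtime_error("merging shard refused cursor ownership");
    }

    // Only a successful hand-off makes it safe to stop tracking the cursors.
    cursors.dismissOwnership();
}

int main() {
    CursorSet cursors;
    try {
        dispatchMergingPipeline(cursors, MergeResponse{false});
    } catch (const std::exception& e) {
        std::cout << "aggregation failed: " << e.what() << "\n";
    }
    // cursors goes out of scope here; because ownership was never dismissed,
    // its destructor cleans up the cursors that would otherwise have leaked.
    return 0;
}

In the actual patch the same ordering is achieved by calling uassertStatusOK(getStatusFromCommandResult(...)) on the merge response before mergeCursors->dismissCursorOwnership(): on failure the function unwinds with ownership intact, and the existing cleanup path kills the cursors.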