author     Bernard Gorman <bernard.gorman@mongodb.com>    2019-09-13 10:12:39 +0000
committer  evergreen <evergreen@mongodb.com>              2019-09-13 10:12:39 +0000
commit     89d5c8b10e40648a403f12a55bcb66f2f5bef384 (patch)
tree       3674111f238de4c3bbd7b0ab717cac8bccbb864d
parent     11eaee8aace14a3b8ba1d7c3ab462e8badc8ffee (diff)
download   mongo-89d5c8b10e40648a403f12a55bcb66f2f5bef384.tar.gz
SERVER-41758 Verify that at least 1 shard exists after hard-reload in aggregation routing path
(cherry picked from commit 1fa4766c621bd4cfd74319094469eff3a5de3b79)
6 files changed, 212 insertions, 12 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml
new file mode 100644
index 00000000000..31bffac4895
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml
@@ -0,0 +1,41 @@
+test_kind: fsm_workload_test
+
+selector:
+  roots:
+  - jstests/concurrency/fsm_workloads_add_remove_shards/**/*.js
+
+  exclude_with_any_tags:
+  - requires_replication
+
+executor:
+  archive:
+    hooks:
+      - CheckReplDBHashInBackground
+      - CheckReplDBHash
+      - ValidateCollections
+    tests: true
+  config:
+    shell_options:
+      readMode: commands
+      global_vars:
+        TestData:
+          usingReplicaSetShards: true
+  hooks:
+  - class: CheckReplDBHashInBackground
+  - class: CheckReplDBHash
+  - class: ValidateCollections
+  - class: CleanupConcurrencyWorkloads
+  fixture:
+    class: ShardedClusterFixture
+    mongos_options:
+      set_parameters:
+        enableTestCommands: 1
+    shard_options:
+      mongod_options:
+        oplogSize: 1024
+    mongod_options:
+      set_parameters:
+        enableTestCommands: 1
+    num_rs_nodes_per_shard: 3
+    num_shards: 2
+    num_mongos: 2
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 2f1c4232c24..6e4b45c54df
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -6609,6 +6609,16 @@ tasks:
       resmoke_args: --suites=concurrency_sharded_replication_with_balancer --excludeWithAnyTags=uses_transactions --storageEngine=wiredTiger
       resmoke_jobs_max: 1
 
+- <<: *task_template
+  name: concurrency_sharded_clusterwide_ops_add_remove_shards
+  tags: ["concurrency", "common", "read_concern_maj"]
+  commands:
+  - func: "do setup"
+  - func: "run tests"
+    vars:
+      resmoke_args: --suites=concurrency_sharded_clusterwide_ops_add_remove_shards --storageEngine=wiredTiger
+      resmoke_jobs_max: 1
+
 - name: concurrency_sharded_causal_consistency_gen
   tags: ["concurrency"]
   commands:
@@ -10220,6 +10230,7 @@ buildvariants:
   - name: concurrency_replication_multi_stmt_txn
   - name: concurrency_sharded_replication
   - name: concurrency_sharded_replication_with_balancer
+  - name: concurrency_sharded_clusterwide_ops_add_remove_shards
   - name: concurrency_sharded_local_read_write_multi_stmt_txn
   - name: concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer
   - name: concurrency_sharded_multi_stmt_txn
@@ -11842,6 +11853,7 @@ buildvariants:
     - rhel62-large
   - name: concurrency_sharded_replication
   - name: concurrency_sharded_replication_with_balancer
+  - name: concurrency_sharded_clusterwide_ops_add_remove_shards
   - name: concurrency_simultaneous
   - name: concurrency_simultaneous_replication
     distros:
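The new suite wires into Evergreen through the task and buildvariant entries above. For a local run, something like the following resmoke invocation should work from a repository checkout (this mirrors the resmoke_args the task passes; the exact entry point and flags can differ across branches):

    python buildscripts/resmoke.py --suites=concurrency_sharded_clusterwide_ops_add_remove_shards --storageEngine=wiredTiger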
diff --git a/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js
new file mode 100644
index 00000000000..54e6ce913bf
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js
@@ -0,0 +1,28 @@
+/**
+ * Verify that operations which must run on all shards, such as $currentOp and $changeStream, do not
+ * crash when shards are added to the cluster mid-operation, or when config.shards is dropped.
+ *
+ * This test inherits from 'clusterwide_ops_with_add_remove_shards.js' but is kept separate
+ * from it, because (1) we may remove the ability to write to config.shards in the future, at which
+ * point this test can simply be removed; and (2) running a single FSM test with both removeShard
+ * and config.shards.remove({}) can cause the former to hang indefinitely while waiting for the
+ * removed shard to drain.
+ *
+ * @tags: [requires_sharding, requires_non_retryable_writes, catches_command_failures,
+ * uses_change_streams, uses_curop_agg_stage]
+ */
+
+"use strict";
+
+// For base $config setup.
+const baseDir = 'jstests/concurrency/fsm_workloads_add_remove_shards/';
+load(baseDir + 'clusterwide_ops_with_add_remove_shards.js');
+
+// After loading the base file, $config has been populated with states and transitions. We simply
+// overwrite 'states.removeShard' such that it instantly wipes all shards from the cluster rather
+// than removing a single shard via the removeShard command. This is the only way to test that
+// mongoS is resilient to the sudden absence of shards in the middle of an operation, as the
+// removeShard command is not permitted to remove the last existing shard in the cluster.
+$config.states.removeShard = function(db, collName) {
+    assert.commandWorked(db.getSiblingDB("config").shards.remove({}));
+};
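The override above exploits the difference between the two shard-removal paths. A minimal shell sketch of the contrast (a hypothetical session; the shard name is illustrative):

    // The supported path: removeShard drains chunks off the shard, reports its
    // progress via 'state', and refuses to remove the last shard in the cluster.
    db.adminCommand({removeShard: "shard0001"});  // => {state: "started", ...}

    // The direct write bypasses those safeguards entirely and empties the shard
    // registry's backing collection in one statement, producing the sudden
    // zero-shard state this test is designed to exercise.
    db.getSiblingDB("config").shards.remove({});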
diff --git a/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js
new file mode 100644
index 00000000000..b16ddcd319b
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js
@@ -0,0 +1,110 @@
+/**
+ * Verify that operations which must run on all shards, such as $currentOp and $changeStream, do not
+ * crash when shards are added to or removed from the cluster mid-operation.
+ *
+ * @tags: [requires_sharding, requires_non_retryable_writes, catches_command_failures,
+ * uses_change_streams, uses_curop_agg_stage]
+ */
+
+"use strict";
+
+var $config = (function() {
+    // The 'setup' function is run once by the parent thread after the cluster has been initialized,
+    // before the worker threads have been spawned. The 'this' argument is bound as '$config.data'.
+    function setup(db, collName, cluster) {
+        // Obtain the list of shards present in the cluster. Used to remove and restore shards.
+        this.shardList = db.getSiblingDB("config").shards.find().toArray();
+        // Drop the test database. It's not needed and will complicate re-adding shards.
+        assert.commandWorked(db.dropDatabase());
+    }
+
+    // Returns a random integer in the range [0, max).
+    function randomInt(max) {
+        return Math.floor(Math.random() * Math.floor(max));
+    }
+
+    // Helper to close a clusterwide cursor, given a command result.
+    function closeClusterWideCursor(db, res) {
+        if (res.ok) {
+            db.adminCommand({
+                killCursors: "$cmd.aggregate",
+                cursors: [res.cursor.id],
+            });
+        }
+    }
+
+    var states = {
+        runChangeStream: function(db, collName) {
+            const res = db.adminCommand({
+                aggregate: 1,
+                pipeline: [{$changeStream: {allChangesForCluster: true}}],
+                cursor: {}
+            });
+            closeClusterWideCursor(db, res);
+        },
+
+        runCurrentOp: function(db, collName) {
+            const res = db.adminCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}});
+            closeClusterWideCursor(db, res);
+        },
+
+        removeShard: function(db, collName) {
+            // Make sure that only a single removeShard operation is running at any time.
+            const testLocksColl = db.getSiblingDB("config").testLocks;
+            if (!testLocksColl.insert({_id: "removeShard"}).nInserted) {
+                return;
+            }
+            // Iterate until we successfully remove a shard or run out of shards.
+            for (let shardIdx = 0; shardIdx < this.shardList.length; ++shardIdx) {
+                const shardName = this.shardList[shardIdx]._id;
+                if (db.adminCommand({removeShard: shardName}).state === "started") {
+                    break;
+                }
+            }
+            // Remove the lock document so that other threads can call removeShard.
+            assert.commandWorked(testLocksColl.remove({_id: "removeShard"}));
+        },
+
+        addShard: function addShard(db, collName) {
+            const shardIdx = randomInt(this.shardList.length);
+            const shardEntry = this.shardList[shardIdx];
+            db.adminCommand({addShard: shardEntry.host, name: shardEntry._id});
+        },
+
+        init: function(db, collName) {
+            // Do nothing. This is only used to randomize the first action taken by each worker.
+        }
+    };
+
+    const transitionProbabilities =
+        {runChangeStream: 0.25, runCurrentOp: 0.25, removeShard: 0.25, addShard: 0.25};
+    var transitions = {
+        init: transitionProbabilities,
+        runChangeStream: transitionProbabilities,
+        runCurrentOp: transitionProbabilities,
+        removeShard: transitionProbabilities,
+        addShard: transitionProbabilities
+    };
+
+    // The 'teardown' function is run once by the parent thread before the cluster is destroyed, but
+    // after the worker threads have been reaped. The 'this' argument is bound as '$config.data'.
+    function teardown(db, collName, cluster) {
+        // If any shards are draining, unset them so we don't impact subsequent tests.
+        db.getSiblingDB("config").shards.update({}, {$unset: {draining: 1}}, {multi: true});
+        // Ensure that all shards are present in the cluster before shutting down the ShardingTest.
+        for (let shardEntry of this.shardList) {
+            assert.soon(() => db.adminCommand({addShard: shardEntry.host, name: shardEntry._id}).ok,
+                        `failed to add shard ${shardEntry._id} back into cluster at end of test`);
+        }
+    }
+
+    return {
+        threadCount: 100,
+        iterations: 1000,
+        startState: "init",
+        states: states,
+        transitions: transitions,
+        setup: setup,
+        teardown: teardown
+    };
+})();
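The removeShard state in the workload above serializes concurrent workers with an insert-based lock: because _id is unique, only one thread's insert of {_id: "removeShard"} can succeed, and the losers see an nInserted of 0 and back off. A sketch of the same pattern in isolation (the collection and lock _id are illustrative):

    // Only the thread whose insert lands owns the critical section; a concurrent
    // insert of the same _id fails with DuplicateKey and reports nInserted: 0.
    const locks = db.getSiblingDB("config").testLocks;
    if (locks.insert({_id: "removeShard"}).nInserted === 1) {
        try {
            // ... exclusive work, e.g. one removeShard attempt ...
        } finally {
            // Release the lock so other workers get a turn.
            locks.remove({_id: "removeShard"});
        }
    }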
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 1c32c251ff5..e17dddc7e50
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -285,10 +285,14 @@ DispatchShardPipelineResults dispatchShardPipeline(
         : createPassthroughCommandForShard(
               opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj);
 
-    // Refresh the shard registry if we're targeting all shards. We need the shard registry
-    // to be at least as current as the logical time used when creating the command for
-    // $changeStream to work reliably, so we do a "hard" reload.
-    if (mustRunOnAll) {
+    // In order for a $changeStream to work reliably, we need the shard registry to be at least as
+    // current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
+    // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines
+    // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines
+    // may not have been forced to split if there was only one shard in the cluster when the command
+    // began execution. If a shard was added since the earlier targeting logic ran, then refreshing
+    // here may cause us to illegally target an unsplit pipeline to more than one shard.
+    if (litePipe.hasChangeStream()) {
         auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
         if (!shardRegistry->reload(opCtx)) {
             shardRegistry->reload(opCtx);
@@ -298,6 +302,13 @@ DispatchShardPipelineResults dispatchShardPipeline(
         opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
     }
 
+    // If there were no shards when we began execution, we wouldn't have run this aggregation in the
+    // first place. Here, we double-check that the shards have not been removed mid-operation.
+    uassert(ErrorCodes::ShardNotFound,
+            "Unexpectedly found 0 shards while preparing to dispatch aggregation requests. Were "
+            "the shards removed mid-operation?",
+            shardIds.size() > 0);
+
     // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
     if (expCtx->explain) {
         if (mustRunOnAll) {
@@ -329,10 +340,10 @@ DispatchShardPipelineResults dispatchShardPipeline(
         executionNss,
         litePipe,
         executionNsRoutingInfo,
+        shardIds,
         targetedCommand,
         aggRequest,
-        ReadPreferenceSetting::get(opCtx),
-        shardQuery);
+        ReadPreferenceSetting::get(opCtx));
     invariant(cursors.size() % shardIds.size() == 0,
               str::stream() << "Number of cursors (" << cursors.size()
                             << ") is not a multiple of producers (" << shardIds.size() << ")");
@@ -384,15 +395,13 @@ std::vector<RemoteCursor> establishShardCursors(
     const NamespaceString& nss,
     const LiteParsedPipeline& litePipe,
     boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const std::set<ShardId>& shardIds,
     const BSONObj& cmdObj,
     const AggregationRequest& request,
-    const ReadPreferenceSetting& readPref,
-    const BSONObj& shardQuery) {
+    const ReadPreferenceSetting& readPref) {
     LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
 
     const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
-    std::set<ShardId> shardIds =
-        getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
     std::vector<std::pair<ShardId, BSONObj>> requests;
 
     // If we don't need to run on all shards, then we should always have a valid routing table.
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 861e1acb84c..15e0dd51c2e
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -98,10 +98,10 @@ std::vector<RemoteCursor> establishShardCursors(
     const NamespaceString& nss,
     const LiteParsedPipeline& litePipe,
     boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const std::set<ShardId>& shardIds,
     const BSONObj& cmdObj,
     const AggregationRequest& request,
-    const ReadPreferenceSetting& readPref,
-    const BSONObj& shardQuery);
+    const ReadPreferenceSetting& readPref);
 
 BSONObj createCommandForTargetedShards(
     OperationContext* opCtx,
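Taken together, the server-side changes mean that a cluster-wide aggregation dispatched after every shard has vanished now fails with a clean error rather than crashing mongoS or dispatching to zero remotes. A hedged sketch of the observable behavior (a hypothetical shell assertion, assuming all shards were removed mid-operation):

    // Expect a clean ShardNotFound error from mongoS instead of a crash.
    const res = db.adminCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}});
    assert.commandFailedWithCode(res, ErrorCodes.ShardNotFound);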