author     Bernard Gorman <bernard.gorman@mongodb.com>    2019-09-13 10:12:39 +0000
committer  evergreen <evergreen@mongodb.com>              2019-09-13 10:12:39 +0000
commit     89d5c8b10e40648a403f12a55bcb66f2f5bef384
tree       3674111f238de4c3bbd7b0ab717cac8bccbb864d
parent     11eaee8aace14a3b8ba1d7c3ab462e8badc8ffee
download   mongo-89d5c8b10e40648a403f12a55bcb66f2f5bef384.tar.gz
SERVER-41758 Verify that at least 1 shard exists after hard-reload in aggregation routing path
(cherry picked from commit 1fa4766c621bd4cfd74319094469eff3a5de3b79)
-rw-r--r--  buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml  |  41
-rw-r--r--  etc/evergreen.yml                                                                             |  12
-rw-r--r--  jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js   |  28
-rw-r--r--  jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js | 110
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                                                 |  29
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h                                                   |   4
6 files changed, 212 insertions(+), 12 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml
new file mode 100644
index 00000000000..31bffac4895
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_clusterwide_ops_add_remove_shards.yml
@@ -0,0 +1,41 @@
+test_kind: fsm_workload_test
+
+selector:
+  roots:
+  - jstests/concurrency/fsm_workloads_add_remove_shards/**/*.js
+
+  exclude_with_any_tags:
+  - requires_replication
+
+executor:
+  archive:
+    hooks:
+      - CheckReplDBHashInBackground
+      - CheckReplDBHash
+      - ValidateCollections
+    tests: true
+  config:
+    shell_options:
+      readMode: commands
+      global_vars:
+        TestData:
+          usingReplicaSetShards: true
+  hooks:
+  - class: CheckReplDBHashInBackground
+  - class: CheckReplDBHash
+  - class: ValidateCollections
+  - class: CleanupConcurrencyWorkloads
+  fixture:
+    class: ShardedClusterFixture
+    mongos_options:
+      set_parameters:
+        enableTestCommands: 1
+    shard_options:
+      mongod_options:
+        oplogSize: 1024
+    mongod_options:
+      set_parameters:
+        enableTestCommands: 1
+    num_rs_nodes_per_shard: 3
+    num_shards: 2
+    num_mongos: 2
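The ShardedClusterFixture above (two shards, each a 3-node replica set, two mongos, test commands enabled) is managed by resmoke. For quick manual experimentation, a roughly comparable topology can be started from the mongo shell's jstest framework; this is only a sketch, and ShardingTest option names are not the resmoke fixture's:

// Sketch only: an approximate shell-side equivalent of the fixture above.
// ShardingTest comes from the shell's jstest framework, not from resmoke.
const st = new ShardingTest({shards: 2, mongos: 2, rs: {nodes: 3}});
// ... exercise cluster-wide operations against st.s0 / st.s1 here ...
st.stop();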
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 2f1c4232c24..6e4b45c54df 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -6609,6 +6609,16 @@ tasks:
      resmoke_args: --suites=concurrency_sharded_replication_with_balancer --excludeWithAnyTags=uses_transactions --storageEngine=wiredTiger
      resmoke_jobs_max: 1
+- <<: *task_template
+  name: concurrency_sharded_clusterwide_ops_add_remove_shards
+  tags: ["concurrency", "common", "read_concern_maj"]
+  commands:
+  - func: "do setup"
+  - func: "run tests"
+    vars:
+      resmoke_args: --suites=concurrency_sharded_clusterwide_ops_add_remove_shards --storageEngine=wiredTiger
+      resmoke_jobs_max: 1
+
- name: concurrency_sharded_causal_consistency_gen
  tags: ["concurrency"]
  commands:
@@ -10220,6 +10230,7 @@ buildvariants:
  - name: concurrency_replication_multi_stmt_txn
  - name: concurrency_sharded_replication
  - name: concurrency_sharded_replication_with_balancer
+  - name: concurrency_sharded_clusterwide_ops_add_remove_shards
  - name: concurrency_sharded_local_read_write_multi_stmt_txn
  - name: concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer
  - name: concurrency_sharded_multi_stmt_txn
@@ -11842,6 +11853,7 @@ buildvariants:
    - rhel62-large
  - name: concurrency_sharded_replication
  - name: concurrency_sharded_replication_with_balancer
+  - name: concurrency_sharded_clusterwide_ops_add_remove_shards
  - name: concurrency_simultaneous
  - name: concurrency_simultaneous_replication
    distros:
diff --git a/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js
new file mode 100644
index 00000000000..54e6ce913bf
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_drop_shards.js
@@ -0,0 +1,28 @@
+/**
+ * Verify that operations which must run on all shards, such as $currentOp and $changeStream, do not
+ * crash when shards are added to the cluster mid-operation, or when config.shards is dropped.
+ *
+ * This test inherits from 'clusterwide_ops_with_add_remove_shards.js' but is kept separate
+ * from it, because (1) we may remove the ability to write to config.shards in the future, at which
+ * point this test can simply be removed; and (2) running a single FSM test with both removeShard
+ * and config.shards.remove({}) can cause the former to hang indefinitely while waiting for the
+ * removed shard to drain.
+ *
+ * @tags: [requires_sharding, requires_non_retryable_writes, catches_command_failures,
+ * uses_change_streams, uses_curop_agg_stage]
+ */
+
+"use strict";
+
+// For base $config setup.
+const baseDir = 'jstests/concurrency/fsm_workloads_add_remove_shards/';
+load(baseDir + 'clusterwide_ops_with_add_remove_shards.js');
+
+// After loading the base file, $config has been populated with states and transitions. We simply
+// overwrite 'states.removeShard' such that it instantly wipes all shards from the cluster rather
+// than removing a single shard via the removeShard command. This is the only way to test that
+// mongoS is resilient to the sudden absence of shards in the middle of an operation, as the
+// removeShard command is not permitted to remove the last existing shard in the cluster.
+$config.states.removeShard = function(db, collName) {
+    assert.commandWorked(db.getSiblingDB("config").shards.remove({}));
+};
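The override above can also be reproduced by hand to see what this patch guards against on the mongoS side; a minimal sketch, assuming a disposable test cluster (writing to config.shards is unsupported outside of tests) and noting that the exact error surfaced can vary with timing:

// Sketch only: wipe config.shards as the overridden removeShard state does, then issue a
// cluster-wide aggregation through mongoS.
assert.commandWorked(db.getSiblingDB("config").shards.remove({}));
// After this patch the dispatch fails cleanly (e.g. ShardNotFound from the new uassert in
// dispatchShardPipeline) rather than crashing mongoS.
const res = db.getSiblingDB("admin").runCommand(
    {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}});
assert.commandFailed(res);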
diff --git a/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js
new file mode 100644
index 00000000000..b16ddcd319b
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js
@@ -0,0 +1,110 @@
+/**
+ * Verify that operations which must run on all shards, such as $currentOp and $changeStream, do not
+ * crash when shards are added to or removed from the cluster mid-operation.
+ *
+ * @tags: [requires_sharding, requires_non_retryable_writes, catches_command_failures,
+ * uses_change_streams, uses_curop_agg_stage]
+ */
+
+"use strict";
+
+var $config = (function() {
+    // The 'setup' function is run once by the parent thread after the cluster has been initialized,
+    // before the worker threads have been spawned. The 'this' argument is bound as '$config.data'.
+    function setup(db, collName, cluster) {
+        // Obtain the list of shards present in the cluster. Used to remove and restore shards.
+        this.shardList = db.getSiblingDB("config").shards.find().toArray();
+        // Drop the test database. It's not needed and will complicate re-adding shards.
+        assert.commandWorked(db.dropDatabase());
+    }
+
+    // Returns a random integer in the range [0, max).
+    function randomInt(max) {
+        return Math.floor(Math.random() * Math.floor(max));
+    }
+
+    // Helper to close a clusterwide cursor, given a command result.
+    function closeClusterWideCursor(db, res) {
+        if (res.ok) {
+            db.adminCommand({
+                killCursors: "$cmd.aggregate",
+                cursors: [res.cursor.id],
+            });
+        }
+    }
+
+    var states = {
+        runChangeStream: function(db, collName) {
+            const res = db.adminCommand({
+                aggregate: 1,
+                pipeline: [{$changeStream: {allChangesForCluster: true}}],
+                cursor: {}
+            });
+            closeClusterWideCursor(db, res);
+        },
+
+        runCurrentOp: function(db, collName) {
+            const res = db.adminCommand({aggregate: 1, pipeline: [{$currentOp: {}}], cursor: {}});
+            closeClusterWideCursor(db, res);
+        },
+
+        removeShard: function(db, collName) {
+            // Make sure that only a single removeShard operation is running at any time.
+            const testLocksColl = db.getSiblingDB("config").testLocks;
+            if (!testLocksColl.insert({_id: "removeShard"}).nInserted) {
+                return;
+            }
+            // Iterate until we successfully remove a shard or run out of shards.
+            for (let shardIdx = 0; shardIdx < this.shardList.length; ++shardIdx) {
+                const shardName = this.shardList[shardIdx]._id;
+                if (db.adminCommand({removeShard: shardName}).state === "started") {
+                    break;
+                }
+            }
+            // Remove the lock document so that other threads can call removeShard.
+            assert.commandWorked(testLocksColl.remove({_id: "removeShard"}));
+        },
+
+        addShard: function addShard(db, collName) {
+            const shardIdx = randomInt(this.shardList.length);
+            const shardEntry = this.shardList[shardIdx];
+            db.adminCommand({addShard: shardEntry.host, name: shardEntry._id});
+        },
+
+        init: function(db, collName) {
+            // Do nothing. This is only used to randomize the first action taken by each worker.
+        }
+    };
+
+    const transitionProbabilities =
+        {runChangeStream: 0.25, runCurrentOp: 0.25, removeShard: 0.25, addShard: 0.25};
+    var transitions = {
+        init: transitionProbabilities,
+        runChangeStream: transitionProbabilities,
+        runCurrentOp: transitionProbabilities,
+        removeShard: transitionProbabilities,
+        addShard: transitionProbabilities
+    };
+
+    // The 'teardown' function is run once by the parent thread before the cluster is destroyed, but
+    // after the worker threads have been reaped. The 'this' argument is bound as '$config.data'.
+    function teardown(db, collName, cluster) {
+        // If any shards are draining, unset them so we don't impact subsequent tests.
+        db.getSiblingDB("config").shards.update({}, {$unset: {draining: 1}}, {multi: true});
+        // Ensure that all shards are present in the cluster before shutting down the ShardingTest.
+        for (let shardEntry of this.shardList) {
+            assert.soon(() => db.adminCommand({addShard: shardEntry.host, name: shardEntry._id}).ok,
+                        `failed to add shard ${shardEntry._id} back into cluster at end of test`);
+        }
+    }
+
+    return {
+        threadCount: 100,
+        iterations: 1000,
+        startState: "init",
+        states: states,
+        transitions: transitions,
+        setup: setup,
+        teardown: teardown
+    };
+})();
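To iterate on this workload locally, outside of the Evergreen task added above, the concurrency framework's runner can be invoked from a mongo shell connected to a sharded cluster; a sketch, assuming fsm_libs/runner.js still exposes runWorkloadsSerially():

// Sketch only: drive just this workload through the FSM runner.
load('jstests/concurrency/fsm_libs/runner.js');
runWorkloadsSerially([
    'jstests/concurrency/fsm_workloads_add_remove_shards/clusterwide_ops_with_add_remove_shards.js'
]);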
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 1c32c251ff5..e17dddc7e50 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -285,10 +285,14 @@ DispatchShardPipelineResults dispatchShardPipeline(
        : createPassthroughCommandForShard(
              opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj);

-    // Refresh the shard registry if we're targeting all shards. We need the shard registry
-    // to be at least as current as the logical time used when creating the command for
-    // $changeStream to work reliably, so we do a "hard" reload.
-    if (mustRunOnAll) {
+    // In order for a $changeStream to work reliably, we need the shard registry to be at least as
+    // current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
+    // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines
+    // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines
+    // may not have been forced to split if there was only one shard in the cluster when the command
+    // began execution. If a shard was added since the earlier targeting logic ran, then refreshing
+    // here may cause us to illegally target an unsplit pipeline to more than one shard.
+    if (litePipe.hasChangeStream()) {
        auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
        if (!shardRegistry->reload(opCtx)) {
            shardRegistry->reload(opCtx);
@@ -298,6 +302,13 @@ DispatchShardPipelineResults dispatchShardPipeline(
            opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
    }

+    // If there were no shards when we began execution, we wouldn't have run this aggregation in the
+    // first place. Here, we double-check that the shards have not been removed mid-operation.
+    uassert(ErrorCodes::ShardNotFound,
+            "Unexpectedly found 0 shards while preparing to dispatch aggregation requests. Were "
+            "the shards removed mid-operation?",
+            shardIds.size() > 0);
+
    // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
    if (expCtx->explain) {
        if (mustRunOnAll) {
@@ -329,10 +340,10 @@ DispatchShardPipelineResults dispatchShardPipeline(
                                          executionNss,
                                          litePipe,
                                          executionNsRoutingInfo,
+                                          shardIds,
                                          targetedCommand,
                                          aggRequest,
-                                          ReadPreferenceSetting::get(opCtx),
-                                          shardQuery);
+                                          ReadPreferenceSetting::get(opCtx));

    invariant(cursors.size() % shardIds.size() == 0,
              str::stream() << "Number of cursors (" << cursors.size()
                            << ") is not a multiple of producers (" << shardIds.size() << ")");
@@ -384,15 +395,13 @@ std::vector<RemoteCursor> establishShardCursors(
    const NamespaceString& nss,
    const LiteParsedPipeline& litePipe,
    boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const std::set<ShardId>& shardIds,
    const BSONObj& cmdObj,
    const AggregationRequest& request,
-    const ReadPreferenceSetting& readPref,
-    const BSONObj& shardQuery) {
+    const ReadPreferenceSetting& readPref) {
    LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";

    const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
-    std::set<ShardId> shardIds =
-        getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
    std::vector<std::pair<ShardId, BSONObj>> requests;

    // If we don't need to run on all shards, then we should always have a valid routing table.
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 861e1acb84c..15e0dd51c2e 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -98,10 +98,10 @@ std::vector<RemoteCursor> establishShardCursors(
    const NamespaceString& nss,
    const LiteParsedPipeline& litePipe,
    boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+    const std::set<ShardId>& shardIds,
    const BSONObj& cmdObj,
    const AggregationRequest& request,
-    const ReadPreferenceSetting& readPref,
-    const BSONObj& shardQuery);
+    const ReadPreferenceSetting& readPref);

BSONObj createCommandForTargetedShards(
    OperationContext* opCtx,