author     Esha Maharishi <esha.maharishi@mongodb.com>  2020-02-05 21:52:42 +0000
committer  evergreen <evergreen@mongodb.com>  2020-02-05 21:52:42 +0000
commit     769ee6a62ad027541e70237ab1b9d013bd36e84c (patch)
tree       b0dd46754aa18f7bceffbcba49791278fa53a5d1
parent     18966553fc7f9d350c3428ea1a23162aba2526ef (diff)
download   mongo-769ee6a62ad027541e70237ab1b9d013bd36e84c.tar.gz
SERVER-45901 Make moveChunk robust to being killOp'd after commit has been sent to the config but before the node has found out the commit decision
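The heart of the change is in src/mongo/db/s/migration_util.cpp (diff below): each retry of the commit-decision recovery now runs on its own short-lived, killable Client and OperationContext, so a killOp or other interruption only aborts the current attempt, while a term check in the catch block stops retrying once the node has stepped down. The following is a minimal sketch of that retry pattern; the helper name retryUntilSuccessOrStepDown, the taskName/work parameters, and the exact set of includes are illustrative only (the patch inlines this logic directly into ensureChunkVersionIsGreaterThan and refreshFilteringMetadataUntilSuccess rather than factoring out a helper).

// Sketch only: mirrors the pattern applied by this patch. The helper and its
// parameters are hypothetical; includes are approximate.
#include "mongo/base/string_data.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"

namespace mongo {

// Runs 'work' until it succeeds, giving every attempt a fresh, killable Client and
// OperationContext so that a killOp (or stepdown) interrupts only the current attempt.
template <typename Callable>
void retryUntilSuccessOrStepDown(OperationContext* opCtx, StringData taskName, Callable work) {
    // Remember the term under which recovery started; a different term means this node
    // stepped down (and possibly back up) since then.
    const auto term = repl::ReplicationCoordinator::get(opCtx)->getTerm();

    for (int attempts = 1;; attempts++) {
        try {
            auto newClient = opCtx->getServiceContext()->makeClient(taskName.toString());
            {
                stdx::lock_guard<Client> lk(*newClient.get());
                newClient->setSystemOperationKillable(lk);
            }
            AlternativeClientRegion acr(newClient);
            auto newOpCtxPtr = cc().makeOperationContext();

            // e.g. send _configsvrEnsureChunkVersionIsGreaterThan, or force a
            // filtering metadata refresh.
            work(newOpCtxPtr.get());
            break;
        } catch (const DBException& ex) {
            // If the server is already doing a clean shutdown, join the shutdown.
            if (globalInShutdownDeprecated()) {
                shutdown(waitForShutdown());
            }

            // If this node has stepped down, stop retrying; the new primary's recovery
            // task takes over.
            uassert(ErrorCodes::InterruptedDueToReplStateChange,
                    "Stepped down while trying to recover a migration commit decision",
                    repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
                            repl::MemberState::RS_PRIMARY &&
                        term == repl::ReplicationCoordinator::get(opCtx)->getTerm());

            // Otherwise log the failure (as migration_util.cpp does) and loop to retry
            // on a brand-new OperationContext.
        }
    }
}

}  // namespace mongo

Capturing the term from the original opCtx is what lets the loop distinguish "this attempt was killed, retry on a fresh OperationContext" from "this node is no longer primary, give up".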
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 2
-rw-r--r--  jstests/sharding/migration_coordinator_failover.js | 8
-rw-r--r--  jstests/sharding/migration_coordinator_killop_in_critical_section.js | 98
-rw-r--r--  jstests/sharding/migration_coordinator_shutdown_in_critical_section.js | 75
-rw-r--r--  src/mongo/db/s/migration_util.cpp | 109
5 files changed, 271 insertions(+), 21 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 28ae7ed1343..b86ff502f99 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -23,6 +23,8 @@ selector:
- jstests/sharding/merge_from_stale_mongos.js
- jstests/sharding/migration_coordinator_basic.js
- jstests/sharding/migration_coordinator_failover.js
+ - jstests/sharding/migration_coordinator_killop_in_critical_section.js
+ - jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
# Enable when SERVER-44733 is backported
- jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
# Enable when SERVER-43310 is backported
diff --git a/jstests/sharding/migration_coordinator_failover.js b/jstests/sharding/migration_coordinator_failover.js
index 4586771bdb6..4418ea7589f 100644
--- a/jstests/sharding/migration_coordinator_failover.js
+++ b/jstests/sharding/migration_coordinator_failover.js
@@ -145,6 +145,14 @@ runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep5",
false /* shouldMakeMigrationFailToCommitOnConfig */,
ErrorCodes.OperationFailed);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangInEnsureChunkVersionIsGreaterThanThenThrow",
+ true /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow",
+ true /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.OperationFailed);
+
runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingAbortDecisionDurable",
true /* shouldMakeMigrationFailToCommitOnConfig */,
ErrorCodes.StaleEpoch);
diff --git a/jstests/sharding/migration_coordinator_killop_in_critical_section.js b/jstests/sharding/migration_coordinator_killop_in_critical_section.js
new file mode 100644
index 00000000000..3c107f7ab8d
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_killop_in_critical_section.js
@@ -0,0 +1,98 @@
+/**
+ * Kills the OperationContext used by the donor shard to send
+ * _configsvrEnsureChunkVersionIsGreaterThan and to force a filtering metadata refresh.
+ *
+ * Depends on the checkOrphansAreDeleted hook at the end of ShardingTest to verify that the orphans,
+ * range deletion tasks, and migration coordinator state are deleted despite the killOps.
+ *
+ * Marked as multiversion_incompatible because the failpoints used in this test were introduced
+ * on v4.4 mongod.
+ * @tags: [multiversion_incompatible]
+ */
+
+(function() {
+'use strict';
+
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+ if (typeof getNewNs.counter == 'undefined') {
+ getNewNs.counter = 0;
+ }
+ getNewNs.counter++;
+ const collName = "ns" + getNewNs.counter;
+ return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+let st = new ShardingTest({shards: 2});
+
+const donorShard = st.shard0;
+const recipientShard = st.shard1;
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: donorShard.shardName}));
+
+function testKillOpAfterFailPoint(failPointName, opToKillThreadName) {
+ const [collName, ns] = getNewNs(dbName);
+ jsTest.log("Testing with " + tojson(arguments) + " using ns " + ns);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+ // Insert some docs into the collection.
+ const numDocs = 1000;
+ var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+ for (var i = 0; i < numDocs; i++) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ // Simulate a network error on sending commit to the config server, so that the donor tries to
+ // recover the commit decision.
+ configureFailPoint(donorShard, "migrationCommitNetworkError");
+
+ // Set the requested failpoint and launch the moveChunk asynchronously.
+ let failPoint = configureFailPoint(donorShard, failPointName);
+ const awaitResult = startParallelShell(
+ funWithArgs(function(ns, toShardName) {
+ assert.commandWorked(db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+ }, ns, recipientShard.shardName), st.s.port);
+
+ jsTest.log("Waiting for moveChunk to reach " + failPointName + " failpoint");
+ failPoint.wait();
+
+ // Kill the OperationContext being used for the commit decision recovery several times. Note that by
+ // expecting to find a matching OperationContext multiple times, we are verifying that the
+ // commit decision recovery is resumed with a fresh OperationContext after the previous
+ // OperationContext was interrupted by the killOp.
+ jsTest.log("Killing OperationContext for " + opToKillThreadName + " several times");
+ for (let i = 0; i < 10; i++) {
+ let matchingOps;
+ assert.soon(() => {
+ matchingOps = donorShard.getDB("admin")
+ .aggregate([
+ {$currentOp: {'allUsers': true, 'idleConnections': true}},
+ {$match: {desc: opToKillThreadName}}
+ ])
+ .toArray();
+ // Wait for the opid to be present, since it's possible for currentOp to run after the
+ // Client has been created but before it has been associated with a new
+ // OperationContext.
+ return 1 === matchingOps.length && matchingOps[0].opid != null;
+ }, "Failed to find op with desc " + opToKillThreadName);
+ donorShard.getDB("admin").killOp(matchingOps[0].opid);
+ }
+
+ failPoint.off();
+
+ awaitResult();
+}
+
+testKillOpAfterFailPoint("hangInEnsureChunkVersionIsGreaterThanThenThrow",
+ "ensureChunkVersionIsGreaterThan");
+testKillOpAfterFailPoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow",
+ "refreshFilteringMetadataUntilSuccess");
+
+st.stop();
+})();
diff --git a/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js b/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
new file mode 100644
index 00000000000..b83927da3c8
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
@@ -0,0 +1,75 @@
+/**
+ * Shuts down the donor primary at two points in the critical section: while the node is executing
+ * _configsvrEnsureChunkVersionIsGreaterThan and while the node is forcing a filtering metadata
+ * refresh.
+ *
+ * Marked as multiversion_incompatible because the failpoints used in this test were introduced
+ * on v4.4 mongod.
+ * @tags: [multiversion_incompatible]
+ */
+
+(function() {
+'use strict';
+
+// This test shuts down a shard primary.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+TestData.skipCheckingIndexesConsistentAcrossCluster = true;
+
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+ if (typeof getNewNs.counter == 'undefined') {
+ getNewNs.counter = 0;
+ }
+ getNewNs.counter++;
+ const collName = "ns" + getNewNs.counter;
+ return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+function testShutDownAfterFailPoint(failPointName) {
+ let st = new ShardingTest({shards: 2});
+
+ const donorShard = st.shard0;
+ const recipientShard = st.shard1;
+
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: donorShard.shardName}));
+
+ const [collName, ns] = getNewNs(dbName);
+ jsTest.log("Testing with " + tojson(arguments) + " using ns " + ns);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+ // Insert some docs into the collection.
+ const numDocs = 1000;
+ var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+ for (var i = 0; i < numDocs; i++) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ // Simulate a network error on sending commit to the config server, so that the donor tries to
+ // recover the commit decision.
+ configureFailPoint(st.rs0.getPrimary(), "migrationCommitNetworkError");
+
+ // Set the requested failpoint and launch the moveChunk asynchronously.
+ let failPoint = configureFailPoint(st.rs0.getPrimary(), failPointName);
+ const awaitResult = startParallelShell(
+ funWithArgs(function(ns, toShardName) {
+ assert.commandWorked(db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+ }, ns, recipientShard.shardName), st.s.port);
+
+ jsTest.log("Waiting for moveChunk to reach " + failPointName + " failpoint");
+ failPoint.wait();
+
+ // Ensure we are able to shut down the donor primary by asserting that its exit code is 0.
+ assert.eq(0, MongoRunner.stopMongod(st.rs0.getPrimary(), null, {}, true /* waitpid */));
+
+ st.stop();
+}
+
+testShutDownAfterFailPoint("hangInEnsureChunkVersionIsGreaterThanThenThrow");
+testShutDownAfterFailPoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow");
+})();
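For reference, the two new tests above can be run standalone through resmoke. The exact invocation varies by branch (newer branches use a "run" subcommand), but on a branch of this vintage something like the following should work:

python buildscripts/resmoke.py --suites=sharding jstests/sharding/migration_coordinator_killop_in_critical_section.js
python buildscripts/resmoke.py --suites=sharding jstests/sharding/migration_coordinator_shutdown_in_critical_section.js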
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index f539a715804..77589cedc85 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_coordinator.h"
@@ -64,6 +65,8 @@ namespace migrationutil {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeFilteringMetadataRefresh);
+MONGO_FAIL_POINT_DEFINE(hangInEnsureChunkVersionIsGreaterThanThenThrow);
+MONGO_FAIL_POINT_DEFINE(hangInRefreshFilteringMetadataUntilSuccessThenThrow);
const char kSourceShard[] = "source";
const char kDestinationShard[] = "destination";
@@ -445,43 +448,107 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
const auto ensureChunkVersionIsGreaterThanRequestBSON =
ensureChunkVersionIsGreaterThanRequest.toBSON({});
+ const auto term = repl::ReplicationCoordinator::get(opCtx)->getTerm();
+
for (int attempts = 1;; attempts++) {
- const auto ensureChunkVersionIsGreaterThanResponse =
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- ensureChunkVersionIsGreaterThanRequestBSON,
- Shard::RetryPolicy::kIdempotent);
- const auto ensureChunkVersionIsGreaterThanStatus =
- Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
- if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
+ try {
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("ensureChunkVersionIsGreaterThan");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillable(lk);
+ }
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtxPtr = cc().makeOperationContext();
+ auto newOpCtx = newOpCtxPtr.get();
+
+ const auto ensureChunkVersionIsGreaterThanResponse =
+ Grid::get(newOpCtx)
+ ->shardRegistry()
+ ->getConfigShard()
+ ->runCommandWithFixedRetryAttempts(
+ newOpCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ ensureChunkVersionIsGreaterThanRequestBSON,
+ Shard::RetryPolicy::kIdempotent);
+ const auto ensureChunkVersionIsGreaterThanStatus =
+ Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+
+ uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
+
+ // 'newOpCtx' won't get interrupted if a stepdown occurs while the thread is hanging in
+ // the failpoint, because 'newOpCtx' hasn't been used to take a MODE_S, MODE_IX, or
+ // MODE_X lock. To ensure the catch block is entered if the failpoint was set, throw an
+ // arbitrary error.
+ if (hangInEnsureChunkVersionIsGreaterThanThenThrow.shouldFail()) {
+ hangInEnsureChunkVersionIsGreaterThanThenThrow.pauseWhileSet(newOpCtx);
+ uasserted(
+ ErrorCodes::InternalError,
+ "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
+ }
break;
- }
+ } catch (const DBException& ex) {
+ // If the server is already doing a clean shutdown, join the shutdown.
+ if (globalInShutdownDeprecated()) {
+ shutdown(waitForShutdown());
+ }
- // If the server is already doing a clean shutdown, join the shutdown.
- if (globalInShutdownDeprecated()) {
- shutdown(waitForShutdown());
+ // If this node has stepped down, stop retrying.
+ uassert(
+ ErrorCodes::InterruptedDueToReplStateChange,
+ "Stepped down while trying to send ensureChunkVersionIsGreaterThan to recover a "
+ "migration commit decision",
+ repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+ repl::MemberState::RS_PRIMARY &&
+ term == repl::ReplicationCoordinator::get(opCtx)->getTerm());
+
+ LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
+ << " attempts " << causedBy(redact(ex.toStatus())) << " . Will try again.";
}
- opCtx->checkForInterrupt();
-
- LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
- << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
- << " . Will try again.";
}
}
void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
+ const auto term = repl::ReplicationCoordinator::get(opCtx)->getTerm();
+
for (int attempts = 1;; attempts++) {
try {
- forceShardFilteringMetadataRefresh(opCtx, nss, true);
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("refreshFilteringMetadataUntilSuccess");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillable(lk);
+ }
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtxPtr = cc().makeOperationContext();
+ auto newOpCtx = newOpCtxPtr.get();
+
+ forceShardFilteringMetadataRefresh(newOpCtx, nss, true);
+
+ // 'newOpCtx' won't get interrupted if a stepdown occurs while the thread is hanging in
+ // the failpoint, because 'newOpCtx' hasn't been used to take a MODE_S, MODE_IX, or
+ // MODE_X lock. To ensure the catch block is entered if the failpoint was set, throw an
+ // arbitrary error.
+ if (hangInRefreshFilteringMetadataUntilSuccessThenThrow.shouldFail()) {
+ hangInRefreshFilteringMetadataUntilSuccessThenThrow.pauseWhileSet(newOpCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response for forceShardFilteringMetadataRefresh");
+ }
break;
} catch (const DBException& ex) {
// If the server is already doing a clean shutdown, join the shutdown.
if (globalInShutdownDeprecated()) {
shutdown(waitForShutdown());
}
- opCtx->checkForInterrupt();
+
+ // If this node has stepped down, stop retrying.
+ uassert(ErrorCodes::InterruptedDueToReplStateChange,
+ "Stepped down while trying to force a filtering metadata refresh to recover a "
+ "migration commit decision",
+ repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+ repl::MemberState::RS_PRIMARY &&
+ term == repl::ReplicationCoordinator::get(opCtx)->getTerm());
LOG(0) << "Failed to refresh metadata for " << nss.ns() << " after " << attempts
<< " attempts " << causedBy(redact(ex.toStatus()))