author     Esha Maharishi <esha.maharishi@mongodb.com>  2020-02-05 21:52:42 +0000
committer  evergreen <evergreen@mongodb.com>            2020-02-05 21:52:42 +0000
commit     769ee6a62ad027541e70237ab1b9d013bd36e84c
tree       b0dd46754aa18f7bceffbcba49791278fa53a5d1
parent     18966553fc7f9d350c3428ea1a23162aba2526ef
SERVER-45901 Make moveChunk robust to being killOp'd after commit has been sent to the config but before the node has found out the commit decision
5 files changed, 271 insertions, 21 deletions
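The donor-side change in src/mongo/db/s/migration_util.cpp (the last file in the diff below) applies the same pattern to both ensureChunkVersionIsGreaterThan() and refreshFilteringMetadataUntilSuccess(): each attempt to recover the commit decision runs on a fresh, killable OperationContext, and the retry loop gives up only when the server is shutting down or the node is no longer primary in the term in which it started. A killOp can therefore interrupt a single attempt, but not the recovery as a whole. The sketch below models just that retry shape in plain, standalone C++; every name in it (recoverCommitDecisionUntilSuccess, attemptRecoveryOnFreshContext, the stubbed shutdown and term predicates) is a hypothetical stand-in, not a MongoDB API.

// recovery_retry_sketch.cpp -- illustrative only; all names are hypothetical
// stand-ins for the MongoDB internals touched by this patch.
#include <cstdint>
#include <iostream>
#include <stdexcept>

namespace {

// Stubbed predicates; in the real code these come from the process shutdown
// state and from the replication coordinator.
bool inCleanShutdown() {
    return false;
}
std::int64_t currentTerm() {
    return 1;
}
bool stillPrimaryInTerm(std::int64_t term) {
    return term == currentTerm();
}

// One recovery attempt, done on a logically fresh "operation context" so that
// a killOp aimed at an earlier attempt cannot poison this one. The stub fails
// twice (as if it had been killed) and then succeeds.
void attemptRecoveryOnFreshContext() {
    static int calls = 0;
    if (++calls < 3) {
        throw std::runtime_error("operation was interrupted");
    }
}

}  // namespace

// Retry until the commit decision is recovered, the server begins a clean
// shutdown, or this node is no longer primary in the term it started with.
void recoverCommitDecisionUntilSuccess() {
    const std::int64_t term = currentTerm();
    for (int attempts = 1;; attempts++) {
        try {
            attemptRecoveryOnFreshContext();
            return;
        } catch (const std::exception& ex) {
            if (inCleanShutdown()) {
                return;  // Join the shutdown rather than retry forever.
            }
            if (!stillPrimaryInTerm(term)) {
                throw;  // A newer primary owns the recovery now.
            }
            std::cerr << "recovery attempt " << attempts << " failed: " << ex.what()
                      << "; will try again" << std::endl;
        }
    }
}

int main() {
    recoverCommitDecisionUntilSuccess();
    std::cout << "commit decision recovered" << std::endl;
    return 0;
}

Compiled standalone (for example with g++ -std=c++17), the sketch prints two simulated interruptions and then reports success, which mirrors what the killop test below exercises against a real donor shard by repeatedly killing the matching operation.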
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 28ae7ed1343..b86ff502f99 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -23,6 +23,8 @@ selector:
   - jstests/sharding/merge_from_stale_mongos.js
   - jstests/sharding/migration_coordinator_basic.js
   - jstests/sharding/migration_coordinator_failover.js
+  - jstests/sharding/migration_coordinator_killop_in_critical_section.js
+  - jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
   # Enable when SERVER-44733 is backported
   - jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
   # Enable when SERVER-43310 is backported
diff --git a/jstests/sharding/migration_coordinator_failover.js b/jstests/sharding/migration_coordinator_failover.js
index 4586771bdb6..4418ea7589f 100644
--- a/jstests/sharding/migration_coordinator_failover.js
+++ b/jstests/sharding/migration_coordinator_failover.js
@@ -145,6 +145,14 @@
 runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep5",
                                             false /* shouldMakeMigrationFailToCommitOnConfig */,
                                             ErrorCodes.OperationFailed);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangInEnsureChunkVersionIsGreaterThanThenThrow",
+                                            true /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow",
+                                            true /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.OperationFailed);
+
 runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingAbortDecisionDurable",
                                             true /* shouldMakeMigrationFailToCommitOnConfig */,
                                             ErrorCodes.StaleEpoch);
diff --git a/jstests/sharding/migration_coordinator_killop_in_critical_section.js b/jstests/sharding/migration_coordinator_killop_in_critical_section.js
new file mode 100644
index 00000000000..3c107f7ab8d
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_killop_in_critical_section.js
@@ -0,0 +1,98 @@
+/**
+ * Kills the OperationContext used by the donor shard to send
+ * _configsvrEnsureChunkVersionIsGreaterThan and to force a filtering metadata refresh.
+ *
+ * Depends on the checkOrphansAreDeleted hook at the end of ShardingTest to verify that the orphans,
+ * range deletion tasks, and migration coordinator state are deleted despite the killOps.
+ *
+ * Marked as multiversion_incompatible because the failpoints used in this test were introduced
+ * on v4.4 mongod.
+ * @tags: [multiversion_incompatible]
+ */
+
+(function() {
+'use strict';
+
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+    if (typeof getNewNs.counter == 'undefined') {
+        getNewNs.counter = 0;
+    }
+    getNewNs.counter++;
+    const collName = "ns" + getNewNs.counter;
+    return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+let st = new ShardingTest({shards: 2});
+
+const donorShard = st.shard0;
+const recipientShard = st.shard1;
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: donorShard.shardName}));
+
+function testKillOpAfterFailPoint(failPointName, opToKillThreadName) {
+    const [collName, ns] = getNewNs(dbName);
+    jsTest.log("Testing with " + tojson(arguments) + " using ns " + ns);
+
+    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+    // Insert some docs into the collection.
+    const numDocs = 1000;
+    var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+    for (var i = 0; i < numDocs; i++) {
+        bulk.insert({_id: i});
+    }
+    assert.commandWorked(bulk.execute());
+
+    // Simulate a network error on sending commit to the config server, so that the donor tries to
+    // recover the commit decision.
+    configureFailPoint(donorShard, "migrationCommitNetworkError");
+
+    // Set the requested failpoint and launch the moveChunk asynchronously.
+    let failPoint = configureFailPoint(donorShard, failPointName);
+    const awaitResult = startParallelShell(
+        funWithArgs(function(ns, toShardName) {
+            assert.commandWorked(db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+        }, ns, recipientShard.shardName), st.s.port);
+
+    jsTest.log("Waiting for moveChunk to reach " + failPointName + " failpoint");
+    failPoint.wait();
+
+    // Kill the OperationContext being used for the commit decision recovery several times. Note, by
+    // expecting to find a matching OperationContext multiple times, we are verifying that the
+    // commit decision recovery is resumed with a fresh OperationContext after the previous
+    // OperationContext was interrupted by the killOp.
+    jsTest.log("Killing OperationContext for " + opToKillThreadName + " several times");
+    for (let i = 0; i < 10; i++) {
+        let matchingOps;
+        assert.soon(() => {
+            matchingOps = donorShard.getDB("admin")
+                              .aggregate([
+                                  {$currentOp: {'allUsers': true, 'idleConnections': true}},
+                                  {$match: {desc: opToKillThreadName}}
+                              ])
+                              .toArray();
+            // Wait for the opid to be present, since it's possible for currentOp to run after the
+            // Client has been created but before it has been associated with a new
+            // OperationContext.
+            return 1 === matchingOps.length && matchingOps[0].opid != null;
+        }, "Failed to find op with desc " + opToKillThreadName);
+        donorShard.getDB("admin").killOp(matchingOps[0].opid);
+    }
+
+    failPoint.off();
+
+    awaitResult();
+}
+
+testKillOpAfterFailPoint("hangInEnsureChunkVersionIsGreaterThanThenThrow",
+                         "ensureChunkVersionIsGreaterThan");
+testKillOpAfterFailPoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow",
+                         "refreshFilteringMetadataUntilSuccess");
+
+st.stop();
+})();
diff --git a/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js b/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
new file mode 100644
index 00000000000..b83927da3c8
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_shutdown_in_critical_section.js
@@ -0,0 +1,75 @@
+/**
+ * Shuts down the donor primary at two points in the critical section: while the node is executing
+ * _configsvrEnsureChunkVersionIsGreaterThan and while the node is forcing a filtering metadata
+ * refresh.
+ *
+ * Marked as multiversion_incompatible because the failpoints used in this test were introduced
+ * on v4.4 mongod.
+ * @tags: [multiversion_incompatible]
+ */
+
+(function() {
+'use strict';
+
+// This test shuts down a shard primary.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+TestData.skipCheckingIndexesConsistentAcrossCluster = true;
+
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+    if (typeof getNewNs.counter == 'undefined') {
+        getNewNs.counter = 0;
+    }
+    getNewNs.counter++;
+    const collName = "ns" + getNewNs.counter;
+    return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+function testShutDownAfterFailPoint(failPointName) {
+    let st = new ShardingTest({shards: 2});
+
+    const donorShard = st.shard0;
+    const recipientShard = st.shard1;
+
+    assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+    assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: donorShard.shardName}));
+
+    const [collName, ns] = getNewNs(dbName);
+    jsTest.log("Testing with " + tojson(arguments) + " using ns " + ns);
+
+    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+    // Insert some docs into the collection.
+    const numDocs = 1000;
+    var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+    for (var i = 0; i < numDocs; i++) {
+        bulk.insert({_id: i});
+    }
+    assert.commandWorked(bulk.execute());
+
+    // Simulate a network error on sending commit to the config server, so that the donor tries to
+    // recover the commit decision.
+    configureFailPoint(st.rs0.getPrimary(), "migrationCommitNetworkError");
+
+    // Set the requested failpoint and launch the moveChunk asynchronously.
+    let failPoint = configureFailPoint(st.rs0.getPrimary(), failPointName);
+    const awaitResult = startParallelShell(
+        funWithArgs(function(ns, toShardName) {
+            assert.commandWorked(db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+        }, ns, recipientShard.shardName), st.s.port);
+
+    jsTest.log("Waiting for moveChunk to reach " + failPointName + " failpoint");
+    failPoint.wait();
+
+    // Ensure we are able to shut down the donor primary by asserting that its exit code is 0.
+    assert.eq(0, MongoRunner.stopMongod(st.rs0.getPrimary(), null, {}, true /* waitpid */));
+
+    st.stop();
+}
+
+testShutDownAfterFailPoint("hangInEnsureChunkVersionIsGreaterThanThenThrow");
+testShutDownAfterFailPoint("hangInRefreshFilteringMetadataUntilSuccessThenThrow");
+})();
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index f539a715804..77589cedc85 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -44,6 +44,7 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/ops/write_ops.h"
 #include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/s/active_migrations_registry.h"
 #include "mongo/db/s/collection_sharding_runtime.h"
 #include "mongo/db/s/migration_coordinator.h"
@@ -64,6 +65,8 @@ namespace migrationutil {
 namespace {
 
 MONGO_FAIL_POINT_DEFINE(hangBeforeFilteringMetadataRefresh);
+MONGO_FAIL_POINT_DEFINE(hangInEnsureChunkVersionIsGreaterThanThenThrow);
+MONGO_FAIL_POINT_DEFINE(hangInRefreshFilteringMetadataUntilSuccessThenThrow);
 
 const char kSourceShard[] = "source";
 const char kDestinationShard[] = "destination";
@@ -445,43 +448,107 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
     const auto ensureChunkVersionIsGreaterThanRequestBSON =
         ensureChunkVersionIsGreaterThanRequest.toBSON({});
 
+    const auto term = repl::ReplicationCoordinator::get(opCtx)->getTerm();
+
     for (int attempts = 1;; attempts++) {
-        const auto ensureChunkVersionIsGreaterThanResponse =
-            Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
-                opCtx,
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                "admin",
-                ensureChunkVersionIsGreaterThanRequestBSON,
-                Shard::RetryPolicy::kIdempotent);
-        const auto ensureChunkVersionIsGreaterThanStatus =
-            Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
-        if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
+        try {
+            auto newClient =
+                opCtx->getServiceContext()->makeClient("ensureChunkVersionIsGreaterThan");
+            {
+                stdx::lock_guard<Client> lk(*newClient.get());
+                newClient->setSystemOperationKillable(lk);
+            }
+            AlternativeClientRegion acr(newClient);
+            auto newOpCtxPtr = cc().makeOperationContext();
+            auto newOpCtx = newOpCtxPtr.get();
+
+            const auto ensureChunkVersionIsGreaterThanResponse =
+                Grid::get(newOpCtx)
+                    ->shardRegistry()
+                    ->getConfigShard()
+                    ->runCommandWithFixedRetryAttempts(
+                        newOpCtx,
+                        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                        "admin",
+                        ensureChunkVersionIsGreaterThanRequestBSON,
+                        Shard::RetryPolicy::kIdempotent);
+            const auto ensureChunkVersionIsGreaterThanStatus =
+                Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+
+            uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
+
+            // 'newOpCtx' won't get interrupted if a stepdown occurs while the thread is hanging in
+            // the failpoint, because 'newOpCtx' hasn't been used to take a MODE_S, MODE_IX, or
+            // MODE_X lock. To ensure the catch block is entered if the failpoint was set, throw an
+            // arbitrary error.
+            if (hangInEnsureChunkVersionIsGreaterThanThenThrow.shouldFail()) {
+                hangInEnsureChunkVersionIsGreaterThanThenThrow.pauseWhileSet(newOpCtx);
+                uasserted(
+                    ErrorCodes::InternalError,
+                    "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
+            }
             break;
-        }
+        } catch (const DBException& ex) {
+            // If the server is already doing a clean shutdown, join the shutdown.
+            if (globalInShutdownDeprecated()) {
+                shutdown(waitForShutdown());
+            }
 
-        // If the server is already doing a clean shutdown, join the shutdown.
-        if (globalInShutdownDeprecated()) {
-            shutdown(waitForShutdown());
+            // If this node has stepped down, stop retrying.
+            uassert(
+                ErrorCodes::InterruptedDueToReplStateChange,
+                "Stepped down while trying to send ensureChunkVersionIsGreaterThan to recover a "
+                "migration commit decision",
+                repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+                        repl::MemberState::RS_PRIMARY &&
+                    term == repl::ReplicationCoordinator::get(opCtx)->getTerm());
+
+            LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
+                   << " attempts " << causedBy(redact(ex.toStatus())) << " . Will try again.";
         }
-        opCtx->checkForInterrupt();
-
-        LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
-               << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
-               << " . Will try again.";
     }
 }
 
 void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
+    const auto term = repl::ReplicationCoordinator::get(opCtx)->getTerm();
+
     for (int attempts = 1;; attempts++) {
         try {
-            forceShardFilteringMetadataRefresh(opCtx, nss, true);
+            auto newClient =
+                opCtx->getServiceContext()->makeClient("refreshFilteringMetadataUntilSuccess");
+            {
+                stdx::lock_guard<Client> lk(*newClient.get());
+                newClient->setSystemOperationKillable(lk);
+            }
+            AlternativeClientRegion acr(newClient);
+            auto newOpCtxPtr = cc().makeOperationContext();
+            auto newOpCtx = newOpCtxPtr.get();
+
+            forceShardFilteringMetadataRefresh(newOpCtx, nss, true);
+
+            // 'newOpCtx' won't get interrupted if a stepdown occurs while the thread is hanging in
+            // the failpoint, because 'newOpCtx' hasn't been used to take a MODE_S, MODE_IX, or
+            // MODE_X lock. To ensure the catch block is entered if the failpoint was set, throw an
+            // arbitrary error.
+            if (hangInRefreshFilteringMetadataUntilSuccessThenThrow.shouldFail()) {
+                hangInRefreshFilteringMetadataUntilSuccessThenThrow.pauseWhileSet(newOpCtx);
+                uasserted(ErrorCodes::InternalError,
+                          "simulate an error response for forceShardFilteringMetadataRefresh");
+            }
             break;
         } catch (const DBException& ex) {
             // If the server is already doing a clean shutdown, join the shutdown.
             if (globalInShutdownDeprecated()) {
                 shutdown(waitForShutdown());
             }
-            opCtx->checkForInterrupt();
+
+            // If this node has stepped down, stop retrying.
+            uassert(ErrorCodes::InterruptedDueToReplStateChange,
+                    "Stepped down while trying to force a filtering metadata refresh to recover a "
+                    "migration commit decision",
+                    repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
+                            repl::MemberState::RS_PRIMARY &&
+                        term == repl::ReplicationCoordinator::get(opCtx)->getTerm());
 
             LOG(0) << "Failed to refresh metadata for " << nss.ns() << " after " << attempts
                    << " attempts " << causedBy(redact(ex.toStatus()))