author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-08-26 10:05:33 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-08-29 14:45:28 -0400 |
commit | 2374ef1a3ac05e3fca39485ef4824f559b1c082c (patch) | |
tree | 8e77f558622ace83f281463c335ea46f4aa23b11 | |
parent | 518b6e97a55e8cd1b133d9d2e1b759d9dddf5430 (diff) | |
download | mongo-2374ef1a3ac05e3fca39485ef4824f559b1c082c.tar.gz | |
SERVER-24470 CommitChunkMigration command checks that the balancer still has the distributed lock
9 files changed, 138 insertions, 37 deletions
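Before the diffs: the heart of this change is that the commit-migration request now carries a `shardHasDistributedLock` flag, and when the source shard reports `false` the config server runs a new `checkBalancerHasDistLock()` step that looks up the balancer's lock on the collection in `config.locks` before applying the commit. The shell sketch below approximates that lookup for orientation only; the namespace and the balancer process id are illustrative assumptions, not values taken from this commit.

```js
// Rough shell approximation of the config.locks lookup that checkBalancerHasDistLock()
// performs on the config server primary (see the C++ diff below).
// 'TestDB.TestColl' and the process id are assumed, illustrative values.
var configDB = db.getSiblingDB('config');
var balancerProcessId = 'ConfigServer';  // assumed dist lock process id used by the balancer
var lockDoc = configDB.locks.findOne({_id: 'TestDB.TestColl', process: balancerProcessId});
if (lockDoc === null) {
    // Corresponds to the new ErrorCodes::BalancerLostDistributedLock failure path.
    print('balancer no longer holds the collection dist lock; commit would be rejected');
} else {
    print('balancer still holds the lock; commit can proceed');
}
```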
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index ed10d68506c..edfab8060ea 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -14,6 +14,7 @@ selector:
   - jstests/sharding/add_shard_to_zone.js
   - jstests/sharding/remove_shard_from_zone.js
   - jstests/sharding/update_zone_key_range.js
+  - jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
   # Doesn't use ShardingTest so won't actually be run in a mixed version configuration
   - jstests/sharding/config_version_rollback.js
   # TODO Assumes mongod and mongos handle read on view. Enable when 3.4 becomes 'last-stable'.
diff --git a/jstests/sharding/migration_failure.js b/jstests/sharding/migration_failure.js
index 6e4b855194e..f49a48a0169 100644
--- a/jstests/sharding/migration_failure.js
+++ b/jstests/sharding/migration_failure.js
@@ -1,6 +1,8 @@
 //
 // Tests that migration failures before and after commit correctly recover when possible.
 //
+// Also checks that the collection version on a source shard updates correctly after a migration.
+//
 (function() {
     'use strict';
 
@@ -9,11 +11,10 @@
     var mongos = st.s0;
     var admin = mongos.getDB("admin");
-    var shards = mongos.getCollection("config.shards").find().toArray();
     var coll = mongos.getCollection("foo.bar");
 
     assert(admin.runCommand({enableSharding: coll.getDB() + ""}).ok);
-    printjson(admin.runCommand({movePrimary: coll.getDB() + "", to: shards[0]._id}));
+    printjson(admin.runCommand({movePrimary: coll.getDB() + "", to: st.shard0.shardName}));
     assert(admin.runCommand({shardCollection: coll + "", key: {_id: 1}}).ok);
     assert(admin.runCommand({split: coll + "", middle: {_id: 0}}).ok);
 
@@ -32,7 +33,7 @@
     oldVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
     assert.commandFailed(
-        admin.runCommand({moveChunk: coll + "", find: {_id: 0}, to: shards[1]._id}));
+        admin.runCommand({moveChunk: coll + "", find: {_id: 0}, to: st.shard1.shardName}));
 
     newVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
@@ -46,20 +47,17 @@
     assert.commandWorked(st.shard0.getDB("admin").runCommand(
         {configureFailPoint: 'failMigrationCommit', mode: 'off'}));
 
-    // failApplyChunkOps and failCommitMigrationCommand -- these mimic migration commit commands
-    // returning errors (e.g. network), whereupon the config server is queried to determine whether
-    // the commit was successful.
-    assert.commandWorked(st.shard0.getDB("admin").runCommand(
-        {configureFailPoint: 'failApplyChunkOps', mode: 'alwaysOn'}));
+    // migrationCommitNetworkError -- mimic migration commit command returning a network error,
+    // whereupon the config server is queried to determine that this commit was successful.
     assert.commandWorked(st.shard0.getDB("admin").runCommand(
-        {configureFailPoint: 'failCommitMigrationCommand', mode: 'alwaysOn'}));
+        {configureFailPoint: 'migrationCommitNetworkError', mode: 'alwaysOn'}));
 
     // Run a migration where there will still be chunks in the collection remaining on the shard
     // afterwards. This will cause the collection's shardVersion to be bumped higher.
     oldVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
     assert.commandWorked(
-        admin.runCommand({moveChunk: coll + "", find: {_id: 1}, to: shards[1]._id}));
+        admin.runCommand({moveChunk: coll + "", find: {_id: 1}, to: st.shard1.shardName}));
 
     newVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
@@ -72,7 +70,7 @@
     oldVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
     assert.commandWorked(
-        admin.runCommand({moveChunk: coll + "", find: {_id: -1}, to: shards[1]._id}));
+        admin.runCommand({moveChunk: coll + "", find: {_id: -1}, to: st.shard1.shardName}));
 
     newVersion = st.shard0.getDB("admin").runCommand({getShardVersion: coll.toString()}).global;
 
@@ -86,9 +84,7 @@
         0, newVersion.i, "The shard version should have reset, but the minor value is not zero");
 
     assert.commandWorked(st.shard0.getDB("admin").runCommand(
-        {configureFailPoint: 'failApplyChunkOps', mode: 'off'}));
-    assert.commandWorked(st.shard0.getDB("admin").runCommand(
-        {configureFailPoint: 'failCommitMigrationCommand', mode: 'off'}));
+        {configureFailPoint: 'migrationCommitNetworkError', mode: 'off'}));
 
     st.stop();
 
diff --git a/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js b/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
index 123c052eab2..f890479126b 100644
--- a/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
+++ b/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
@@ -1,5 +1,10 @@
 // Ensures that all pending move chunk operations get interrupted when the primary of the config
-// server steps down and then becomes primary again
+// server steps down and then becomes primary again. Then the migration can be rejoined, and a
+// success/failure response still returned to the caller.
+//
+// Also tests the failure of a migration commit command on the source shard of a migration, due to
+// the balancer being interrupted, failing to recover the active migrations, and releasing the
+// distributed lock.
 load('./jstests/libs/chunk_manipulation_util.js');
 
@@ -9,12 +14,13 @@ load('./jstests/libs/chunk_manipulation_util.js');
     // Intentionally use a config server with 1 node so that the step down and promotion to primary
     // are guaranteed to happen on the same host
     var st = new ShardingTest({config: 1, shards: 2});
+    var mongos = st.s0;
 
-    assert.commandWorked(st.s0.adminCommand({enableSharding: 'TestDB'}));
+    assert.commandWorked(mongos.adminCommand({enableSharding: 'TestDB'}));
     st.ensurePrimaryShard('TestDB', st.shard0.shardName);
-    assert.commandWorked(st.s0.adminCommand({shardCollection: 'TestDB.TestColl', key: {Key: 1}}));
+    assert.commandWorked(mongos.adminCommand({shardCollection: 'TestDB.TestColl', key: {Key: 1}}));
 
-    var coll = st.s0.getDB('TestDB').TestColl;
+    var coll = mongos.getDB('TestDB').TestColl;
 
     // We have one chunk initially
     assert.writeOK(coll.insert({Key: 0, Value: 'Test value'}));
 
@@ -25,7 +31,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
     var staticMongod = MongoRunner.runMongod({});
 
     var joinMoveChunk = moveChunkParallel(
-        staticMongod, st.s0.host, {Key: 0}, null, 'TestDB.TestColl', st.shard1.shardName);
+        staticMongod, mongos.host, {Key: 0}, null, 'TestDB.TestColl', st.shard1.shardName);
 
     waitForMigrateStep(st.shard1, migrateStepNames.deletedPriorDataInRange);
 
     // Stepdown the primary in order to force the balancer to stop
@@ -37,16 +43,30 @@ load('./jstests/libs/chunk_manipulation_util.js');
     // Ensure a new primary is found promptly
     st.configRS.getPrimary(30000);
 
-    assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
-    assert.eq(0, st.s0.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
+    assert.eq(1, mongos.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
+    assert.eq(0, mongos.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
 
     unpauseMigrateAtStep(st.shard1, migrateStepNames.deletedPriorDataInRange);
 
     // Ensure that migration succeeded
    joinMoveChunk();
 
-    assert.eq(0, st.s0.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
-    assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
+    assert.eq(0, mongos.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
+    assert.eq(1, mongos.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
+
+    // migrationCommitError -- tell the shard that the migration cannot be committed because the
+    // collection distlock was lost during the migration because the balancer was interrupted and
+    // the collection could be incompatible now with this migration.
+    assert.commandWorked(st.configRS.getPrimary().getDB("admin").runCommand(
+        {configureFailPoint: 'migrationCommitError', mode: 'alwaysOn'}));
+
+    assert.commandFailedWithCode(
+        mongos.getDB("admin").runCommand(
+            {moveChunk: coll + "", find: {Key: 0}, to: st.shard0.shardName}),
+        ErrorCodes.BalancerLostDistributedLock);
+
+    assert.commandWorked(st.configRS.getPrimary().getDB("admin").runCommand(
+        {configureFailPoint: 'migrationCommitError', mode: 'off'}));
 
     st.stop();
 })();
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index fc894155e5d..50a38f41816 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -189,6 +189,7 @@ error_code("LinearizableReadConcernError", 187)
 error_code("IncompatibleServerVersion", 188)
 error_code("PrimarySteppedDown", 189)
 error_code("MasterSlaveConnectionFailure", 190)
+error_code("BalancerLostDistributedLock", 191)
 
 # Non-sequential error codes (for compatibility only)
 error_code("SocketException", 9001)
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
index 59b234f5eae..117c45615e5 100644
--- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
@@ -37,15 +37,19 @@
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_locks.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/request_types/commit_chunk_migration_request_type.h"
+#include "mongo/util/fail_point_service.h"
 
 namespace mongo {
 namespace {
 
+MONGO_FP_DECLARE(migrationCommitError);
+
 /**
  * This command takes the chunk being migrated ("migratedChunk") and generates a new version for it
  * that is written along with its new shard location ("toShard") to the chunks collection. It also
@@ -68,6 +72,7 @@ namespace {
  *     controlChunk: {min: <min_value>, max: <max_value>},  (optional)
  *     fromShard: "<from_shard_name>",
  *     toShard: "<to_shard_name>",
+ *     shardHasDistributedLock: true/false,
  * }
  *
  * Returns:
@@ -120,8 +125,7 @@ public:
             BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
                                  << ChunkType::shard() << shard);
 
-        // Must use kLocalReadConcern because using majority will set a flag on the recovery unit
-        // that conflicts with the subsequent writes in the CommitChunkMigration command.
+        // Must use local read concern because we're going to perform subsequent writes.
         auto findResponse = uassertStatusOK(
             Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
                 txn,
@@ -200,6 +204,43 @@ public:
         return BSON("applyOps" << updates.arr());
     }
 
+    /**
+     * Checks that the balancer still holds the distributed lock for this collection. If it no
+     * longer does, uasserts, because we don't know whether the collection state has changed --
+     * e.g. whether it was/is dropping, whether another incompatible migration is running, etc.
+     */
+    void checkBalancerHasDistLock(OperationContext* txn,
+                                  const NamespaceString& nss,
+                                  const ChunkRange& chunkRange) {
+        auto balancerDistLockProcessID =
+            Grid::get(txn)->catalogClient(txn)->getDistLockManager()->getProcessID();
+
+        // Must use local read concern because we're going to perform subsequent writes.
+        auto lockQueryResponse = uassertStatusOK(
+            Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+                txn,
+                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                repl::ReadConcernLevel::kLocalReadConcern,
+                NamespaceString(LocksType::ConfigNS),
+                BSON(LocksType::process(balancerDistLockProcessID) << LocksType::name(nss.ns())),
+                BSONObj(),
+                boost::none));
+
+        invariant(lockQueryResponse.docs.size() <= 1);
+
+        if (MONGO_FAIL_POINT(migrationCommitError)) {
+            lockQueryResponse.docs.clear();
+        }
+
+        uassert(ErrorCodes::BalancerLostDistributedLock,
+                str::stream() << "The distributed lock for collection '" << nss.ns()
+                              << "' was lost by the balancer since this migration began. Cannot "
+                              << "proceed with the migration commit for chunk ("
+                              << chunkRange.toString()
+                              << ") because it could corrupt other operations.",
+                lockQueryResponse.docs.size() == 1);
+    }
+
     bool run(OperationContext* txn,
              const std::string& dbName,
              BSONObj& cmdObj,
@@ -221,6 +262,10 @@ public:
         // applyOps.
         Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
 
+        if (!commitChunkMigrationRequest.shardHasDistributedLock()) {
+            checkBalancerHasDistLock(txn, nss, commitChunkMigrationRequest.getMigratedChunkRange());
+        }
+
         // Check that migratedChunk and controlChunk are where they should be, on fromShard.
         checkChunkIsOnShard(txn,
                             nss,
@@ -240,8 +285,7 @@ public:
         // incremented major version of the result returned. Migrating chunk's minor version will
         // be 0, control chunk's minor version will be 1 (if control chunk is present).
 
-        // Must use kLocalReadConcern because using majority will set a flag on the recovery unit
-        // that conflicts with the subsequent writes.
+        // Must use local read concern because we're going to perform subsequent writes.
         auto findResponse = uassertStatusOK(
             Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
                 txn,
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index f8cf44048fd..52a39d2606e 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -71,7 +71,7 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
 
 }  // namespace
 
-MONGO_FP_DECLARE(failCommitMigrationCommand);
+MONGO_FP_DECLARE(migrationCommitNetworkError);
 MONGO_FP_DECLARE(failMigrationCommit);
 MONGO_FP_DECLARE(hangBeforeCommitMigration);
 MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
@@ -314,7 +314,8 @@ Status MigrationSourceManager::commitDonateChunk(OperationContext* txn) {
                                                  _args.getFromShardId(),
                                                  _args.getToShardId(),
                                                  migratedChunkType,
-                                                 controlChunkType);
+                                                 controlChunkType,
+                                                 _args.getTakeDistLock());
 
     builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
 
@@ -327,9 +328,9 @@ Status MigrationSourceManager::commitDonateChunk(OperationContext* txn) {
         builder.obj(),
         Shard::RetryPolicy::kIdempotent);
 
-    if (MONGO_FAIL_POINT(failCommitMigrationCommand)) {
+    if (MONGO_FAIL_POINT(migrationCommitNetworkError)) {
         commitChunkMigrationResponse = Status(
-            ErrorCodes::InternalError, "Failpoint 'failCommitMigrationCommand' generated error");
+            ErrorCodes::InternalError, "Failpoint 'migrationCommitNetworkError' generated error");
     }
 
     if (commitChunkMigrationResponse.isOK() &&
@@ -361,6 +362,12 @@ Status MigrationSourceManager::commitDonateChunk(OperationContext* txn) {
         css->refreshMetadata(
             txn, _committedMetadata->cloneMigrate(migratingChunkToForget, committedCollVersion));
         _committedMetadata = css->getMetadata();
+    } else if (commitChunkMigrationResponse.isOK() &&
+               commitChunkMigrationResponse.getValue().commandStatus ==
+                   ErrorCodes::BalancerLostDistributedLock) {
+        // We were unable to commit because the Balancer was no longer holding the collection
+        // distributed lock. No attempt to commit was made.
+        return commitChunkMigrationResponse.getValue().commandStatus;
     } else {
         // This could be an unrelated error (e.g. network error). Check whether the metadata update
         // succeeded by refreshing the collection metadata from the config server and checking that
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
index 87ee5d8245a..db770cdb55e 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
@@ -40,6 +40,7 @@ using unittest::assertGet;
 namespace {
 
 const auto kNamespaceString = NamespaceString("TestDB", "TestColl");
+const auto kShardHasDistributedLock = false;
 
 const auto kShardId0 = ShardId("shard0");
 const auto kShardId1 = ShardId("shard1");
@@ -63,8 +64,13 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
     controlChunkTypeTemp.setMax(kKey3);
     boost::optional<ChunkType> controlChunkType = std::move(controlChunkTypeTemp);
 
-    CommitChunkMigrationRequest::appendAsCommand(
-        &builder, kNamespaceString, kShardId0, kShardId1, migratedChunkType, controlChunkType);
+    CommitChunkMigrationRequest::appendAsCommand(&builder,
+                                                 kNamespaceString,
+                                                 kShardId0,
+                                                 kShardId1,
+                                                 migratedChunkType,
+                                                 controlChunkType,
+                                                 kShardHasDistributedLock);
 
     BSONObj cmdObj = builder.obj();
 
@@ -79,6 +85,7 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
     ASSERT(request.hasControlChunkRange());
     ASSERT_BSONOBJ_EQ(kKey2, request.getControlChunkRange().getMin());
     ASSERT_BSONOBJ_EQ(kKey3, request.getControlChunkRange().getMax());
+    ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
 }
 
 TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
@@ -88,8 +95,13 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
     migratedChunkType.setMin(kKey0);
     migratedChunkType.setMax(kKey1);
 
-    CommitChunkMigrationRequest::appendAsCommand(
-        &builder, kNamespaceString, kShardId0, kShardId1, migratedChunkType, boost::none);
+    CommitChunkMigrationRequest::appendAsCommand(&builder,
+                                                 kNamespaceString,
+                                                 kShardId0,
+                                                 kShardId1,
+                                                 migratedChunkType,
+                                                 boost::none,
+                                                 kShardHasDistributedLock);
 
     BSONObj cmdObj = builder.obj();
 
@@ -102,6 +114,7 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
     ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunkRange().getMin());
     ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunkRange().getMax());
     ASSERT(!request.hasControlChunkRange());
+    ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
 }
 
 }  // namespace
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
index f03a105c5cf..052feb1fc44 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
@@ -40,6 +40,7 @@ const char kFromShard[] = "fromShard";
 const char kToShard[] = "toShard";
 const char kMigratedChunk[] = "migratedChunk";
 const char kControlChunk[] = "controlChunk";
+const char kShardHasDistributedLock[] = "shardHasDistributedLock";
 
 /**
  * Attempts to parse a ChunkRange from "field" in "source".
@@ -114,6 +115,14 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
         }
     }
 
+    {
+        Status shardHasDistLockStatus = bsonExtractBooleanField(
+            obj, kShardHasDistributedLock, &request._shardHasDistributedLock);
+        if (!shardHasDistLockStatus.isOK()) {
+            return shardHasDistLockStatus;
+        }
+    }
+
     return request;
 }
 
@@ -123,7 +132,8 @@ void CommitChunkMigrationRequest::appendAsCommand(
     const ShardId& fromShard,
     const ShardId& toShard,
     const ChunkType& migratedChunkType,
-    const boost::optional<ChunkType>& controlChunkType) {
+    const boost::optional<ChunkType>& controlChunkType,
+    const bool& shardHasDistributedLock) {
     invariant(builder->asTempObj().isEmpty());
     invariant(nss.isValid());
 
@@ -131,6 +141,7 @@ void CommitChunkMigrationRequest::appendAsCommand(
     builder->append(kFromShard, fromShard.toString());
     builder->append(kToShard, toShard.toString());
     builder->append(kMigratedChunk, migratedChunkType.toBSON());
+    builder->append(kShardHasDistributedLock, shardHasDistributedLock);
 
     if (controlChunkType) {
         builder->append(kControlChunk, controlChunkType->toBSON());
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
index 431cd20b631..c0c818258a6 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
@@ -56,7 +56,8 @@ public:
         const ShardId& fromShard,
         const ShardId& toShard,
         const ChunkType& migratedChunkType,
-        const boost::optional<ChunkType>& controlChunkType);
+        const boost::optional<ChunkType>& controlChunkType,
+        const bool& shardHasDistributedLock);
 
     const NamespaceString& getNss() const {
         return _nss;
@@ -80,6 +81,10 @@ public:
         return bool(_controlChunkRange);
     }
 
+    bool shardHasDistributedLock() {
+        return _shardHasDistributedLock;
+    }
+
 private:
     CommitChunkMigrationRequest(const NamespaceString& nss, const ChunkRange& range);
 
@@ -97,6 +102,9 @@ private:
 
     // Range of control chunk being moved, if it exists.
     boost::optional<ChunkRange> _controlChunkRange;
+
+    // Flag to indicate whether the shard has the distlock.
+    bool _shardHasDistributedLock;
 };
 
 }  // namespace mongo
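Taken together, the new behavior can be exercised much as the updated movechunk_interrupt_at_primary_stepdown.js test does: turn on the `migrationCommitError` failpoint on the config server primary so the commit path acts as if the balancer's distributed lock was lost, then check that `moveChunk` fails with the new `BalancerLostDistributedLock` code. The condensed sketch below is illustrative only; it assumes a running `ShardingTest` named `st` with a sharded `TestDB.TestColl`, as in the test above.

```js
// Condensed sketch of the failpoint-driven check added in this commit; assumes 'st'
// is a ShardingTest whose 'TestDB.TestColl' collection is already sharded.
var configPrimary = st.configRS.getPrimary();

// Make the config server behave as if the balancer's collection dist lock was lost.
assert.commandWorked(configPrimary.adminCommand(
    {configureFailPoint: 'migrationCommitError', mode: 'alwaysOn'}));

// The commit is refused, so moveChunk surfaces the new error code to the caller.
assert.commandFailedWithCode(
    st.s0.adminCommand({moveChunk: 'TestDB.TestColl', find: {Key: 0}, to: st.shard0.shardName}),
    ErrorCodes.BalancerLostDistributedLock);

assert.commandWorked(configPrimary.adminCommand(
    {configureFailPoint: 'migrationCommitError', mode: 'off'}));
```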