Diffstat (limited to 'src/mongo/db/s/balancer/migration_manager_test.cpp')
-rw-r--r--  src/mongo/db/s/balancer/migration_manager_test.cpp | 980
1 files changed, 980 insertions, 0 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp
new file mode 100644
index 00000000000..87353c5dc4b
--- /dev/null
+++ b/src/mongo/db/s/balancer/migration_manager_test.cpp
@@ -0,0 +1,980 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/balancer/migration_manager.h"
+#include "mongo/db/s/balancer/type_migration.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_locks.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using std::vector;
+using unittest::assertGet;
+
+const auto kShardId0 = ShardId("shard0");
+const auto kShardId1 = ShardId("shard1");
+const auto kShardId2 = ShardId("shard2");
+const auto kShardId3 = ShardId("shard3");
+
+const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345);
+const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346);
+const HostAndPort kShardHost2 = HostAndPort("TestHost2", 12347);
+const HostAndPort kShardHost3 = HostAndPort("TestHost3", 12348);
+
+const MigrationSecondaryThrottleOptions kDefaultSecondaryThrottle =
+ MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault);
+
+const long long kMaxSizeMB = 100;
+const std::string kPattern = "_id";
+
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
+
+class MigrationManagerTest : public ConfigServerTestFixture {
+protected:
+ /**
+ * Returns the mock targeter for the specified shard. Typically used as follows:
+ *
+ * shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost);
+ *
+ * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for
+ * Shard "shardId".
+ *
+ * Scheduling a command requires a shard host target. The command will be caught by the mock
+ * network, but sending the command requires finding the shard's host.
+ */
+ std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* txn,
+ ShardId shardId);
+
+ /**
+ * Inserts a document into the config.databases collection to indicate that "dbName" is sharded
+ * with primary "primaryShard".
+ */
+ void setUpDatabase(const std::string& dbName, const ShardId primaryShard);
+
+ /**
+ * Inserts a document into the config.collections collection to indicate that "collName" is
+ * sharded with version "version". The shard key pattern defaults to "_id".
+ */
+ void setUpCollection(const std::string collName, ChunkVersion version);
+
+ /**
+ * Inserts a document into the config.chunks collection so that the chunk defined by the
+ * parameters exists. Returns a ChunkType defined by the parameters.
+ */
+ ChunkType setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& shardId,
+ const ChunkVersion& version);
+
+ /**
+ * Inserts a document into the config.migrations collection as an active migration.
+ */
+ void setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard);
+
+ /**
+ * Asserts that config.migrations is empty and config.locks contains no locked documents, both
+ * of which should be true if the MigrationManager is inactive and behaving properly.
+ */
+ void checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
+
+ /**
+ * Sets up the mock network to expect a moveChunk command and to respond with either the fixed
+ * BSON "response" or a command reply built from "returnStatus".
+ */
+ void expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const BSONObj& response);
+ void expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus);
+
+ // Random static initialization order can result in X constructor running before Y constructor
+ // if X and Y are defined in different source files. Defining variables here to enforce order.
+ const BSONObj kShard0 =
+ BSON(ShardType::name(kShardId0.toString()) << ShardType::host(kShardHost0.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard1 =
+ BSON(ShardType::name(kShardId1.toString()) << ShardType::host(kShardHost1.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard2 =
+ BSON(ShardType::name(kShardId2.toString()) << ShardType::host(kShardHost2.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard3 =
+ BSON(ShardType::name(kShardId3.toString()) << ShardType::host(kShardHost3.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+
+ const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
+
+ std::unique_ptr<MigrationManager> _migrationManager;
+
+private:
+ void setUp() override;
+ void tearDown() override;
+};
+
+void MigrationManagerTest::setUp() {
+ ConfigServerTestFixture::setUp();
+ _migrationManager = stdx::make_unique<MigrationManager>(getServiceContext());
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+}
+
+void MigrationManagerTest::tearDown() {
+ checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager.reset();
+ ConfigServerTestFixture::tearDown();
+}
+
+std::shared_ptr<RemoteCommandTargeterMock> MigrationManagerTest::shardTargeterMock(
+ OperationContext* txn, ShardId shardId) {
+ return RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(txn, shardId))->getTargeter());
+}
+
+void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) {
+ DatabaseType db;
+ db.setName(dbName);
+ db.setPrimary(primaryShard);
+ db.setSharded(true);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), DatabaseType::ConfigNS, db.toBSON(), kMajorityWriteConcern));
+}
+
+void MigrationManagerTest::setUpCollection(const std::string collName, ChunkVersion version) {
+ CollectionType coll;
+ coll.setNs(NamespaceString(collName));
+ coll.setEpoch(version.epoch());
+ coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(version.toLong()));
+ coll.setKeyPattern(kKeyPattern);
+ coll.setUnique(false);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), CollectionType::ConfigNS, coll.toBSON(), kMajorityWriteConcern));
+}
+
+ChunkType MigrationManagerTest::setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& shardId,
+ const ChunkVersion& version) {
+ ChunkType chunk;
+ chunk.setNS(collName);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+ chunk.setShard(shardId);
+ chunk.setVersion(version);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ChunkType::ConfigNS, chunk.toBSON(), kMajorityWriteConcern));
+ return chunk;
+}
+
+void MigrationManagerTest::setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), collName);
+ builder.append(MigrationType::min(), minKey);
+ builder.append(MigrationType::max(), maxKey);
+ builder.append(MigrationType::toShard(), toShard.toString());
+ builder.append(MigrationType::fromShard(), fromShard.toString());
+ MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj()));
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+ MigrationType::ConfigNS,
+ migrationType.toBSON(),
+ kMajorityWriteConcern));
+}
+
+void MigrationManagerTest::checkMigrationsCollectionIsEmptyAndLocksAreUnlocked() {
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(0U, migrationsQueryResponse.docs.size());
+
+ auto statusWithLocksQueryResponse = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(LocksType::ConfigNS),
+ BSON(LocksType::state(LocksType::LOCKED)),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse locksQueryResponse = uassertStatusOK(statusWithLocksQueryResponse);
+ ASSERT_EQUALS(0U, locksQueryResponse.docs.size());
+}
+
+void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const BSONObj& response) {
+ onCommand([&chunk, &toShardId, &takeDistLock, &response](const RemoteCommandRequest& request) {
+ NamespaceString nss(request.cmdObj.firstElement().valueStringData());
+ ASSERT_EQ(chunk.getNS(), nss.ns());
+
+ const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus =
+ MoveChunkRequest::createFromCommand(nss, request.cmdObj);
+ ASSERT_OK(moveChunkRequestWithStatus.getStatus());
+
+ ASSERT_EQ(chunk.getNS(), moveChunkRequestWithStatus.getValue().getNss().ns());
+ ASSERT_BSONOBJ_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey());
+ ASSERT_BSONOBJ_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey());
+ ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId());
+
+ ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId());
+ ASSERT_EQ(takeDistLock, moveChunkRequestWithStatus.getValue().getTakeDistLock());
+
+ return response;
+ });
+}
+
+void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus) {
+ BSONObjBuilder resultBuilder;
+ Command::appendCommandStatus(resultBuilder, returnStatus);
+ expectMoveChunkCommand(chunk, toShardId, takeDistLock, resultBuilder.obj());
+}
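+
+// The tests below drive the MigrationManager and then feed moveChunk responses back through the
+// mock network using the two expectMoveChunkCommand overloads above. Since the Status overload
+// builds its reply with Command::appendCommandStatus, passing Status::OK() is roughly equivalent
+// to passing a BSONObj response of {ok: 1}, and passing an error status yields a reply of the
+// form {ok: 0, errmsg: ..., code: ...}.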
+
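+// Schedules two migrations within a single collection and expects both moveChunk commands to be
+// sent with takeDistLock=false and to succeed.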
+TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
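+// Schedules two migrations in each of two collections and expects all four moveChunk commands to
+// be sent with takeDistLock=false and to succeed.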
+TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up a database and two collections as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName1 = "foo.bar";
+ std::string collName2 = "foo.baz";
+ ChunkVersion version1(2, 0, OID::gen());
+ ChunkVersion version2(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName1, version1);
+ setUpCollection(collName2, version2);
+
+ // Set up two chunks in the metadata for each collection.
+ ChunkType chunk1coll1 =
+ setUpChunk(collName1, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version1);
+ version1.incMinor();
+ ChunkType chunk2coll1 =
+ setUpChunk(collName1, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version1);
+
+ ChunkType chunk1coll2 =
+ setUpChunk(collName2, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version2);
+ version2.incMinor();
+ ChunkType chunk2coll2 =
+ setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
+
+ // Going to request that these four chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1coll1.getNS(), kShardId1, chunk1coll1},
+ {chunk2coll1.getNS(), kShardId3, chunk2coll1},
+ {chunk1coll2.getNS(), kShardId1, chunk1coll2},
+ {chunk2coll2.getNS(), kShardId3, chunk2coll2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect four moveChunk commands.
+ expectMoveChunkCommand(chunk1coll1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2coll1, kShardId3, false, Status::OK());
+ expectMoveChunkCommand(chunk1coll2, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2coll2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Old v3.2 shards expect to take the distributed lock themselves before executing a moveChunk
+// command. Because the MigrationManager already holds the distlock, the first moveChunk command
+// sent to an old shard fails; the manager should then release the lock and retry the command
+// successfully.
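+//
+// Expected command sequence (as asserted by the mock-network expectations below): moveChunk for
+// chunk1 with takeDistLock=false fails with LockBusy, moveChunk for chunk2 with
+// takeDistLock=false succeeds, and chunk1 is then retried with takeDistLock=true and succeeds.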
+TEST_F(MigrationManagerTest, SameCollectionOldShardMigration) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+ expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Fail a migration if an old v3.2 shard fails to acquire the distributed lock more than once. The
+// first LockBusy error identifies the shard as an old shard to the MigrationManager, the second
+// indicates the lock is held elsewhere and unavailable.
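+//
+// Both commands below (takeDistLock=false, then takeDistLock=true) return LockBusy, so the
+// migration's final status reported by executeMigrationsForAutoBalance is expected to be
+// LockBusy.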
+TEST_F(MigrationManagerTest, SameOldShardFailsToAcquireDistributedLockTwice) {
+ // Set up a shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Going to request that this chunk get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up a dummy host for the source shard.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_EQ(ErrorCodes::LockBusy, migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two sequential moveChunk commands to the same shard, both of which fail with LockBusy.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigrations generated error."));
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ true,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigrations generated error."));
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// If a migration in a collection is scheduled against an old v3.2 shard, then a second migration
+// in the same collection against a different old v3.2 shard should also get rescheduled.
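+//
+// Expected command sequence (as asserted below): both moveChunk commands with takeDistLock=false
+// fail with LockBusy, the balancer releases the distributed lock, and both commands are retried
+// with takeDistLock=true and succeed.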
+TEST_F(MigrationManagerTest, SameCollectionTwoOldShardMigrations) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two failed moveChunk commands, then two successful moveChunk commands after the
+ // balancer releases the distributed lock.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+ expectMoveChunkCommand(
+ chunk2,
+ kShardId3,
+ false,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+ expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, true, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Takes the distributed lock for a collection so that the MigrationManager is unable to schedule
+// migrations for that collection.
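+//
+// Because the lock is already held, no moveChunk command ever reaches the mock network and both
+// migrations are expected to fail with ConflictingOperationInProgress.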
+TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ shardTargeterMock(operationContext(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(operationContext(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ // Take the distributed lock for the collection before scheduling via the MigrationManager.
+ const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock");
+ DistLockManager::ScopedDistLock distLockStatus = assertGet(
+ catalogClient()->getDistLockManager()->lock(operationContext(),
+ chunk1.getNS(),
+ whyMessage,
+ DistLockManager::kSingleLockAttemptTimeout));
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ operationContext(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+ migrationStatuses.at(migrateInfo.getName()));
+ }
+}
+
+// The MigrationManager should fail the migration if a host is not found for the source shard.
+// Scheduling a moveChunk command requires finding a host to which to send the command.
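+//
+// In the test below, chunk1's source shard (kShardId0) resolves to a host and its migration
+// succeeds, while chunk2's source shard (kShardId2) targeter returns ReplicaSetNotFound, so that
+// migration fails without a moveChunk command being sent.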
+TEST_F(MigrationManagerTest, SourceShardNotFound) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardId0, and make kShardId2's targeter return an error so that no
+ // host can be found for chunk2's source shard.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)
+ ->setFindHostReturnValue(
+ Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error."));
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ ASSERT_OK(migrationStatuses.at(chunk1.getName()));
+ ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, migrationStatuses.at(chunk2.getName()));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
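+// Older shards report a chunk that is too big to move as {ok: 0, chunkTooBig: true} rather than
+// with a ChunkTooBig error code; the MigrationManager is expected to translate that response into
+// ErrorCodes::ChunkTooBig.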
+TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Going to request that this chunk gets migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
+
+ auto future = launchAsync([this, chunk1, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ ASSERT_EQ(ErrorCodes::ChunkTooBig, migrationStatuses.at(chunk1.getName()));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, BSON("ok" << 0 << "chunkTooBig" << true));
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
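+// Schedules a migration, black-holes its moveChunk request on the mock network and then
+// interrupts the MigrationManager. The interrupted migration must fail, its document must remain
+// in config.migrations (so that it could be recovered later), and no new migrations may be
+// scheduled until recovery is run again.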
+TEST_F(MigrationManagerTest, InterruptMigration) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ auto future = launchAsync([&] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ ASSERT_NOT_OK(_migrationManager->executeManualMigration(
+ txn.get(), {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+ });
+
+ // Wait until the moveChunk request has been sent, then simulate a stuck migration by never
+ // responding to the request.
+ network()->enterNetwork();
+ network()->blackHole(network()->getNextReadyRequest());
+ network()->exitNetwork();
+
+ // Now that the migration request is 'pending', try to cancel the migration manager. This should
+ // succeed.
+ _migrationManager->interruptAndDisableMigrations();
+
+ // Ensure that cancellations get processed
+ network()->enterNetwork();
+ network()->runReadyNetworkOperations();
+ network()->exitNetwork();
+
+ // Ensure that the previously scheduled migration is cancelled
+ future.timed_get(kFutureTimeout);
+
+ // Ensure that no new migrations can be scheduled
+ ASSERT_NOT_OK(_migrationManager->executeManualMigration(operationContext(),
+ {chunk.getNS(), kShardId1, chunk},
+ 0,
+ kDefaultSecondaryThrottle,
+ false));
+
+ // Ensure that the migration manager is no longer handling any migrations.
+ _migrationManager->drainActiveMigrations();
+
+ // Check that the migration that was active when the migration manager was interrupted can be
+ // found in config.migrations (and thus would be recovered if a migration manager were to start
+ // up again).
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSON(MigrationType::name(chunk.getName())),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size());
+
+ ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(),
+ MigrationType::ConfigNS,
+ BSON(MigrationType::name(chunk.getName())),
+ kMajorityWriteConcern));
+
+ // Restore the migration manager back to the started state, which is expected by tearDown
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+}
+
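+// Cycles the MigrationManager through interrupt/drain/recovery and verifies that a migration can
+// still be scheduled and completed afterwards.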
+TEST_F(MigrationManagerTest, RestartMigrationManager) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Go through the lifecycle of the migration manager
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+
+ auto future = launchAsync([&] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ ASSERT_OK(_migrationManager->executeManualMigration(
+ txn.get(), {chunk1.getNS(), kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
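+// Writes two fake active-migration documents to config.migrations and verifies that recovery
+// reschedules the corresponding moveChunk commands.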
+TEST_F(MigrationManagerTest, MigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+
+ // Set up two fake active migrations by writing documents to the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+ setUpMigration(collName,
+ chunk2.getMin(),
+ chunk2.getMax(),
+ kShardId3.toString(),
+ chunk2.getShard().toString());
+
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+
+ auto future = launchAsync([this] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding hosts to which to send the commands.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ _migrationManager->finishRecovery(txn.get(), 0, kDefaultSecondaryThrottle, false);
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
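+// Writes one parsable and one unparsable document to config.migrations and verifies that recovery
+// fails cleanly: no moveChunk commands are expected, and tearDown confirms that config.migrations
+// ends up empty and that the distributed lock taken below is released.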
+TEST_F(MigrationManagerTest, FailMigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+
+ // Set up a parsable fake active migration document in the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+
+ // Set up a fake active migration document that will fail MigrationType parsing -- missing
+ // field.
+ BSONObjBuilder builder;
+ builder.append("_id", "testing");
+ // No MigrationType::ns() field!
+ builder.append(MigrationType::min(), chunk2.getMin());
+ builder.append(MigrationType::max(), chunk2.getMax());
+ builder.append(MigrationType::toShard(), kShardId3.toString());
+ builder.append(MigrationType::fromShard(), chunk2.getShard().toString());
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern));
+
+ // Take the distributed lock for the collection, which should be released during recovery when
+ // it fails. Any dist lock held by the config server will be released via processId, so the
+ // session ID used here doesn't matter.
+ ASSERT_OK(catalogClient()->getDistLockManager()->lockWithSessionID(
+ operationContext(),
+ collName,
+ "MigrationManagerTest",
+ OID::gen(),
+ DistLockManager::kSingleLockAttemptTimeout));
+
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+
+ // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all
+ // distributed locks are unlocked.
+}
+
+} // namespace
+} // namespace mongo