author     Dianna Hohensee <dianna.hohensee@10gen.com>    2016-06-24 11:12:39 -0400
committer  Dianna Hohensee <dianna.hohensee@10gen.com>    2016-07-13 14:57:19 -0400
commit     08f3a57fbb54e72a03e47047a1e8aea0f708f1c7 (patch)
tree       0aa8510937037a085d4d322894b352b14aa05357
parent     f18bb78b469cc980c45ce0e5a3eb66d4c090292e (diff)
download   mongo-08f3a57fbb54e72a03e47047a1e8aea0f708f1c7.tar.gz
SERVER-24532 adding MigrationManager to manage parallel migrations
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp    7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h       7
-rw-r--r--  src/mongo/s/SConscript                                 13
-rw-r--r--  src/mongo/s/balancer/balancer_policy.cpp               4
-rw-r--r--  src/mongo/s/balancer/balancer_policy.h                 1
-rw-r--r--  src/mongo/s/balancer/migration_manager.cpp             329
-rw-r--r--  src/mongo/s/balancer/migration_manager.h               192
-rw-r--r--  src/mongo/s/balancer/migration_manager_test.cpp        282
-rw-r--r--  src/mongo/s/client/shard_local.cpp                     11
-rw-r--r--  src/mongo/s/client/shard_local.h                       2
-rw-r--r--  src/mongo/s/client/shard_local_test.cpp                9
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp             56
-rw-r--r--  src/mongo/s/config_server_test_fixture.h               6
13 files changed, 888 insertions, 31 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 0d95c9f3f0e..4a0bb7c8ea8 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -33,7 +33,6 @@
#include "mongo/base/status.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/read_concern_args.h"
-#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/storage/snapshot_name.h"
#include "mongo/db/write_concern_options.h"
@@ -226,7 +225,11 @@ StatusWith<BSONObj> ReplicationCoordinatorMock::prepareReplSetUpdatePositionComm
}
ReplicaSetConfig ReplicationCoordinatorMock::getConfig() const {
- return ReplicaSetConfig();
+ return _getConfigReturnValue;
+}
+
+void ReplicationCoordinatorMock::setGetConfigReturnValue(ReplicaSetConfig returnValue) {
+ _getConfigReturnValue = std::move(returnValue);
}
void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 58e213ed6ee..4dd04f8fb3f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -30,6 +30,7 @@
#include "mongo/base/status.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/platform/atomic_word.h"
@@ -258,12 +259,18 @@ public:
virtual Status stepUpIfEligible() override;
+ /**
+ * Sets the return value for calls to getConfig.
+ */
+ void setGetConfigReturnValue(ReplicaSetConfig returnValue);
+
private:
AtomicUInt64 _snapshotNameGenerator;
const ReplSettings _settings;
MemberState _memberState;
OpTime _myLastDurableOpTime;
OpTime _myLastAppliedOpTime;
+ ReplicaSetConfig _getConfigReturnValue;
};
} // namespace repl
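
Together, the two hunks above let a unit test dictate what the mock reports as the replica set configuration. A minimal usage sketch, mirroring the config_server_test_fixture.cpp change later in this patch (replSettings and serviceContext are assumed to come from the surrounding test setup; the one-node config document is illustrative):

    auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(replSettings);

    repl::ReplicaSetConfig config;
    config.initialize(BSON("_id" << "mySet"
                                 << "protocolVersion" << 1
                                 << "version" << 3
                                 << "members" << BSON_ARRAY(BSON("host" << "node2:12345"
                                                                        << "_id" << 1))));

    // Anything that calls getConfig() on the mock (e.g. ShardLocal::getConnString) now sees this
    // configuration instead of a default-constructed ReplicaSetConfig.
    replCoord->setGetConfigReturnValue(config);
    repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
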
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 8b0b8321aaa..4cb691f5153 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -249,6 +249,7 @@ env.Library(
'balancer/balancer_policy.cpp',
'balancer/cluster_statistics.cpp',
'balancer/cluster_statistics_impl.cpp',
+ 'balancer/migration_manager.cpp',
'catalog/catalog_cache.cpp',
'chunk.cpp',
'chunk_manager.cpp',
@@ -337,6 +338,18 @@ env.CppUnitTest(
]
)
+env.CppUnitTest(
+ target='migration_manager_test',
+ source=[
+ 'balancer/migration_manager_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ 'config_server_test_fixture',
+ 'coreshard',
+ ]
+)
+
env.Library(
target='local_sharding_info',
source=[
diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp
index 205102fe909..12670903785 100644
--- a/src/mongo/s/balancer/balancer_policy.cpp
+++ b/src/mongo/s/balancer/balancer_policy.cpp
@@ -380,6 +380,10 @@ string TagRange::toString() const {
return str::stream() << min << " -->> " << max << " on " << tag;
}
+std::string MigrateInfo::getName() const {
+ return ChunkType::genID(ns, minKey);
+}
+
string MigrateInfo::toString() const {
return str::stream() << ns << ": [" << minKey << ", " << maxKey << "), from " << from << ", to "
<< to;
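
For context, getName() is the key under which the new MigrationManager (added below) reports each migration's Status. A rough illustration, not part of the patch, assuming chunk is a ChunkType already loaded from config.chunks (the exact string layout is whatever ChunkType::genID builds from the namespace and the chunk's min key):

    MigrateInfo migrateInfo(chunk.getNS(), ShardId("shard1"), chunk);

    // Unique per (namespace, min key); the MigrationManager uses this as the key into the
    // MigrationStatuses map it returns from scheduleMigrations().
    std::string migrationId = migrateInfo.getName();
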
diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h
index 01fb4cd64af..97aeeb05c64 100644
--- a/src/mongo/s/balancer/balancer_policy.h
+++ b/src/mongo/s/balancer/balancer_policy.h
@@ -60,6 +60,7 @@ struct MigrateInfo {
minKey(a_chunk.getMin()),
maxKey(a_chunk.getMax()) {}
+ std::string getName() const;
std::string toString() const;
std::string ns;
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
new file mode 100644
index 00000000000..42183f00b03
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -0,0 +1,329 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include <memory>
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/migration_manager.h"
+
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/balancer_configuration.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/represent_as.h"
+
+namespace mongo {
+
+namespace {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+
+} // namespace
+
+MigrationManager::Migration::Migration(
+ const MigrateInfo& migrateInfo,
+ boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle)
+ : chunkInfo(&migrateInfo),
+ moveChunkCallbackHandle(std::move(callbackHandle)),
+ oldShard(false) {}
+
+void MigrationManager::Migration::setCallbackHandle(
+ executor::TaskExecutor::CallbackHandle callbackHandle) {
+ invariant(!moveChunkCallbackHandle);
+ moveChunkCallbackHandle = std::move(callbackHandle);
+}
+
+void MigrationManager::Migration::clearCallbackHandle() {
+ moveChunkCallbackHandle = boost::none;
+}
+
+MigrationManager::DistLockTracker::DistLockTracker(
+ boost::optional<DistLockManager::ScopedDistLock> distlock)
+ : distributedLock(std::move(distlock)) {
+ if (distlock) {
+ migrationCounter = 1;
+ } else {
+ migrationCounter = 0;
+ }
+}
+
+MigrationStatuses MigrationManager::scheduleMigrations(
+ OperationContext* txn,
+ const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateMigrations) {
+ invariant(_activeMigrations.empty());
+
+ MigrationStatuses migrationStatuses;
+
+ for (auto& migrateInfo : candidateMigrations) {
+ _activeMigrations.push_back(Migration(migrateInfo, boost::none));
+ }
+
+ _executeMigrations(txn, &migrationStatuses);
+
+ return migrationStatuses;
+}
+
+void MigrationManager::_executeMigrations(OperationContext* txn,
+ MigrationStatuses* migrationStatuses) {
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+ for (auto& migration : _activeMigrations) {
+ const NamespaceString nss(migration.chunkInfo->ns);
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ // Unable to find the ChunkManager for "nss" for whatever reason; abandon this
+ // migration and proceed to the next.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(MigrationStatuses::value_type(
+ migration.chunkInfo->getName(), std::move(scopedCMStatus.getStatus())));
+ continue;
+ }
+
+ ChunkManager* const chunkManager = scopedCMStatus.getValue().cm();
+ auto chunk = chunkManager->findIntersectingChunk(txn, migration.chunkInfo->minKey);
+
+ {
+ // No need to lock the mutex: only this function and _takeDistLockForAMigration (called
+ // from here) manipulate "_distributedLocks", and at this point they run serially.
+ if (!_takeDistLockForAMigration(txn, migration, migrationStatuses)) {
+ // If there is a lock conflict between the balancer and the shard, or a shard and a
+ // shard, the migration has been rescheduled. Otherwise an attempt to take the lock
+ // failed for whatever reason and this migration is being abandoned.
+ continue;
+ }
+ }
+
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ chunkManager->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migration.chunkInfo->from,
+ migration.chunkInfo->to,
+ ChunkRange(chunk->getMin(), chunk->getMax()),
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete(),
+ migration.oldShard ? true : false); // takeDistLock flag.
+
+ BSONObj moveChunkRequestObj = builder.obj();
+
+ const auto recipientShard = grid.shardRegistry()->getShard(txn, migration.chunkInfo->from);
+ const auto host = recipientShard->getTargeter()->findHost(
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
+ if (!host.isOK()) {
+ // Unable to find a target shard for whatever reason; abandon this migration and proceed
+ // to the next.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(MigrationStatuses::value_type(migration.chunkInfo->getName(),
+ std::move(host.getStatus())));
+ continue;
+ }
+
+ RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj);
+
+ StatusWith<RemoteCommandResponse> remoteCommandResponse(
+ Status{ErrorCodes::InternalError, "Uninitialized value"});
+
+ executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
+ executor->scheduleRemoteCommand(remoteRequest,
+ stdx::bind(&MigrationManager::_checkMigrationCallback,
+ this,
+ stdx::placeholders::_1,
+ txn,
+ &migration,
+ migrationStatuses));
+
+ if (!callbackHandleWithStatus.isOK()) {
+ // Scheduling the migration moveChunk failed.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(MigrationStatuses::value_type(
+ migration.chunkInfo->getName(), std::move(callbackHandleWithStatus.getStatus())));
+ continue;
+ }
+
+ // The moveChunk command was successfully scheduled. Store the callback handle so that the
+ // command's return can be waited for later.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migration.setCallbackHandle(std::move(callbackHandleWithStatus.getValue()));
+ }
+
+ _waitForMigrations(txn);
+ // At this point no parallel threads are running, so it is safe not to lock the mutex.
+
+ // All the migrations have returned, release all of the distributed locks that are no longer
+ // being used.
+ _distributedLocks.clear();
+
+ // If there are rescheduled migrations, move them to active and run the function again.
+ if (!_rescheduledMigrations.empty()) {
+ // Clear all the callback handles of the rescheduled migrations.
+ for (auto& migration : _rescheduledMigrations) {
+ migration.clearCallbackHandle();
+ }
+ _activeMigrations = std::move(_rescheduledMigrations);
+ _rescheduledMigrations.clear();
+ _executeMigrations(txn, migrationStatuses);
+ } else {
+ _activeMigrations.clear();
+ }
+}
+
+void MigrationManager::_checkMigrationCallback(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
+ OperationContext* txn,
+ Migration* migration,
+ MigrationStatuses* migrationStatuses) {
+ const auto& remoteCommandResponseWithStatus = callbackArgs.response;
+
+ if (!remoteCommandResponseWithStatus.isOK()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(
+ MigrationStatuses::value_type(migration->chunkInfo->getName(),
+ std::move(remoteCommandResponseWithStatus.getStatus())));
+ return;
+ }
+
+ Status commandStatus =
+ getStatusFromCommandResult(remoteCommandResponseWithStatus.getValue().data);
+ if (commandStatus == ErrorCodes::LockBusy && !migration->oldShard) {
+ migration->oldShard = true;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _rescheduleMigration(*migration);
+ return;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(
+ MigrationStatuses::value_type(migration->chunkInfo->getName(), std::move(commandStatus)));
+}
+
+void MigrationManager::_waitForMigrations(OperationContext* txn) const {
+ executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+ for (const auto& migration : _activeMigrations) {
+ // Block until the command is carried out.
+ if (migration.moveChunkCallbackHandle) {
+ executor->wait(migration.moveChunkCallbackHandle.get());
+ }
+ }
+}
+
+void MigrationManager::_rescheduleMigration(const Migration& migration) {
+ _rescheduledMigrations.push_back(migration);
+}
+
+bool MigrationManager::_takeDistLockForAMigration(OperationContext* txn,
+ const Migration& migration,
+ MigrationStatuses* migrationStatuses) {
+ auto it = _distributedLocks.find(migration.chunkInfo->ns);
+
+ if (it == _distributedLocks.end()) {
+ // Neither the balancer nor the shard has the distributed collection lock for "ns".
+ if (migration.oldShard) {
+ DistLockTracker distLockTracker(boost::none);
+ _distributedLocks.insert(std::map<std::string, DistLockTracker>::value_type(
+ migration.chunkInfo->ns, std::move(distLockTracker)));
+ } else {
+ auto distlock = _getDistLock(txn, migration);
+ if (!distlock.isOK()) {
+ // Abandon the migration so the balancer doesn't reschedule endlessly if whatever is
+ // preventing the distlock from being acquired doesn't go away.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ migrationStatuses->insert(MigrationStatuses::value_type(
+ migration.chunkInfo->getName(), std::move(distlock.getStatus())));
+ return false;
+ }
+ DistLockTracker distLockTracker(std::move(distlock.getValue()));
+ _distributedLocks.insert(std::map<std::string, DistLockTracker>::value_type(
+ migration.chunkInfo->ns, std::move(distLockTracker)));
+ }
+ } else {
+ DistLockTracker* distLockTracker = &(it->second);
+ if (!distLockTracker->distributedLock) {
+ // Lock conflict. A shard holds the lock for a different migration.
+ invariant(distLockTracker->migrationCounter == 0 && !distLockTracker->distributedLock);
+ _rescheduleMigration(migration);
+ return false;
+ } else {
+ invariant(distLockTracker->distributedLock && distLockTracker->migrationCounter > 0);
+ if (migration.oldShard) {
+ // Lock conflict. The balancer holds the lock, so the shard cannot take it yet.
+ _rescheduleMigration(migration);
+ return false;
+ } else {
+ ++(distLockTracker->migrationCounter);
+ }
+ }
+ }
+
+ return true;
+}
+
+StatusWith<DistLockManager::ScopedDistLock> MigrationManager::_getDistLock(
+ OperationContext* txn, const Migration& migration) {
+ const std::string whyMessage(
+ str::stream()
+ << "migrating chunk "
+ << ChunkRange(migration.chunkInfo->minKey, migration.chunkInfo->maxKey).toString()
+ << " in "
+ << migration.chunkInfo->ns);
+
+ StatusWith<DistLockManager::ScopedDistLock> distLockStatus =
+ Grid::get(txn)->catalogClient(txn)->distLock(txn, migration.chunkInfo->ns, whyMessage);
+
+ if (!distLockStatus.isOK()) {
+ const std::string msg = str::stream()
+ << "Could not acquire collection lock for " << migration.chunkInfo->ns
+ << " to migrate chunk "
+ << ChunkRange(migration.chunkInfo->minKey, migration.chunkInfo->maxKey).toString()
+ << " due to " << distLockStatus.getStatus().toString();
+ warning() << msg;
+ return {distLockStatus.getStatus().code(), msg};
+ }
+
+ return std::move(distLockStatus.getValue());
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h
new file mode 100644
index 00000000000..8089197de9c
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager.h
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class OperationContext;
+class Status;
+class MigrationSecondaryThrottleOptions;
+
+// Uniquely identifies a migration, regardless of shard and version.
+typedef std::string MigrationIdentifier;
+typedef std::map<MigrationIdentifier, Status> MigrationStatuses;
+
+/**
+ * Manages and executes parallel migrations for the balancer.
+ *
+ * TODO: for v3.6, remove the code that maintains compatibility with v3.2 shards, which take the distlock themselves.
+ */
+class MigrationManager {
+ MONGO_DISALLOW_COPYING(MigrationManager);
+
+public:
+ MigrationManager() = default;
+ ~MigrationManager() = default;
+
+ /**
+ * A blocking method that attempts to schedule all the migrations specified in
+ * "candidateMigrations". Takes the distributed lock for each collection with a chunk being
+ * migrated.
+ *
+ * Returns a map of migration Status objects to indicate the success/failure of each migration.
+ */
+ MigrationStatuses scheduleMigrations(
+ OperationContext* txn,
+ const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateMigrations);
+
+private:
+ /**
+ * Holds the data associated with an ongoing migration. Stores a callback handle for the
+ * moveChunk command when one is scheduled. Also holds a flag that indicates the source shard is
+ * v3.2 and must take the distributed lock itself.
+ */
+ struct Migration {
+ Migration(const MigrateInfo& migrateInfo,
+ boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle);
+
+ void setCallbackHandle(executor::TaskExecutor::CallbackHandle callbackHandle);
+ void clearCallbackHandle();
+
+ // Pointer to the chunk details.
+ const MigrateInfo* chunkInfo;
+
+ // Callback handle for the active moveChunk request. If no migration is active for the chunk
+ // specified in "chunkInfo", this won't be set.
+ boost::optional<executor::TaskExecutor::CallbackHandle> moveChunkCallbackHandle;
+
+ // Indicates that the first moveChunk request failed with LockBusy. The second attempt must
+ // be made without the balancer holding the collection distlock. This is necessary for
+ // compatibility with a v3.2 shard, which expects to take the distlock itself.
+ bool oldShard;
+ };
+
+ /**
+ * Manages a collection distlock, which is normally held by the balancer; for a migration with
+ * a v3.2 source shard, however, the balancer must release it so that the shard can acquire
+ * it.
+ */
+ struct DistLockTracker {
+ DistLockTracker(boost::optional<DistLockManager::ScopedDistLock> distlock);
+
+ // Holds the distributed lock, if the balancer should hold it for the migration. If this is
+ // empty, then a shard has the distlock.
+ boost::optional<DistLockManager::ScopedDistLock> distributedLock;
+
+ // The number of migrations currently relying on the balancer-held distributed lock.
+ int migrationCounter;
+ };
+
+ /**
+ * Blocking function that schedules all the migrations prepared in "_activeMigrations" and then
+ * waits for them all to complete. This is also where the distributed locks are taken. Some
+ * migrations may be rescheduled for a recursive call of this function if there are distributed
+ * lock conflicts. A lock conflict can occur when:
+ * 1) The source shard of a migration is v3.2 and expects to take the lock itself and the
+ * balancer already holds it for a different migration.
+ * 2) A v3.2 shard already has the distlock, so it isn't free for either the balancer or
+ * another v3.2 shard to take.
+ * All lock conflicts are resolved by waiting for all of the scheduled migrations to complete,
+ * at which point all the locks are safely released.
+ *
+ * All the moveChunk command Status results are placed in "migrationStatuses".
+ */
+ void _executeMigrations(OperationContext* txn,
+ std::map<MigrationIdentifier, Status>* migrationStatuses);
+
+ /**
+ * Callback that checks a remote command response for errors. The first time a LockBusy error
+ * occurs, the shard running the migration is assumed to be v3.2, is marked as such, and the
+ * migration is rescheduled. On any other error the migration is abandoned. Places all of the
+ * Status results from the moveChunk commands in "migrationStatuses".
+ */
+ void _checkMigrationCallback(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
+ OperationContext* txn,
+ Migration* migration,
+ std::map<MigrationIdentifier, Status>* migrationStatuses);
+
+ /**
+ * Goes through the callback handles in "_activeMigrations" and waits for the moveChunk commands
+ * to return.
+ */
+ void _waitForMigrations(OperationContext* txn) const;
+
+ /**
+ * Adds "migration" to the "_rescheduledMigrations" vector.
+ */
+ void _rescheduleMigration(const Migration& migration);
+
+ /**
+ * Attempts to take a distlock for collection "ns", if appropriate. It may
+ * 1) Take the distlock for the balancer and initialize the counter to 1.
+ * 2) Increment the counter on the distlock that the balancer already holds.
+ * 3) Initialize the counter to 0 to indicate a migration with a v3.2 shard, where the shard
+ * will take the distlock.
+ *
+ * If none of these actions are possible because of a lock conflict (shard can't take the lock
+ * if the balancer already holds it, or vice versa) or if the lock is unavailable, returns
+ * false to indicate that the migration cannot proceed right now. If the lock could not be
+ * taken because of a lock conflict as described, then the migration is rescheduled; otherwise
+ * it is abandoned.
+ *
+ * If an attempt to acquire the distributed lock fails and the migration is abandoned, the error
+ * Status is placed in "migrationStatuses".
+ */
+ bool _takeDistLockForAMigration(OperationContext* txn,
+ const Migration& migration,
+ MigrationStatuses* migrationStatuses);
+
+ /**
+ * Attempts to acquire the distributed collection lock required for "migration".
+ */
+ StatusWith<DistLockManager::ScopedDistLock> _getDistLock(OperationContext* txn,
+ const Migration& migration);
+
+ // Protects class variables when migrations run in parallel.
+ stdx::mutex _mutex;
+
+ // Holds information about each ongoing migration.
+ std::vector<Migration> _activeMigrations;
+
+ // Temporary container for migrations that must be rescheduled. After all of the
+ // _activeMigrations are finished, this variable is used to reset _activeMigrations before
+ // executing migrations again.
+ std::vector<Migration> _rescheduledMigrations;
+
+ // Manages the distributed locks and whether the balancer or shard holds them.
+ std::map<std::string, DistLockTracker> _distributedLocks;
+};
+
+} // namespace mongo
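
For orientation, a sketch of how the balancer is expected to drive this class once it is wired in (txn and candidateMigrations are assumed to come from the balancer's normal round; this patch only adds the manager and its test):

    MigrationManager migrationManager;

    // Blocks until every scheduled moveChunk has returned. Distlocks are acquired and released
    // internally, including the v3.2 reschedule handling described above.
    MigrationStatuses statuses = migrationManager.scheduleMigrations(txn, candidateMigrations);

    for (const auto& entry : statuses) {
        if (!entry.second.isOK()) {
            warning() << "Migration " << entry.first << " failed" << causedBy(entry.second);
        }
    }
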
diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp
new file mode 100644
index 00000000000..ed419ba5e77
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager_test.cpp
@@ -0,0 +1,282 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/client.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/migration_manager.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+
+const auto kShardId0 = ShardId("shard0");
+const auto kShardId1 = ShardId("shard1");
+const auto kShardId2 = ShardId("shard2");
+const auto kShardId3 = ShardId("shard3");
+
+const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345);
+const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346);
+const HostAndPort kShardHost2 = HostAndPort("TestHost2", 12347);
+const HostAndPort kShardHost3 = HostAndPort("TestHost3", 12348);
+
+const long long kMaxSizeMB = 100;
+const std::string kPattern = "_id";
+
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
+
+class MigrationManagerTest : public ConfigServerTestFixture {
+protected:
+ std::unique_ptr<MigrationManager> _migrationManager;
+
+ /**
+ * Returns the mock targeter for the specified shard. Typical use:
+ *
+ * shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost);
+ *
+ * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for
+ * Shard "shardId".
+ *
+ * Scheduling a command requires a shard host target. The command will be caught by the mock
+ * network, but sending the command requires finding the shard's host.
+ */
+ std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* txn,
+ ShardId shardId);
+
+ /**
+ * Inserts a document into the config.databases collection to indicate that "dbName" is sharded
+ * with primary "primaryShard".
+ */
+ void setUpDatabase(const std::string& dbName, const ShardId primaryShard);
+
+ /**
+ * Inserts a document into the config.collections collection to indicate that "collName" is
+ * sharded with version "version". The shard key pattern defaults to "_id".
+ */
+ void setUpCollection(const std::string collName, ChunkVersion version);
+
+ /**
+ * Inserts a document into the config.chunks collection so that the chunk defined by the
+ * parameters exists. Returns a ChunkType defined by the parameters.
+ */
+ ChunkType setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& fromShardId,
+ const ShardId& toShardId,
+ const ChunkVersion& version);
+
+ /**
+ * Sets up mock network to expect a moveChunk command and return "returnStatus".
+ */
+ void expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus);
+
+private:
+ void setUp() override;
+ void tearDown() override;
+
+public:
+ // Random static initialization order can result in X constructor running before Y constructor
+ // if X and Y are defined in different source files. Defining variables here to enforce order.
+ const BSONObj shardType0 =
+ BSON(ShardType::name(kShardId0.toString()) << ShardType::host(kShardHost0.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj shardType1 =
+ BSON(ShardType::name(kShardId1.toString()) << ShardType::host(kShardHost1.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj shardType2 =
+ BSON(ShardType::name(kShardId2.toString()) << ShardType::host(kShardHost2.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj shardType3 =
+ BSON(ShardType::name(kShardId3.toString()) << ShardType::host(kShardHost3.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
+};
+
+void MigrationManagerTest::setUp() {
+ _migrationManager = stdx::make_unique<MigrationManager>();
+ ConfigServerTestFixture::setUp();
+}
+
+void MigrationManagerTest::tearDown() {
+ _migrationManager.reset();
+ ConfigServerTestFixture::tearDown();
+}
+
+std::shared_ptr<RemoteCommandTargeterMock> MigrationManagerTest::shardTargeterMock(
+ OperationContext* txn, ShardId shardId) {
+ return RemoteCommandTargeterMock::get(shardRegistry()->getShard(txn, shardId)->getTargeter());
+}
+
+void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) {
+ DatabaseType db;
+ db.setName(dbName);
+ db.setPrimary(primaryShard);
+ db.setSharded(true);
+ ASSERT(catalogClient()
+ ->insertConfigDocument(
+ operationContext(), DatabaseType::ConfigNS, db.toBSON(), kMajorityWriteConcern)
+ .isOK());
+}
+
+void MigrationManagerTest::setUpCollection(const std::string collName, ChunkVersion version) {
+ CollectionType coll;
+ coll.setNs(NamespaceString(collName));
+ coll.setEpoch(version.epoch());
+ coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(version.toLong()));
+ coll.setKeyPattern(kKeyPattern);
+ coll.setUnique(false);
+ ASSERT(
+ catalogClient()
+ ->insertConfigDocument(
+ operationContext(), CollectionType::ConfigNS, coll.toBSON(), kMajorityWriteConcern)
+ .isOK());
+}
+
+ChunkType MigrationManagerTest::setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& fromShardId,
+ const ShardId& toShardId,
+ const ChunkVersion& version) {
+ ChunkType chunk;
+ chunk.setNS(collName);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+ chunk.setShard(fromShardId);
+ chunk.setVersion(version);
+ ASSERT(catalogClient()
+ ->insertConfigDocument(
+ operationContext(), ChunkType::ConfigNS, chunk.toBSON(), kMajorityWriteConcern)
+ .isOK());
+ return chunk;
+}
+
+void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus) {
+ onCommand(
+ [&chunk, &toShardId, &takeDistLock, &returnStatus](const RemoteCommandRequest& request) {
+ NamespaceString nss(request.cmdObj.firstElement().valueStringData());
+ ASSERT_EQ(chunk.getNS(), nss.ns());
+
+ const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus =
+ MoveChunkRequest::createFromCommand(nss, request.cmdObj);
+ ASSERT(moveChunkRequestWithStatus.isOK());
+
+ ASSERT_EQ(chunk.getNS(), moveChunkRequestWithStatus.getValue().getNss().ns());
+ ASSERT_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey());
+ ASSERT_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey());
+ ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId());
+
+ ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId());
+ ASSERT_EQ(takeDistLock, moveChunkRequestWithStatus.getValue().getTakeDistLock());
+
+ if (returnStatus.isOK()) {
+ return BSON("ok" << 1);
+ } else {
+ BSONObjBuilder builder;
+ builder.append("ok", 0);
+ builder.append("code", returnStatus.code());
+ builder.append("errmsg", returnStatus.reason());
+ return builder.obj();
+ }
+ });
+}
+
+TEST_F(MigrationManagerTest, TwoSameCollectionMigrations) {
+ // Set up two shards in the metadata.
+ ASSERT(catalogClient()
+ ->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, shardType0, kMajorityWriteConcern)
+ .isOK());
+ ASSERT(catalogClient()
+ ->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, shardType2, kMajorityWriteConcern)
+ .isOK());
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ BalancerChunkSelectionPolicy::MigrateInfoVector candidateChunks;
+ ChunkType chunk1 = setUpChunk(
+ collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, kShardId1, version);
+ version.incMinor();
+ ChunkType chunk2 = setUpChunk(
+ collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, kShardId3, version);
+
+ // Going to request that these two chunks get migrated.
+ candidateChunks.push_back(MigrateInfo(chunk1.getNS(), kShardId1, chunk1));
+ candidateChunks.push_back(MigrateInfo(chunk2.getNS(), kShardId3, chunk2));
+
+ auto future = launchAsync([this, candidateChunks] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up some dummy hosts -- moveChunk commands are going to hit the mock network anyway.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost3);
+
+ _migrationManager->scheduleMigrations(txn.get(), candidateChunks);
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+} // namespace
+} // namespace mongo
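
The test above covers only the success path. A hypothetical follow-up test, sketched with the same fixture helpers and a chunk1/candidateChunks setup like the one above (not part of this patch), could exercise the v3.2 fallback in which the first moveChunk is rejected with LockBusy and the manager retries it with the takeDistLock flag set:

    auto future = launchAsync([this, candidateChunks] {
        Client::initThreadIfNotAlready("Test");
        auto txn = cc().makeOperationContext();
        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
        _migrationManager->scheduleMigrations(txn.get(), candidateChunks);
    });

    // First attempt: the balancer holds the distlock, so a v3.2 donor shard answers LockBusy.
    expectMoveChunkCommand(chunk1, kShardId1, false, Status(ErrorCodes::LockBusy, "lock busy"));

    // Retry: the balancer has released the distlock and resends with takeDistLock = true.
    expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());

    future.timed_get(kFutureTimeout);
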
diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp
index 3eca849b2b5..085dbce0e40 100644
--- a/src/mongo/s/client/shard_local.cpp
+++ b/src/mongo/s/client/shard_local.cpp
@@ -54,13 +54,14 @@ const Status kInternalErrorStatus{ErrorCodes::InternalError,
"Invalid to check for write concern error if command failed"};
} // namespace
+ShardLocal::ShardLocal(const ShardId& id) : Shard(id) {
+ // Currently ShardLocal only works for config servers. If we ever start using ShardLocal on
+ // shards we'll need to revisit this invariant.
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+}
+
const ConnectionString ShardLocal::getConnString() const {
auto replCoord = repl::getGlobalReplicationCoordinator();
-
- // Currently ShardLocal only works for config servers, which must be replica sets. If we
- // ever start using ShardLocal on shards we'll need to consider how to handle shards that are
- // not replica sets.
- invariant(replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet);
return replCoord->getConfig().getConnectionString();
}
diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h
index 27e2edbea40..56358b449dc 100644
--- a/src/mongo/s/client/shard_local.h
+++ b/src/mongo/s/client/shard_local.h
@@ -40,7 +40,7 @@ class ShardLocal : public Shard {
MONGO_DISALLOW_COPYING(ShardLocal);
public:
- explicit ShardLocal(const ShardId& id) : Shard(id) {}
+ explicit ShardLocal(const ShardId& id);
~ShardLocal() = default;
diff --git a/src/mongo/s/client/shard_local_test.cpp b/src/mongo/s/client/shard_local_test.cpp
index b79fef28600..35a9359b372 100644
--- a/src/mongo/s/client/shard_local_test.cpp
+++ b/src/mongo/s/client/shard_local_test.cpp
@@ -82,6 +82,7 @@ void ShardLocalTest::setUp() {
ServiceContextMongoDTest::setUp();
Client::initThreadIfNotAlready();
_txn = getGlobalServiceContext()->makeOperationContext(&cc());
+ serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
_shardLocal = stdx::make_unique<ShardLocal>(ShardId("shardOrConfig"));
const repl::ReplSettings replSettings = {};
repl::setGlobalReplicationCoordinator(new repl::ReplicationCoordinatorMock(replSettings));
@@ -153,7 +154,7 @@ StatusWith<Shard::QueryResponse> ShardLocalTest::runFindQuery(NamespaceString ns
}
TEST_F(ShardLocalTest, RunCommand) {
- NamespaceString nss("foo.bar");
+ NamespaceString nss("admin.bar");
StatusWith<Shard::CommandResponse> findAndModifyResponse = runFindAndModifyRunCommand(
nss, BSON("fooItem" << 1), BSON("$set" << BSON("fooRandom" << 254)));
@@ -165,7 +166,7 @@ TEST_F(ShardLocalTest, RunCommand) {
}
TEST_F(ShardLocalTest, FindOneWithoutLimit) {
- NamespaceString nss("foo.bar");
+ NamespaceString nss("admin.bar");
// Set up documents to be queried.
StatusWith<Shard::CommandResponse> findAndModifyResponse = runFindAndModifyRunCommand(
@@ -189,7 +190,7 @@ TEST_F(ShardLocalTest, FindOneWithoutLimit) {
}
TEST_F(ShardLocalTest, FindManyWithLimit) {
- NamespaceString nss("foo.bar");
+ NamespaceString nss("admin.bar");
// Set up documents to be queried.
StatusWith<Shard::CommandResponse> findAndModifyResponse = runFindAndModifyRunCommand(
@@ -219,7 +220,7 @@ TEST_F(ShardLocalTest, FindManyWithLimit) {
}
TEST_F(ShardLocalTest, FindNoMatchingDocumentsEmpty) {
- NamespaceString nss("foo.bar");
+ NamespaceString nss("admin.bar");
// Set up a document.
StatusWith<Shard::CommandResponse> findAndModifyResponse = runFindAndModifyRunCommand(
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index c04d0df2a06..aed55611dc1 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -52,7 +52,8 @@
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
-#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/replset/replset_dist_lock_manager.h"
#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h"
#include "mongo/s/catalog/type_changelog.h"
@@ -104,8 +105,23 @@ void ConfigServerTestFixture::setUp() {
_opCtx = cc().makeOperationContext();
repl::ReplSettings replSettings;
- repl::ReplicationCoordinator::set(
- serviceContext, stdx::make_unique<repl::ReplicationCoordinatorMock>(replSettings));
+ auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(replSettings);
+ repl::ReplicaSetConfig config;
+ config.initialize(BSON("_id"
+ << "mySet"
+ << "protocolVersion"
+ << 1
+ << "version"
+ << 3
+ << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node2:12345"
+ << "_id"
+ << 1))));
+ replCoord->setGetConfigReturnValue(config);
+ repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
+
+ serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
// Set up executor pool used for most operations.
auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>();
@@ -131,18 +147,6 @@ void ConfigServerTestFixture::setUp() {
_addShardNetworkTestEnv = stdx::make_unique<NetworkTestEnv>(specialExec.get(), specialMockNet);
_executorForAddShard = specialExec.get();
- auto uniqueDistLockManager = stdx::make_unique<DistLockManagerMock>();
- _distLockManager = uniqueDistLockManager.get();
- std::unique_ptr<ShardingCatalogClientImpl> catalogClient(
- stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager)));
- _catalogClient = catalogClient.get();
- catalogClient->startup();
-
- std::unique_ptr<ShardingCatalogManagerImpl> catalogManager(
- stdx::make_unique<ShardingCatalogManagerImpl>(_catalogClient, std::move(specialExec)));
- _catalogManager = catalogManager.get();
- catalogManager->startup();
-
auto targeterFactory(stdx::make_unique<RemoteCommandTargeterFactoryMock>());
auto targeterFactoryPtr = targeterFactory.get();
_targeterFactory = targeterFactoryPtr;
@@ -177,6 +181,23 @@ void ConfigServerTestFixture::setUp() {
stdx::make_unique<ShardRegistry>(std::move(shardFactory), ConnectionString::forLocal()));
executorPool->startup();
+ auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry.get());
+
+ auto uniqueDistLockManager =
+ stdx::make_unique<ReplSetDistLockManager>(serviceContext,
+ "distLockProcessId",
+ std::move(distLockCatalog),
+ ReplSetDistLockManager::kDistLockPingInterval,
+ ReplSetDistLockManager::kDistLockExpirationTime);
+ _distLockManager = uniqueDistLockManager.get();
+ std::unique_ptr<ShardingCatalogClientImpl> catalogClient(
+ stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager)));
+ _catalogClient = catalogClient.get();
+
+ std::unique_ptr<ShardingCatalogManagerImpl> catalogManager(
+ stdx::make_unique<ShardingCatalogManagerImpl>(_catalogClient, std::move(specialExec)));
+ _catalogManager = catalogManager.get();
+
// For now initialize the global grid object. All sharding objects will be accessible from there
// until we get rid of it.
grid.init(std::move(catalogClient),
@@ -187,6 +208,9 @@ void ConfigServerTestFixture::setUp() {
stdx::make_unique<BalancerConfiguration>(),
std::move(executorPool),
_mockNetwork);
+
+ _catalogClient->startup();
+ _catalogManager->startup();
}
void ConfigServerTestFixture::tearDown() {
@@ -255,7 +279,7 @@ MessagingPortMock* ConfigServerTestFixture::getMessagingPort() const {
return _messagePort.get();
}
-DistLockManagerMock* ConfigServerTestFixture::distLock() const {
+ReplSetDistLockManager* ConfigServerTestFixture::distLock() const {
invariant(_distLockManager);
return _distLockManager;
}
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 7deabf1b323..38d348a78f0 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -42,7 +42,7 @@ class BSONObj;
class CatalogCache;
struct ChunkVersion;
class CollectionType;
-class DistLockManagerMock;
+class ReplSetDistLockManager;
class NamespaceString;
class Shard;
class ShardFactoryMock;
@@ -100,7 +100,7 @@ public:
MessagingPortMock* getMessagingPort() const;
- DistLockManagerMock* distLock() const;
+ ReplSetDistLockManager* distLock() const;
OperationContext* operationContext() const;
@@ -172,7 +172,7 @@ private:
executor::TaskExecutor* _executorForAddShard;
std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv;
std::unique_ptr<executor::NetworkTestEnv> _addShardNetworkTestEnv;
- DistLockManagerMock* _distLockManager = nullptr;
+ ReplSetDistLockManager* _distLockManager = nullptr;
ShardingCatalogClientImpl* _catalogClient = nullptr;
ShardingCatalogManagerImpl* _catalogManager = nullptr;
};