summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-09-28 15:33:05 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-09-29 13:28:48 -0400
commitb12af9e243b98b50d5918d9afa9354b9935cebf0 (patch)
tree0c866ad5bc76d5901799a9adac9fc14bd71c9955
parentb4d55086ed9a5d49709409c9c66652f12fa3d932 (diff)
downloadmongo-b12af9e243b98b50d5918d9afa9354b9935cebf0.tar.gz
SERVER-26370 Cleanup MigrationDestinationManager
-rw-r--r--src/mongo/db/s/active_migrations_registry.cpp26
-rw-r--r--src/mongo/db/s/active_migrations_registry.h38
-rw-r--r--src/mongo/db/s/active_migrations_registry_test.cpp56
-rw-r--r--src/mongo/db/s/cleanup_orphaned_cmd.cpp2
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp4
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp2
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp82
-rw-r--r--src/mongo/db/s/migration_destination_manager.h9
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp13
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp7
-rw-r--r--src/mongo/db/s/move_chunk_command.cpp2
-rw-r--r--src/mongo/db/s/sharding_state.cpp13
-rw-r--r--src/mongo/db/s/sharding_state.h14
-rw-r--r--src/mongo/db/s/split_chunk_command.cpp5
14 files changed, 135 insertions, 138 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index ca38749c373..35a765a89c2 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -44,16 +44,16 @@ ActiveMigrationsRegistry::~ActiveMigrationsRegistry() {
invariant(!_activeMoveChunkState);
}
-StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
+StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
const MoveChunkRequest& args) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_activeMoveChunkState) {
_activeMoveChunkState.emplace(args);
- return {ScopedRegisterMigration(this, true, _activeMoveChunkState->notification)};
+ return {ScopedRegisterDonateChunk(this, true, _activeMoveChunkState->notification)};
}
if (_activeMoveChunkState->args == args) {
- return {ScopedRegisterMigration(nullptr, false, _activeMoveChunkState->notification)};
+ return {ScopedRegisterDonateChunk(nullptr, false, _activeMoveChunkState->notification)};
}
return {
@@ -63,7 +63,7 @@ StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
<< _activeMoveChunkState->args.getNss().ns()};
}
-boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveMigrationNss() {
+boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeMoveChunkState) {
return _activeMoveChunkState->args.getNss();
@@ -99,13 +99,13 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex
return BSONObj();
}
-void ActiveMigrationsRegistry::_clearMigration() {
+void ActiveMigrationsRegistry::_clearDonateChunk() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_activeMoveChunkState);
_activeMoveChunkState.reset();
}
-ScopedRegisterMigration::ScopedRegisterMigration(
+ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(
ActiveMigrationsRegistry* registry,
bool forUnregister,
std::shared_ptr<Notification<Status>> completionNotification)
@@ -113,35 +113,35 @@ ScopedRegisterMigration::ScopedRegisterMigration(
_forUnregister(forUnregister),
_completionNotification(std::move(completionNotification)) {}
-ScopedRegisterMigration::~ScopedRegisterMigration() {
+ScopedRegisterDonateChunk::~ScopedRegisterDonateChunk() {
if (_registry && _forUnregister) {
// If this is a newly started migration the caller must always signal on completion
invariant(*_completionNotification);
- _registry->_clearMigration();
+ _registry->_clearDonateChunk();
}
}
-ScopedRegisterMigration::ScopedRegisterMigration(ScopedRegisterMigration&& other) {
+ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&& other) {
*this = std::move(other);
}
-ScopedRegisterMigration& ScopedRegisterMigration::operator=(ScopedRegisterMigration&& other) {
+ScopedRegisterDonateChunk& ScopedRegisterDonateChunk::operator=(ScopedRegisterDonateChunk&& other) {
if (&other != this) {
_registry = other._registry;
+ other._registry = nullptr;
_forUnregister = other._forUnregister;
_completionNotification = std::move(other._completionNotification);
- other._registry = nullptr;
}
return *this;
}
-void ScopedRegisterMigration::complete(Status status) {
+void ScopedRegisterDonateChunk::complete(Status status) {
invariant(_forUnregister);
_completionNotification->set(status);
}
-Status ScopedRegisterMigration::waitForCompletion(OperationContext* txn) {
+Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* txn) {
invariant(!_forUnregister);
return _completionNotification->get(txn);
}
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 1415a4c469f..06ffeaece07 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -39,7 +39,7 @@
namespace mongo {
class OperationContext;
-class ScopedRegisterMigration;
+class ScopedRegisterDonateChunk;
template <typename T>
class StatusWith;
@@ -56,22 +56,22 @@ public:
/**
* If there are no migrations running on this shard, registers an active migration with the
- * specified arguments and returns a ScopedRegisterMigration, which must be signaled by the
+ * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
* caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
- * arguments, returns a ScopedRegisterMigration, which can be used to join the already running
+ * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the already running
* migration.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterMigration> registerMigration(const MoveChunkRequest& args);
+ StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
- * If a migration has been previously registered through a call to registerMigration returns
+ * If a migration has been previously registered through a call to registerDonateChunk returns
* that namespace. Otherwise returns boost::none.
*/
- boost::optional<NamespaceString> getActiveMigrationNss();
+ boost::optional<NamespaceString> getActiveDonateChunkNss();
/**
* Returns a report on the active migration if there currently is one. Otherwise, returns an
@@ -82,7 +82,7 @@ public:
BSONObj getActiveMigrationStatusReport(OperationContext* txn);
private:
- friend class ScopedRegisterMigration;
+ friend class ScopedRegisterDonateChunk;
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
@@ -98,9 +98,9 @@ private:
/**
* Unregisters a previously registered namespace with ongoing migration. Must only be called if
- * a previous call to registerMigration has succeeded.
+ * a previous call to registerDonateChunk has succeeded.
*/
- void _clearMigration();
+ void _clearDonateChunk();
// Protects the state below
stdx::mutex _mutex;
@@ -111,21 +111,21 @@ private:
};
/**
- * Object of this class is returned from the registerMigration call of the active migrations
+ * Object of this class is returned from the registerDonateChunk call of the active migrations
* registry. It can exist in two modes - 'unregister' and 'join'. See the comments for
- * registerMigration method for more details.
+ * registerDonateChunk method for more details.
*/
-class ScopedRegisterMigration {
- MONGO_DISALLOW_COPYING(ScopedRegisterMigration);
+class ScopedRegisterDonateChunk {
+ MONGO_DISALLOW_COPYING(ScopedRegisterDonateChunk);
public:
- ScopedRegisterMigration(ActiveMigrationsRegistry* registry,
- bool forUnregister,
- std::shared_ptr<Notification<Status>> completionNotification);
- ~ScopedRegisterMigration();
+ ScopedRegisterDonateChunk(ActiveMigrationsRegistry* registry,
+ bool forUnregister,
+ std::shared_ptr<Notification<Status>> completionNotification);
+ ~ScopedRegisterDonateChunk();
- ScopedRegisterMigration(ScopedRegisterMigration&&);
- ScopedRegisterMigration& operator=(ScopedRegisterMigration&&);
+ ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&&);
+ ScopedRegisterDonateChunk& operator=(ScopedRegisterDonateChunk&&);
/**
* Returns true if the migration object is in the 'unregister' mode, which means that the holder
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index 9faabae23c0..74ec900fc49 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -85,60 +85,60 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj()));
}
-TEST_F(MoveChunkRegistration, ScopedRegisterMigrationMoveConstructorAndAssignment) {
- auto originalScopedRegisterMigration = assertGet(
- _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(originalScopedRegisterMigration.mustExecute());
+TEST_F(MoveChunkRegistration, ScopedRegisterDonateChunkMoveConstructorAndAssignment) {
+ auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ ASSERT(originalScopedRegisterDonateChunk.mustExecute());
- ScopedRegisterMigration movedScopedRegisterMigration(
- std::move(originalScopedRegisterMigration));
- ASSERT(movedScopedRegisterMigration.mustExecute());
+ ScopedRegisterDonateChunk movedScopedRegisterDonateChunk(
+ std::move(originalScopedRegisterDonateChunk));
+ ASSERT(movedScopedRegisterDonateChunk.mustExecute());
- originalScopedRegisterMigration = std::move(movedScopedRegisterMigration);
- ASSERT(originalScopedRegisterMigration.mustExecute());
+ originalScopedRegisterDonateChunk = std::move(movedScopedRegisterDonateChunk);
+ ASSERT(originalScopedRegisterDonateChunk.mustExecute());
// Need to signal the registered migration so the destructor doesn't invariant
- originalScopedRegisterMigration.complete(Status::OK());
+ originalScopedRegisterDonateChunk.complete(Status::OK());
}
TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
- ASSERT(!_registry.getActiveMigrationNss());
+ ASSERT(!_registry.getActiveDonateChunkNss());
const NamespaceString nss("TestDB", "TestColl");
- auto originalScopedRegisterMigration =
- assertGet(_registry.registerMigration(createMoveChunkRequest(nss)));
+ auto originalScopedRegisterDonateChunk =
+ assertGet(_registry.registerDonateChunk(createMoveChunkRequest(nss)));
- ASSERT_EQ(nss.ns(), _registry.getActiveMigrationNss()->ns());
+ ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());
// Need to signal the registered migration so the destructor doesn't invariant
- originalScopedRegisterMigration.complete(Status::OK());
+ originalScopedRegisterDonateChunk.complete(Status::OK());
}
TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
- auto originalScopedRegisterMigration = assertGet(_registry.registerMigration(
+ auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
- auto secondScopedRegisterMigrationStatus =
- _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
+ auto secondScopedRegisterDonateChunkStatus = _registry.registerDonateChunk(
+ createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
- secondScopedRegisterMigrationStatus.getStatus());
+ secondScopedRegisterDonateChunkStatus.getStatus());
- originalScopedRegisterMigration.complete(Status::OK());
+ originalScopedRegisterDonateChunk.complete(Status::OK());
}
TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
- auto originalScopedRegisterMigration = assertGet(
- _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(originalScopedRegisterMigration.mustExecute());
+ auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ ASSERT(originalScopedRegisterDonateChunk.mustExecute());
- auto secondScopedRegisterMigration = assertGet(
- _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(!secondScopedRegisterMigration.mustExecute());
+ auto secondScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ ASSERT(!secondScopedRegisterDonateChunk.mustExecute());
- originalScopedRegisterMigration.complete({ErrorCodes::InternalError, "Test error"});
+ originalScopedRegisterDonateChunk.complete({ErrorCodes::InternalError, "Test error"});
ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"),
- secondScopedRegisterMigration.waitForCompletion(getTxn()));
+ secondScopedRegisterDonateChunk.waitForCompletion(getTxn()));
}
} // namespace
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 281d1cdcc39..a9b361027bb 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -243,7 +243,7 @@ public:
}
ChunkVersion shardVersion;
- Status status = shardingState->refreshMetadataNow(txn, ns, &shardVersion);
+ Status status = shardingState->refreshMetadataNow(txn, NamespaceString(ns), &shardVersion);
if (!status.isOK()) {
if (status.code() == ErrorCodes::RemoteChangeDetected) {
warning() << "Shard version in transition detected while refreshing "
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index db3044db66f..33f31f3315b 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -106,7 +106,7 @@ Status mergeChunks(OperationContext* txn,
//
ChunkVersion shardVersion;
- Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion);
if (!refreshStatus.isOK()) {
std::string errmsg = str::stream()
@@ -283,7 +283,7 @@ Status mergeChunks(OperationContext* txn,
//
{
ChunkVersion shardVersionAfterMerge;
- refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
+ refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersionAfterMerge);
if (!refreshStatus.isOK()) {
std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 7f7850cb22c..3931ce43893 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -60,7 +60,7 @@ public:
AutoGetActiveCloner(OperationContext* txn, const MigrationSessionId& migrationSessionId) {
ShardingState* const gss = ShardingState::get(txn);
- const auto nss = gss->getActiveMigrationNss();
+ const auto nss = gss->getActiveDonateChunkNss();
uassert(ErrorCodes::NotYetInitialized, "No active migrations were found", nss);
// Once the collection is locked, the migration status cannot change
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 2368ade2b07..8110f684766 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -262,7 +262,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() {
}
}
-Status MigrationDestinationManager::start(const string& ns,
+Status MigrationDestinationManager::start(const NamespaceString& nss,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -290,7 +290,7 @@ Status MigrationDestinationManager::start(const string& ns,
_state = READY;
_errmsg = "";
- _nss = NamespaceString(ns);
+ _nss = nss;
_fromShardConnString = fromShardConnString;
_fromShard = fromShard;
_toShard = toShard;
@@ -314,9 +314,9 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}
- _migrateThreadHandle = stdx::thread(
- [this, ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
- _migrateThread(ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+ _migrateThreadHandle =
+ stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
+ _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
});
return Status::OK();
@@ -388,8 +388,7 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI
return false;
}
-void MigrationDestinationManager::_migrateThread(std::string ns,
- BSONObj min,
+void MigrationDestinationManager::_migrateThread(BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
ConnectionString fromShardConnString,
@@ -405,7 +404,7 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
try {
_migrateDriver(
- opCtx.get(), ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+ opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -426,7 +425,7 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
if (getState() != DONE) {
// Unprotect the range if needed/possible on unsuccessful TO migration
- Status status = _forgetPending(opCtx.get(), NamespaceString(ns), min, max, epoch);
+ Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch);
if (!status.isOK()) {
warning() << "Failed to remove pending range" << redact(causedBy(status));
}
@@ -438,7 +437,6 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
- const string& ns,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
@@ -451,11 +449,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
invariant(!max.isEmpty());
log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
- << " for collection " << ns << " from " << fromShardConnString << " at epoch "
+ << " for collection " << _nss.ns() << " from " << fromShardConnString << " at epoch "
<< epoch.toString() << " with session id " << *_sessionId;
string errmsg;
- MoveTimingHelper timing(txn, "to", ns, min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());
+ MoveTimingHelper timing(
+ txn, "to", _nss.ns(), min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());
const auto initialState = getState();
@@ -472,16 +471,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// Just tests the connection
conn->getLastError();
- const NamespaceString nss(ns);
-
DisableDocumentValidation validationDisabler(txn);
{
// 0. copy system.namespaces entry if collection doesn't already exist
- OldClientWriteContext ctx(txn, ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns
+ OldClientWriteContext ctx(txn, _nss.ns());
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_nss)) {
+ errmsg = str::stream() << "Not primary during migration: " << _nss.ns()
<< ": checking if collection exists";
warning() << errmsg;
setState(FAIL);
@@ -491,10 +488,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// Only copy if ns doesn't already exist
Database* const db = ctx.db();
- Collection* const collection = db->getCollection(ns);
+ Collection* const collection = db->getCollection(_nss);
if (!collection) {
- std::list<BSONObj> infos = conn->getCollectionInfos(
- nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns)));
+ std::list<BSONObj> infos =
+ conn->getCollectionInfos(_nss.db().toString(), BSON("name" << _nss.coll()));
BSONObj options;
if (infos.size() > 0) {
@@ -505,9 +502,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
WriteUnitOfWork wuow(txn);
- Status status = userCreateNS(txn, db, ns, options, false);
+ Status status = userCreateNS(txn, db, _nss.ns(), options, false);
if (!status.isOK()) {
- warning() << "failed to create collection [" << ns << "] "
+ warning() << "failed to create collection [" << _nss << "] "
<< " with options " << options << ": " << redact(status);
}
wuow.commit();
@@ -520,25 +517,25 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
std::vector<BSONObj> indexSpecs;
{
- const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
+ const std::list<BSONObj> indexes = conn->getIndexSpecs(_nss.ns());
indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end());
}
ScopedTransaction scopedXact(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
- OldClientContext ctx(txn, ns);
+ Lock::DBLock lk(txn->lockState(), _nss.db(), MODE_X);
+ OldClientContext ctx(txn, _nss.ns());
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns;
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_nss)) {
+ errmsg = str::stream() << "Not primary during migration: " << _nss.ns();
warning() << errmsg;
setState(FAIL);
return;
}
Database* db = ctx.db();
- Collection* collection = db->getCollection(ns);
+ Collection* collection = db->getCollection(_nss);
if (!collection) {
- errmsg = str::stream() << "collection dropped during migration: " << ns;
+ errmsg = str::stream() << "collection dropped during migration: " << _nss.ns();
warning() << errmsg;
setState(FAIL);
return;
@@ -597,7 +594,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// 2. Synchronously delete any data which might have been left orphaned in range being moved
RangeDeleterOptions deleterOptions(
- KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
+ KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern));
deleterOptions.writeConcern = writeConcern;
// No need to wait since all existing cursors will filter out this range when returning the
@@ -613,7 +610,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
return;
}
- Status status = _notePending(txn, NamespaceString(ns), min, max, epoch);
+ Status status = _notePending(txn, _nss, min, max, epoch);
if (!status.isOK()) {
setState(FAIL);
return;
@@ -657,11 +654,17 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
BSONObj docToClone = i.next().Obj();
{
- OldClientWriteContext cx(txn, ns);
+ OldClientWriteContext cx(txn, _nss.ns());
BSONObj localDoc;
- if (willOverrideLocalId(
- txn, ns, min, max, shardKeyPattern, cx.db(), docToClone, &localDoc)) {
+ if (willOverrideLocalId(txn,
+ _nss.ns(),
+ min,
+ max,
+ shardKeyPattern,
+ cx.db(),
+ docToClone,
+ &localDoc)) {
string errMsg = str::stream() << "cannot migrate chunk, local document "
<< redact(localDoc)
<< " has same _id as cloned "
@@ -673,7 +676,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
uasserted(16976, errMsg);
}
- Helpers::upsert(txn, ns, docToClone, true);
+ Helpers::upsert(txn, _nss.ns(), docToClone, true);
}
thisTime++;
@@ -731,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
break;
}
- _applyMigrateOp(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
+ _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied);
const int maxIterations = 3600 * 50;
@@ -784,7 +787,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
log() << "Waiting for replication to catch up before entering critical section";
- if (_flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) {
+ if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
break;
}
@@ -825,7 +828,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
if (res["size"].number() > 0 &&
- _applyMigrateOp(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) {
+ _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied)) {
continue;
}
@@ -839,7 +842,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) {
+ if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
break;
}
}
@@ -859,6 +862,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
setState(DONE);
+
timing.done(6);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 9de1c5027df..804ec2bf35a 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -87,7 +87,7 @@ public:
/**
* Returns OK if migration started successfully.
*/
- Status start(const std::string& ns,
+ Status start(const NamespaceString& nss,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -117,8 +117,7 @@ private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
- void _migrateThread(std::string ns,
- BSONObj min,
+ void _migrateThread(BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
ConnectionString fromShardConnString,
@@ -126,7 +125,6 @@ private:
WriteConcernOptions writeConcern);
void _migrateDriver(OperationContext* txn,
- const std::string& ns,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
@@ -194,7 +192,8 @@ private:
// Migration session ID uniquely identifies the migration and indicates whether the prepare
// method has been called.
- boost::optional<MigrationSessionId> _sessionId{boost::none};
+ boost::optional<MigrationSessionId> _sessionId;
+
// A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 86df3fe2edb..4216b30ef5e 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -124,7 +124,7 @@ public:
shardingState->setShardName(toShard.toString());
const ShardId fromShard(cmdObj["fromShardName"].String());
- const string ns = cmdObj.firstElement().String();
+ const NamespaceString nss(cmdObj.firstElement().String());
BSONObj min = cmdObj["min"].Obj().getOwned();
BSONObj max = cmdObj["max"].Obj().getOwned();
@@ -134,7 +134,7 @@ public:
// consistent and predictable, generally we'd refresh anyway, and to be paranoid.
ChunkVersion currentVersion;
- Status status = shardingState->refreshMetadataNow(txn, ns, &currentVersion);
+ Status status = shardingState->refreshMetadataNow(txn, nss, &currentVersion);
if (!status.isOK()) {
errmsg = str::stream() << "cannot start recv'ing chunk "
<< "[" << redact(min) << "," << redact(max) << ")"
@@ -166,8 +166,8 @@ public:
const MigrationSessionId migrationSessionId(
uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
- Status startStatus = shardingState->migrationDestinationManager()->start(
- ns,
+ uassertStatusOK(shardingState->migrationDestinationManager()->start(
+ nss,
migrationSessionId,
statusWithFromShardConnectionString.getValue(),
fromShard,
@@ -176,10 +176,7 @@ public:
max,
shardKeyPattern,
currentVersion.epoch(),
- writeConcern);
- if (!startStatus.isOK()) {
- return appendCommandStatus(result, startStatus);
- }
+ writeConcern));
result.appendBool("started", true);
return true;
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index fb36ae89364..1db915cfaa5 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -102,8 +102,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, MoveChunkR
ChunkVersion shardVersion;
- Status refreshStatus =
- shardingState->refreshMetadataNow(txn, _args.getNss().ns(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(txn, _args.getNss(), &shardVersion);
if (!refreshStatus.isOK()) {
uasserted(refreshStatus.code(),
str::stream() << "cannot start migrate of chunk "
@@ -412,8 +411,8 @@ Status MigrationSourceManager::commitDonateChunk(OperationContext* txn) {
// case do a best effort attempt to incrementally refresh the metadata. If this fails, just
// clear it up so that subsequent requests will try to do a full refresh.
ChunkVersion unusedShardVersion;
- Status refreshStatus = ShardingState::get(txn)->refreshMetadataNow(
- txn, _args.getNss().ns(), &unusedShardVersion);
+ Status refreshStatus =
+ ShardingState::get(txn)->refreshMetadataNow(txn, _args.getNss(), &unusedShardVersion);
if (refreshStatus.isOK()) {
ScopedTransaction scopedXact(txn, MODE_IS);
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index c7dca21c1f7..0b930131d4e 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -156,7 +156,7 @@ public:
grid.shardRegistry()->reload(txn);
auto scopedRegisterMigration =
- uassertStatusOK(shardingState->registerMigration(moveChunkRequest));
+ uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest));
Status status = {ErrorCodes::InternalError, "Uninitialized value"};
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 7bc1e42aac4..8250ed4021f 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -328,9 +328,9 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn,
}
Status ShardingState::refreshMetadataNow(OperationContext* txn,
- const string& ns,
+ const NamespaceString& nss,
ChunkVersion* latestShardVersion) {
- auto refreshLatestShardVersionStatus = _refreshMetadata(txn, NamespaceString(ns), nullptr);
+ auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr);
if (!refreshLatestShardVersionStatus.isOK()) {
return refreshLatestShardVersionStatus.getStatus();
}
@@ -731,18 +731,19 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
return (metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED());
}
-StatusWith<ScopedRegisterMigration> ShardingState::registerMigration(const MoveChunkRequest& args) {
+StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
+ const MoveChunkRequest& args) {
if (_migrationDestManager.isActive()) {
return {ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Unable to start new migration because this shard is currently "
"receiving a chunk"};
}
- return _activeMigrationsRegistry.registerMigration(args);
+ return _activeMigrationsRegistry.registerDonateChunk(args);
}
-boost::optional<NamespaceString> ShardingState::getActiveMigrationNss() {
- return _activeMigrationsRegistry.getActiveMigrationNss();
+boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() {
+ return _activeMigrationsRegistry.getActiveDonateChunkNss();
}
BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* txn) {
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index bd06604b79f..8e8f0c20074 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -185,7 +185,7 @@ public:
* @return latestShardVersion the version that is now stored for this collection
*/
Status refreshMetadataNow(OperationContext* txn,
- const std::string& ns,
+ const NamespaceString& nss,
ChunkVersion* latestShardVersion);
void appendInfo(OperationContext* txn, BSONObjBuilder& b);
@@ -203,24 +203,24 @@ public:
/**
* If there are no migrations running on this shard, registers an active migration with the
- * specified arguments and returns a ScopedRegisterMigration, which must be signaled by the
+ * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
* caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
- * arguments, returns a ScopedRegisterMigration, which can be used to join the existing one.
+ * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the existing one.
*
* Othwerwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterMigration> registerMigration(const MoveChunkRequest& args);
+ StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
- * If a migration has been previously registered through a call to registerMigration returns
+ * If a migration has been previously registered through a call to registerDonateChunk returns
* that namespace. Otherwise returns boost::none.
*
* This method can be called without any locks, but once the namespace is fetched it needs to be
* re-checked after acquiring some intent lock on that namespace.
*/
- boost::optional<NamespaceString> getActiveMigrationNss();
+ boost::optional<NamespaceString> getActiveDonateChunkNss();
/**
* Get a migration status report from the migration registry. If no migration is active, this
@@ -278,8 +278,6 @@ public:
}
private:
- friend class ScopedRegisterMigration;
-
// Map from a namespace into the sharding state for each collection we have
typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>
CollectionShardingStateMap;
diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp
index c5719f9c7e1..d50a1680cd4 100644
--- a/src/mongo/db/s/split_chunk_command.cpp
+++ b/src/mongo/db/s/split_chunk_command.cpp
@@ -267,7 +267,7 @@ public:
// Always check our version remotely
ChunkVersion shardVersion;
- Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion);
if (!refreshStatus.isOK()) {
errmsg = str::stream() << "splitChunk cannot split chunk "
@@ -364,8 +364,7 @@ public:
//
{
ChunkVersion shardVersionAfterSplit;
- refreshStatus =
- shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
+ refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersionAfterSplit);
if (!refreshStatus.isOK()) {
errmsg = str::stream() << "failed to refresh metadata for split chunk ["