author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-09-28 15:33:05 -0400
---|---|---
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-09-29 13:28:48 -0400
commit | b12af9e243b98b50d5918d9afa9354b9935cebf0 (patch) |
tree | 0c866ad5bc76d5901799a9adac9fc14bd71c9955 |
parent | b4d55086ed9a5d49709409c9c66652f12fa3d932 (diff) |
download | mongo-b12af9e243b98b50d5918d9afa9354b9935cebf0.tar.gz |
SERVER-26370 Cleanup MigrationDestinationManager
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 26
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 38
-rw-r--r-- | src/mongo/db/s/active_migrations_registry_test.cpp | 56
-rw-r--r-- | src/mongo/db/s/cleanup_orphaned_cmd.cpp | 2
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 4
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 2
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 82
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 9
-rw-r--r-- | src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 13
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 7
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 2
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 13
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 14
-rw-r--r-- | src/mongo/db/s/split_chunk_command.cpp | 5
14 files changed, 135 insertions, 138 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index ca38749c373..35a765a89c2 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -44,16 +44,16 @@ ActiveMigrationsRegistry::~ActiveMigrationsRegistry() {
     invariant(!_activeMoveChunkState);
 }

-StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
+StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
     const MoveChunkRequest& args) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     if (!_activeMoveChunkState) {
         _activeMoveChunkState.emplace(args);
-        return {ScopedRegisterMigration(this, true, _activeMoveChunkState->notification)};
+        return {ScopedRegisterDonateChunk(this, true, _activeMoveChunkState->notification)};
     }

     if (_activeMoveChunkState->args == args) {
-        return {ScopedRegisterMigration(nullptr, false, _activeMoveChunkState->notification)};
+        return {ScopedRegisterDonateChunk(nullptr, false, _activeMoveChunkState->notification)};
     }

     return {
@@ -63,7 +63,7 @@ StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
               << _activeMoveChunkState->args.getNss().ns()};
 }

-boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveMigrationNss() {
+boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     if (_activeMoveChunkState) {
         return _activeMoveChunkState->args.getNss();
@@ -99,13 +99,13 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex
     return BSONObj();
 }

-void ActiveMigrationsRegistry::_clearMigration() {
+void ActiveMigrationsRegistry::_clearDonateChunk() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     invariant(_activeMoveChunkState);
     _activeMoveChunkState.reset();
 }

-ScopedRegisterMigration::ScopedRegisterMigration(
+ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(
     ActiveMigrationsRegistry* registry,
     bool forUnregister,
     std::shared_ptr<Notification<Status>> completionNotification)
@@ -113,35 +113,35 @@ ScopedRegisterMigration::ScopedRegisterMigration(
       _forUnregister(forUnregister),
       _completionNotification(std::move(completionNotification)) {}

-ScopedRegisterMigration::~ScopedRegisterMigration() {
+ScopedRegisterDonateChunk::~ScopedRegisterDonateChunk() {
     if (_registry && _forUnregister) {
         // If this is a newly started migration the caller must always signal on completion
         invariant(*_completionNotification);
-        _registry->_clearMigration();
+        _registry->_clearDonateChunk();
     }
 }

-ScopedRegisterMigration::ScopedRegisterMigration(ScopedRegisterMigration&& other) {
+ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&& other) {
     *this = std::move(other);
 }

-ScopedRegisterMigration& ScopedRegisterMigration::operator=(ScopedRegisterMigration&& other) {
+ScopedRegisterDonateChunk& ScopedRegisterDonateChunk::operator=(ScopedRegisterDonateChunk&& other) {
     if (&other != this) {
         _registry = other._registry;
+        other._registry = nullptr;
         _forUnregister = other._forUnregister;
         _completionNotification = std::move(other._completionNotification);
-        other._registry = nullptr;
     }

     return *this;
 }

-void ScopedRegisterMigration::complete(Status status) {
+void ScopedRegisterDonateChunk::complete(Status status) {
     invariant(_forUnregister);
     _completionNotification->set(status);
 }

-Status ScopedRegisterMigration::waitForCompletion(OperationContext* txn) {
+Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* txn) {
     invariant(!_forUnregister);
     return _completionNotification->get(txn);
 }
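The notable fix in the move operations above is that `other._registry` is now nulled out immediately after the pointer is copied, making it obvious that a moved-from guard must never unregister. A minimal standalone sketch of this move-only RAII registration pattern, with illustrative names (not the MongoDB classes):

```cpp
#include <cassert>
#include <utility>

struct Registry {
    bool active = false;
};

// Move-only RAII guard: clears the registration on destruction unless the
// guard has been moved from (in which case _registry is null).
class ScopedRegistration {
public:
    explicit ScopedRegistration(Registry* r) : _registry(r) {
        _registry->active = true;
    }

    ~ScopedRegistration() {
        if (_registry)
            _registry->active = false;
    }

    ScopedRegistration(ScopedRegistration&& other) noexcept {
        *this = std::move(other);
    }

    ScopedRegistration& operator=(ScopedRegistration&& other) noexcept {
        if (&other != this) {
            _registry = other._registry;
            other._registry = nullptr;  // moved-from guard must not unregister
        }
        return *this;
    }

    ScopedRegistration(const ScopedRegistration&) = delete;
    ScopedRegistration& operator=(const ScopedRegistration&) = delete;

private:
    Registry* _registry{nullptr};
};

int main() {
    Registry reg;
    {
        ScopedRegistration a(&reg);
        ScopedRegistration b(std::move(a));  // ownership transfers; ~a is a no-op
        assert(reg.active);
    }
    assert(!reg.active);  // only b's destructor cleared the flag
}
```

Only the guard that currently owns the registry pointer performs the cleanup; the moved-from object destructs as a no-op.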
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 1415a4c469f..06ffeaece07 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -39,7 +39,7 @@
 namespace mongo {

 class OperationContext;
-class ScopedRegisterMigration;
+class ScopedRegisterDonateChunk;
 template <typename T>
 class StatusWith;

@@ -56,22 +56,22 @@ public:
     /**
      * If there are no migrations running on this shard, registers an active migration with the
-     * specified arguments and returns a ScopedRegisterMigration, which must be signaled by the
+     * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
      * caller before it goes out of scope.
      *
      * If there is an active migration already running on this shard and it has the exact same
-     * arguments, returns a ScopedRegisterMigration, which can be used to join the already running
+     * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the already running
      * migration.
      *
      * Otherwise returns a ConflictingOperationInProgress error.
      */
-    StatusWith<ScopedRegisterMigration> registerMigration(const MoveChunkRequest& args);
+    StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);

     /**
-     * If a migration has been previously registered through a call to registerMigration returns
+     * If a migration has been previously registered through a call to registerDonateChunk returns
      * that namespace. Otherwise returns boost::none.
      */
-    boost::optional<NamespaceString> getActiveMigrationNss();
+    boost::optional<NamespaceString> getActiveDonateChunkNss();

     /**
      * Returns a report on the active migration if there currently is one. Otherwise, returns an
@@ -82,7 +82,7 @@ public:
     BSONObj getActiveMigrationStatusReport(OperationContext* txn);

 private:
-    friend class ScopedRegisterMigration;
+    friend class ScopedRegisterDonateChunk;

     // Describes the state of a currently active moveChunk operation
     struct ActiveMoveChunkState {
@@ -98,9 +98,9 @@ private:
     /**
      * Unregisters a previously registered namespace with ongoing migration. Must only be called if
-     * a previous call to registerMigration has succeeded.
+     * a previous call to registerDonateChunk has succeeded.
      */
-    void _clearMigration();
+    void _clearDonateChunk();

     // Protects the state below
     stdx::mutex _mutex;
@@ -111,21 +111,21 @@ private:
 };

 /**
- * Object of this class is returned from the registerMigration call of the active migrations
+ * Object of this class is returned from the registerDonateChunk call of the active migrations
  * registry. It can exist in two modes - 'unregister' and 'join'. See the comments for
- * registerMigration method for more details.
+ * registerDonateChunk method for more details.
  */
-class ScopedRegisterMigration {
-    MONGO_DISALLOW_COPYING(ScopedRegisterMigration);
+class ScopedRegisterDonateChunk {
+    MONGO_DISALLOW_COPYING(ScopedRegisterDonateChunk);

 public:
-    ScopedRegisterMigration(ActiveMigrationsRegistry* registry,
-                            bool forUnregister,
-                            std::shared_ptr<Notification<Status>> completionNotification);
-    ~ScopedRegisterMigration();
+    ScopedRegisterDonateChunk(ActiveMigrationsRegistry* registry,
+                              bool forUnregister,
+                              std::shared_ptr<Notification<Status>> completionNotification);
+    ~ScopedRegisterDonateChunk();

-    ScopedRegisterMigration(ScopedRegisterMigration&&);
-    ScopedRegisterMigration& operator=(ScopedRegisterMigration&&);
+    ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&&);
+    ScopedRegisterDonateChunk& operator=(ScopedRegisterDonateChunk&&);

     /**
      * Returns true if the migration object is in the 'unregister' mode, which means that the holder
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index 9faabae23c0..74ec900fc49 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -85,60 +85,60 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
     return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj()));
 }

-TEST_F(MoveChunkRegistration, ScopedRegisterMigrationMoveConstructorAndAssignment) {
-    auto originalScopedRegisterMigration = assertGet(
-        _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
-    ASSERT(originalScopedRegisterMigration.mustExecute());
+TEST_F(MoveChunkRegistration, ScopedRegisterDonateChunkMoveConstructorAndAssignment) {
+    auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+    ASSERT(originalScopedRegisterDonateChunk.mustExecute());

-    ScopedRegisterMigration movedScopedRegisterMigration(
-        std::move(originalScopedRegisterMigration));
-    ASSERT(movedScopedRegisterMigration.mustExecute());
+    ScopedRegisterDonateChunk movedScopedRegisterDonateChunk(
+        std::move(originalScopedRegisterDonateChunk));
+    ASSERT(movedScopedRegisterDonateChunk.mustExecute());

-    originalScopedRegisterMigration = std::move(movedScopedRegisterMigration);
-    ASSERT(originalScopedRegisterMigration.mustExecute());
+    originalScopedRegisterDonateChunk = std::move(movedScopedRegisterDonateChunk);
+    ASSERT(originalScopedRegisterDonateChunk.mustExecute());

     // Need to signal the registered migration so the destructor doesn't invariant
-    originalScopedRegisterMigration.complete(Status::OK());
+    originalScopedRegisterDonateChunk.complete(Status::OK());
 }

 TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
-    ASSERT(!_registry.getActiveMigrationNss());
+    ASSERT(!_registry.getActiveDonateChunkNss());

     const NamespaceString nss("TestDB", "TestColl");

-    auto originalScopedRegisterMigration =
-        assertGet(_registry.registerMigration(createMoveChunkRequest(nss)));
+    auto originalScopedRegisterDonateChunk =
+        assertGet(_registry.registerDonateChunk(createMoveChunkRequest(nss)));

-    ASSERT_EQ(nss.ns(), _registry.getActiveMigrationNss()->ns());
+    ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());

     // Need to signal the registered migration so the destructor doesn't invariant
-    originalScopedRegisterMigration.complete(Status::OK());
+    originalScopedRegisterDonateChunk.complete(Status::OK());
 }

 TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
-    auto originalScopedRegisterMigration = assertGet(_registry.registerMigration(
+    auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
         createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));

-    auto secondScopedRegisterMigrationStatus =
-        _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
+    auto secondScopedRegisterDonateChunkStatus = _registry.registerDonateChunk(
+        createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
     ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
-              secondScopedRegisterMigrationStatus.getStatus());
+              secondScopedRegisterDonateChunkStatus.getStatus());

-    originalScopedRegisterMigration.complete(Status::OK());
+    originalScopedRegisterDonateChunk.complete(Status::OK());
 }

 TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
-    auto originalScopedRegisterMigration = assertGet(
-        _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
-    ASSERT(originalScopedRegisterMigration.mustExecute());
+    auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+    ASSERT(originalScopedRegisterDonateChunk.mustExecute());

-    auto secondScopedRegisterMigration = assertGet(
-        _registry.registerMigration(createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
-    ASSERT(!secondScopedRegisterMigration.mustExecute());
+    auto secondScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+    ASSERT(!secondScopedRegisterDonateChunk.mustExecute());

-    originalScopedRegisterMigration.complete({ErrorCodes::InternalError, "Test error"});
+    originalScopedRegisterDonateChunk.complete({ErrorCodes::InternalError, "Test error"});

     ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"),
-              secondScopedRegisterMigration.waitForCompletion(getTxn()));
+              secondScopedRegisterDonateChunk.waitForCompletion(getTxn()));
 }

 }  // namespace
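The tests above pin down the registry's three outcomes: a fresh registration must execute, an identical concurrent request joins the first and observes its completion status, and a different request conflicts. A rough sketch of the same register/join/conflict protocol using standard-library primitives in place of MongoDB's `Notification<Status>`; all names here are illustrative assumptions:

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <optional>
#include <string>

class DonateChunkRegistry {
public:
    struct Handle {
        bool mustExecute;                        // true: caller runs the migration itself
        std::shared_future<std::string> result;  // completion signal shared by joiners
    };

    // Executing handle for a new request, joining handle for an identical
    // in-flight request, nullopt for a conflicting request.
    std::optional<Handle> registerDonateChunk(const std::string& args) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (!_active) {
            _active.emplace();
            _active->args = args;
            _active->future = _active->promise.get_future().share();
            return Handle{true, _active->future};
        }
        if (_active->args == args)
            return Handle{false, _active->future};  // join the in-flight migration
        return std::nullopt;  // conflicting operation in progress
    }

    // The executing caller signals everyone who joined, then clears the slot.
    void complete(const std::string& status) {
        std::lock_guard<std::mutex> lk(_mutex);
        _active->promise.set_value(status);
        _active.reset();  // shared state outlives the reset for joiners
    }

private:
    struct Active {
        std::string args;
        std::promise<std::string> promise;
        std::shared_future<std::string> future;
    };

    std::mutex _mutex;
    std::optional<Active> _active;
};

int main() {
    DonateChunkRegistry registry;

    auto first = registry.registerDonateChunk("TestColl chunk [0, 10)");
    assert(first && first->mustExecute);

    auto joined = registry.registerDonateChunk("TestColl chunk [0, 10)");
    assert(joined && !joined->mustExecute);  // identical args join the first

    assert(!registry.registerDonateChunk("other chunk"));  // conflict

    registry.complete("OK");
    assert(joined->result.get() == "OK");  // joiner observes the same status
}
```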
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 281d1cdcc39..a9b361027bb 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -243,7 +243,7 @@ public:
         }

         ChunkVersion shardVersion;
-        Status status = shardingState->refreshMetadataNow(txn, ns, &shardVersion);
+        Status status = shardingState->refreshMetadataNow(txn, NamespaceString(ns), &shardVersion);
         if (!status.isOK()) {
             if (status.code() == ErrorCodes::RemoteChangeDetected) {
                 warning() << "Shard version in transition detected while refreshing "
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index db3044db66f..33f31f3315b 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -106,7 +106,7 @@ Status mergeChunks(OperationContext* txn,
     //

     ChunkVersion shardVersion;
-    Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+    Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion);

     if (!refreshStatus.isOK()) {
         std::string errmsg = str::stream()
@@ -283,7 +283,7 @@ Status mergeChunks(OperationContext* txn,
     //
     {
         ChunkVersion shardVersionAfterMerge;
-        refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
+        refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersionAfterMerge);

         if (!refreshStatus.isOK()) {
             std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
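Several call sites in this commit switch from passing raw "db.coll" strings to passing `NamespaceString`, so the namespace is parsed once at the boundary instead of re-split inside every helper. A simplified stand-in showing the idea (not MongoDB's actual `NamespaceString` class):

```cpp
#include <cassert>
#include <string>

// Toy strong type for a "db.coll" namespace: parse once, query many times.
class NamespaceString {
public:
    explicit NamespaceString(std::string ns) : _ns(std::move(ns)), _dot(_ns.find('.')) {}

    std::string db() const { return _ns.substr(0, _dot); }
    std::string coll() const { return _ns.substr(_dot + 1); }
    const std::string& ns() const { return _ns; }

private:
    std::string _ns;
    std::size_t _dot;
};

// Before: Status refreshMetadataNow(OperationContext*, const std::string& ns, ...)
// After:  Status refreshMetadataNow(OperationContext*, const NamespaceString& nss, ...)
// so ad-hoc helpers like nsToDatabase(ns)/nsToCollectionSubstring(ns) become
// nss.db()/nss.coll() with no repeated string parsing at each call site.

int main() {
    NamespaceString nss("TestDB.TestColl");
    assert(nss.db() == "TestDB");
    assert(nss.coll() == "TestColl");
    assert(nss.ns() == "TestDB.TestColl");
}
```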
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 7f7850cb22c..3931ce43893 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -60,7 +60,7 @@ public:
     AutoGetActiveCloner(OperationContext* txn, const MigrationSessionId& migrationSessionId) {
         ShardingState* const gss = ShardingState::get(txn);

-        const auto nss = gss->getActiveMigrationNss();
+        const auto nss = gss->getActiveDonateChunkNss();
         uassert(ErrorCodes::NotYetInitialized, "No active migrations were found", nss);

         // Once the collection is locked, the migration status cannot change
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 2368ade2b07..8110f684766 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -262,7 +262,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() {
     }
 }

-Status MigrationDestinationManager::start(const string& ns,
+Status MigrationDestinationManager::start(const NamespaceString& nss,
                                           const MigrationSessionId& sessionId,
                                           const ConnectionString& fromShardConnString,
                                           const ShardId& fromShard,
@@ -290,7 +290,7 @@ Status MigrationDestinationManager::start(const string& ns,
     _state = READY;
     _errmsg = "";

-    _nss = NamespaceString(ns);
+    _nss = nss;
     _fromShardConnString = fromShardConnString;
     _fromShard = fromShard;
     _toShard = toShard;
@@ -314,9 +314,9 @@ Status MigrationDestinationManager::start(const string& ns,
         _migrateThreadHandle.join();
     }

-    _migrateThreadHandle = stdx::thread(
-        [this, ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
-            _migrateThread(ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+    _migrateThreadHandle =
+        stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
+            _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
         });

     return Status::OK();
@@ -388,8 +388,7 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI
     return false;
 }

-void MigrationDestinationManager::_migrateThread(std::string ns,
-                                                 BSONObj min,
+void MigrationDestinationManager::_migrateThread(BSONObj min,
                                                  BSONObj max,
                                                  BSONObj shardKeyPattern,
                                                  ConnectionString fromShardConnString,
@@ -405,7 +404,7 @@ void MigrationDestinationManager::_migrateThread(std::string ns,

     try {
         _migrateDriver(
-            opCtx.get(), ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+            opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
     } catch (std::exception& e) {
         {
             stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -426,7 +425,7 @@ void MigrationDestinationManager::_migrateThread(std::string ns,

     if (getState() != DONE) {
         // Unprotect the range if needed/possible on unsuccessful TO migration
-        Status status = _forgetPending(opCtx.get(), NamespaceString(ns), min, max, epoch);
+        Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch);
         if (!status.isOK()) {
             warning() << "Failed to remove pending range" << redact(causedBy(status));
         }
@@ -438,7 +437,6 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
 }

 void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
-                                                 const string& ns,
                                                  const BSONObj& min,
                                                  const BSONObj& max,
                                                  const BSONObj& shardKeyPattern,
@@ -451,11 +449,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
     invariant(!max.isEmpty());

     log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
-          << " for collection " << ns << " from " << fromShardConnString << " at epoch "
+          << " for collection " << _nss.ns() << " from " << fromShardConnString << " at epoch "
           << epoch.toString() << " with session id " << *_sessionId;

     string errmsg;
-    MoveTimingHelper timing(txn, "to", ns, min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());
+    MoveTimingHelper timing(
+        txn, "to", _nss.ns(), min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());

     const auto initialState = getState();

@@ -472,16 +471,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
     // Just tests the connection
     conn->getLastError();

-    const NamespaceString nss(ns);
-
     DisableDocumentValidation validationDisabler(txn);

     {
         // 0. copy system.namespaces entry if collection doesn't already exist

-        OldClientWriteContext ctx(txn, ns);
-        if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
-            errmsg = str::stream() << "Not primary during migration: " << ns
+        OldClientWriteContext ctx(txn, _nss.ns());
+        if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_nss)) {
+            errmsg = str::stream() << "Not primary during migration: " << _nss.ns()
                                    << ": checking if collection exists";
             warning() << errmsg;
             setState(FAIL);
@@ -491,10 +488,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

         // Only copy if ns doesn't already exist
         Database* const db = ctx.db();

-        Collection* const collection = db->getCollection(ns);
+        Collection* const collection = db->getCollection(_nss);
         if (!collection) {
-            std::list<BSONObj> infos = conn->getCollectionInfos(
-                nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns)));
+            std::list<BSONObj> infos =
+                conn->getCollectionInfos(_nss.db().toString(), BSON("name" << _nss.coll()));

             BSONObj options;
             if (infos.size() > 0) {
@@ -505,9 +502,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
             }

             WriteUnitOfWork wuow(txn);
-            Status status = userCreateNS(txn, db, ns, options, false);
+            Status status = userCreateNS(txn, db, _nss.ns(), options, false);
             if (!status.isOK()) {
-                warning() << "failed to create collection [" << ns << "] "
+                warning() << "failed to create collection [" << _nss << "] "
                           << " with options " << options << ": " << redact(status);
             }
             wuow.commit();
@@ -520,25 +517,25 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

         std::vector<BSONObj> indexSpecs;
         {
-            const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
+            const std::list<BSONObj> indexes = conn->getIndexSpecs(_nss.ns());
             indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end());
         }

         ScopedTransaction scopedXact(txn, MODE_IX);
-        Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
-        OldClientContext ctx(txn, ns);
+        Lock::DBLock lk(txn->lockState(), _nss.db(), MODE_X);
+        OldClientContext ctx(txn, _nss.ns());

-        if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
-            errmsg = str::stream() << "Not primary during migration: " << ns;
+        if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(_nss)) {
+            errmsg = str::stream() << "Not primary during migration: " << _nss.ns();
             warning() << errmsg;
             setState(FAIL);
             return;
         }

         Database* db = ctx.db();
-        Collection* collection = db->getCollection(ns);
+        Collection* collection = db->getCollection(_nss);
         if (!collection) {
-            errmsg = str::stream() << "collection dropped during migration: " << ns;
+            errmsg = str::stream() << "collection dropped during migration: " << _nss.ns();
             warning() << errmsg;
             setState(FAIL);
             return;
@@ -597,7 +594,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

         // 2. Synchronously delete any data which might have been left orphaned in range being moved
         RangeDeleterOptions deleterOptions(
-            KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
+            KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern));
         deleterOptions.writeConcern = writeConcern;

         // No need to wait since all existing cursors will filter out this range when returning the
@@ -613,7 +610,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
             return;
         }

-        Status status = _notePending(txn, NamespaceString(ns), min, max, epoch);
+        Status status = _notePending(txn, _nss, min, max, epoch);
         if (!status.isOK()) {
             setState(FAIL);
             return;
@@ -657,11 +654,17 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
                 BSONObj docToClone = i.next().Obj();
                 {
-                    OldClientWriteContext cx(txn, ns);
+                    OldClientWriteContext cx(txn, _nss.ns());

                     BSONObj localDoc;
-                    if (willOverrideLocalId(
-                            txn, ns, min, max, shardKeyPattern, cx.db(), docToClone, &localDoc)) {
+                    if (willOverrideLocalId(txn,
+                                            _nss.ns(),
+                                            min,
+                                            max,
+                                            shardKeyPattern,
+                                            cx.db(),
+                                            docToClone,
+                                            &localDoc)) {
                         string errMsg = str::stream() << "cannot migrate chunk, local document "
                                                       << redact(localDoc)
                                                       << " has same _id as cloned "
@@ -673,7 +676,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
                         uasserted(16976, errMsg);
                     }

-                    Helpers::upsert(txn, ns, docToClone, true);
+                    Helpers::upsert(txn, _nss.ns(), docToClone, true);
                 }
                 thisTime++;
@@ -731,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
                 break;
             }

-            _applyMigrateOp(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
+            _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied);

             const int maxIterations = 3600 * 50;
@@ -784,7 +787,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

             log() << "Waiting for replication to catch up before entering critical section";

-            if (_flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) {
+            if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
                 break;
             }
@@ -825,7 +828,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
             }

             if (res["size"].number() > 0 &&
-                _applyMigrateOp(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) {
+                _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied)) {
                 continue;
             }
@@ -839,7 +842,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
             // 1) The from side has told us that it has locked writes (COMMIT_START)
             // 2) We've checked at least one more time for un-transmitted mods
             if (getState() == COMMIT_START && transferAfterCommit == true) {
-                if (_flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) {
+                if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
                     break;
                 }
             }
@@ -859,6 +862,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
     }

     setState(DONE);
+    timing.done(6);

     MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6);
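With the namespace stored in the `_nss` member before `_migrateThread` is spawned, the worker lambda no longer needs to capture and forward an `ns` string: it captures `this` and every helper reads `_nss` directly. A minimal sketch of that pattern, assuming (as the commit does) that the member is set before the thread starts and is not mutated while it runs; the class here is illustrative, not the real manager:

```cpp
#include <iostream>
#include <string>
#include <thread>

class DestinationManager {
public:
    void start(const std::string& nss) {
        _nss = nss;  // set before the thread is spawned, immutable while it runs
        // Capturing `this` gives the thread body access to all members, so no
        // per-call namespace parameter has to be threaded through helpers.
        _thread = std::thread([this] { _migrateThread(); });
    }

    void join() {
        _thread.join();
    }

private:
    void _migrateThread() {
        // Every private helper can read _nss directly.
        std::cout << "migrating into " << _nss << '\n';
    }

    std::string _nss;
    std::thread _thread;
};

int main() {
    DestinationManager mgr;
    mgr.start("TestDB.TestColl");
    mgr.join();
}
```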
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 9de1c5027df..804ec2bf35a 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -87,7 +87,7 @@ public:
     /**
      * Returns OK if migration started successfully.
      */
-    Status start(const std::string& ns,
+    Status start(const NamespaceString& nss,
                  const MigrationSessionId& sessionId,
                  const ConnectionString& fromShardConnString,
                  const ShardId& fromShard,
@@ -117,8 +117,7 @@ private:
    /**
     * Thread which drives the migration apply process on the recipient side.
     */
-    void _migrateThread(std::string ns,
-                        BSONObj min,
+    void _migrateThread(BSONObj min,
                         BSONObj max,
                         BSONObj shardKeyPattern,
                         ConnectionString fromShardConnString,
@@ -126,7 +125,6 @@ private:
                         WriteConcernOptions writeConcern);

     void _migrateDriver(OperationContext* txn,
-                        const std::string& ns,
                         const BSONObj& min,
                         const BSONObj& max,
                         const BSONObj& shardKeyPattern,
@@ -194,7 +192,8 @@ private:
     // Migration session ID uniquely identifies the migration and indicates whether the prepare
     // method has been called.
-    boost::optional<MigrationSessionId> _sessionId{boost::none};
+    boost::optional<MigrationSessionId> _sessionId;
+
     // A condition variable on which to wait for the prepare method to be called.
     stdx::condition_variable _isActiveCV;
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 86df3fe2edb..4216b30ef5e 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -124,7 +124,7 @@ public:
         shardingState->setShardName(toShard.toString());

         const ShardId fromShard(cmdObj["fromShardName"].String());

-        const string ns = cmdObj.firstElement().String();
+        const NamespaceString nss(cmdObj.firstElement().String());

         BSONObj min = cmdObj["min"].Obj().getOwned();
         BSONObj max = cmdObj["max"].Obj().getOwned();
@@ -134,7 +134,7 @@ public:
         // consistent and predictable, generally we'd refresh anyway, and to be paranoid.
         ChunkVersion currentVersion;

-        Status status = shardingState->refreshMetadataNow(txn, ns, &currentVersion);
+        Status status = shardingState->refreshMetadataNow(txn, nss, &currentVersion);
         if (!status.isOK()) {
             errmsg = str::stream() << "cannot start recv'ing chunk "
                                    << "[" << redact(min) << "," << redact(max) << ")"
@@ -166,8 +166,8 @@ public:
         const MigrationSessionId migrationSessionId(
             uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));

-        Status startStatus = shardingState->migrationDestinationManager()->start(
-            ns,
+        uassertStatusOK(shardingState->migrationDestinationManager()->start(
+            nss,
             migrationSessionId,
             statusWithFromShardConnectionString.getValue(),
             fromShard,
@@ -176,10 +176,7 @@ public:
             max,
             shardKeyPattern,
             currentVersion.epoch(),
-            writeConcern);
-        if (!startStatus.isOK()) {
-            return appendCommandStatus(result, startStatus);
-        }
+            writeConcern));

         result.appendBool("started", true);
         return true;
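The receive command above also drops its manual `Status` check in favor of `uassertStatusOK`, letting the command framework turn a failed `start()` into an error reply. A toy sketch of this throw-on-error pattern, with simplified stand-ins (assumptions, not MongoDB's actual `Status`/`uassert` machinery):

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

// Simplified stand-in for mongo::Status.
struct Status {
    bool ok;
    std::string reason;
    static Status OK() { return {true, ""}; }
};

// Simplified stand-in for uassertStatusOK: throw instead of returning,
// so the caller does not need an explicit if (!status.isOK()) branch.
void uassertStatusOK(const Status& status) {
    if (!status.ok)
        throw std::runtime_error(status.reason);  // framework catches and replies
}

// Hypothetical operation that may fail.
Status startMigration(bool succeed) {
    return succeed ? Status::OK() : Status{false, "cannot start recv'ing chunk"};
}

int main() {
    uassertStatusOK(startMigration(true));  // falls through on success
    try {
        uassertStatusOK(startMigration(false));
    } catch (const std::exception& e) {
        std::cout << "command failed: " << e.what() << '\n';
    }
}
```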
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index fb36ae89364..1db915cfaa5 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -102,8 +102,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, MoveChunkR

     ChunkVersion shardVersion;

-    Status refreshStatus =
-        shardingState->refreshMetadataNow(txn, _args.getNss().ns(), &shardVersion);
+    Status refreshStatus = shardingState->refreshMetadataNow(txn, _args.getNss(), &shardVersion);
     if (!refreshStatus.isOK()) {
         uasserted(refreshStatus.code(),
                   str::stream() << "cannot start migrate of chunk "
@@ -412,8 +411,8 @@ Status MigrationSourceManager::commitDonateChunk(OperationContext* txn) {
     // case do a best effort attempt to incrementally refresh the metadata. If this fails, just
     // clear it up so that subsequent requests will try to do a full refresh.
     ChunkVersion unusedShardVersion;
-    Status refreshStatus = ShardingState::get(txn)->refreshMetadataNow(
-        txn, _args.getNss().ns(), &unusedShardVersion);
+    Status refreshStatus =
+        ShardingState::get(txn)->refreshMetadataNow(txn, _args.getNss(), &unusedShardVersion);

     if (refreshStatus.isOK()) {
         ScopedTransaction scopedXact(txn, MODE_IS);
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index c7dca21c1f7..0b930131d4e 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -156,7 +156,7 @@ public:
         grid.shardRegistry()->reload(txn);

         auto scopedRegisterMigration =
-            uassertStatusOK(shardingState->registerMigration(moveChunkRequest));
+            uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest));

         Status status = {ErrorCodes::InternalError, "Uninitialized value"};
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 7bc1e42aac4..8250ed4021f 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -328,9 +328,9 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn,
 }

 Status ShardingState::refreshMetadataNow(OperationContext* txn,
-                                         const string& ns,
+                                         const NamespaceString& nss,
                                          ChunkVersion* latestShardVersion) {
-    auto refreshLatestShardVersionStatus = _refreshMetadata(txn, NamespaceString(ns), nullptr);
+    auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr);
     if (!refreshLatestShardVersionStatus.isOK()) {
         return refreshLatestShardVersionStatus.getStatus();
     }
@@ -731,18 +731,19 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
     return (metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED());
 }

-StatusWith<ScopedRegisterMigration> ShardingState::registerMigration(const MoveChunkRequest& args) {
+StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
+    const MoveChunkRequest& args) {
     if (_migrationDestManager.isActive()) {
         return {ErrorCodes::ConflictingOperationInProgress,
                 str::stream() << "Unable to start new migration because this shard is currently "
                                  "receiving a chunk"};
     }

-    return _activeMigrationsRegistry.registerMigration(args);
+    return _activeMigrationsRegistry.registerDonateChunk(args);
 }

-boost::optional<NamespaceString> ShardingState::getActiveMigrationNss() {
-    return _activeMigrationsRegistry.getActiveMigrationNss();
+boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() {
+    return _activeMigrationsRegistry.getActiveDonateChunkNss();
 }

 BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* txn) {
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index bd06604b79f..8e8f0c20074 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -185,7 +185,7 @@ public:
      * @return latestShardVersion the version that is now stored for this collection
      */
     Status refreshMetadataNow(OperationContext* txn,
-                              const std::string& ns,
+                              const NamespaceString& nss,
                               ChunkVersion* latestShardVersion);

     void appendInfo(OperationContext* txn, BSONObjBuilder& b);
@@ -203,24 +203,24 @@ public:

     /**
      * If there are no migrations running on this shard, registers an active migration with the
-     * specified arguments and returns a ScopedRegisterMigration, which must be signaled by the
+     * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
      * caller before it goes out of scope.
      *
      * If there is an active migration already running on this shard and it has the exact same
-     * arguments, returns a ScopedRegisterMigration, which can be used to join the existing one.
+     * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the existing one.
      *
      * Othwerwise returns a ConflictingOperationInProgress error.
      */
-    StatusWith<ScopedRegisterMigration> registerMigration(const MoveChunkRequest& args);
+    StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);

     /**
-     * If a migration has been previously registered through a call to registerMigration returns
+     * If a migration has been previously registered through a call to registerDonateChunk returns
      * that namespace. Otherwise returns boost::none.
      *
      * This method can be called without any locks, but once the namespace is fetched it needs to be
      * re-checked after acquiring some intent lock on that namespace.
      */
-    boost::optional<NamespaceString> getActiveMigrationNss();
+    boost::optional<NamespaceString> getActiveDonateChunkNss();

     /**
      * Get a migration status report from the migration registry. If no migration is active, this
@@ -278,8 +278,6 @@ public:
     }

 private:
-    friend class ScopedRegisterMigration;
-
     // Map from a namespace into the sharding state for each collection we have
     typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>
         CollectionShardingStateMap;
diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp
index c5719f9c7e1..d50a1680cd4 100644
--- a/src/mongo/db/s/split_chunk_command.cpp
+++ b/src/mongo/db/s/split_chunk_command.cpp
@@ -267,7 +267,7 @@ public:
         // Always check our version remotely
         ChunkVersion shardVersion;
-        Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+        Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion);

         if (!refreshStatus.isOK()) {
             errmsg = str::stream() << "splitChunk cannot split chunk "
@@ -364,8 +364,7 @@ public:
         //
         {
             ChunkVersion shardVersionAfterSplit;
-            refreshStatus =
-                shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
+            refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersionAfterSplit);

             if (!refreshStatus.isOK()) {
                 errmsg = str::stream() << "failed to refresh metadata for split chunk ["