diff options
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 216 |
1 files changed, 83 insertions, 133 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index dcb35a57af5..f1e0ffd925b 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_range_deleter.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" @@ -59,6 +60,7 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -216,6 +218,24 @@ void MigrationDestinationManager::setState(State newState) { _state = newState; } +void MigrationDestinationManager::setStateFail(std::string msg) { + log() << msg; + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _errmsg = std::move(msg); + _state = FAIL; + } +} + +void MigrationDestinationManager::setStateFailWarn(std::string msg) { + warning() << msg; + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _errmsg = std::move(msg); + _state = FAIL; + } +} + bool MigrationDestinationManager::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); return _isActive_inlock(); @@ -301,7 +321,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _sessionId = sessionId; _scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk); - // TODO: If we are here, the migrate thread must have completed, otherwise _active above // would be false, so this would never block. There is no better place with the current // implementation where to join the thread. @@ -378,8 +397,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio while (_sessionId) { if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { + _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString(); _state = FAIL; - return {ErrorCodes::CommandFailed, "startCommit timed out waiting "}; + return {ErrorCodes::CommandFailed, _errmsg}; } } if (_state != DONE) { @@ -405,29 +425,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _migrateDriver( opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); } catch (std::exception& e) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = e.what(); - } - - log() << "migrate failed: " << redact(e.what()); + setStateFail(str::stream() << "migrate failed: " << redact(e.what())); } catch (...) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = "UNKNOWN ERROR"; - } - - log() << "migrate failed with unknown exception"; + setStateFail("migrate failed with unknown exception: UNKNOWN ERROR"); } if (getState() != DONE) { - // Unprotect the range if needed/possible on unsuccessful TO migration - Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch); - if (!status.isOK()) { - warning() << "Failed to remove pending range" << redact(causedBy(status)); - } + _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max)); } stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -492,10 +496,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, OldClientWriteContext ctx(opCtx, _nss.ns()); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - _errmsg = str::stream() << "Not primary during migration: " << _nss.ns() - << ": checking if collection exists"; - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns() + << ": checking if collection exists"); return; } @@ -539,18 +541,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, OldClientContext ctx(opCtx, _nss.ns()); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { - _errmsg = str::stream() << "Not primary during migration: " << _nss.ns(); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns()); return; } Database* db = ctx.db(); Collection* collection = db->getCollection(opCtx, _nss); if (!collection) { - _errmsg = str::stream() << "collection dropped during migration: " << _nss.ns(); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "collection dropped during migration: " << _nss.ns()); return; } @@ -560,30 +558,27 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, if (!indexSpecs.empty()) { // Only copy indexes if the collection does not have any documents. if (collection->numRecords(opCtx) > 0) { - _errmsg = str::stream() << "aborting migration, shard is missing " - << indexSpecs.size() << " indexes and " - << "collection is not empty. Non-trivial " - << "index creation should be scheduled manually"; - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "aborting migration, shard is missing " + << indexSpecs.size() + << " indexes and " + << "collection is not empty. Non-trivial " + << "index creation should be scheduled manually"); return; } auto indexInfoObjs = indexer.init(indexSpecs); if (!indexInfoObjs.isOK()) { - _errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << redact(indexInfoObjs.getStatus()); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "failed to create index before migrating data. " + << " error: " + << redact(indexInfoObjs.getStatus())); return; } auto status = indexer.insertAllDocumentsInCollection(); if (!status.isOK()) { - _errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << redact(status); - warning() << _errmsg; - setState(FAIL); + setStateFailWarn(str::stream() << "failed to create index before migrating data. " + << " error: " + << redact(status)); return; } @@ -604,30 +599,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } { - // 2. Synchronously delete any data which might have been left orphaned in range - // being moved - - RangeDeleterOptions deleterOptions( - KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - - // No need to wait since all existing cursors will filter out this range when returning - // the results - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "preCleanup"; - - if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) { - warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg); - setState(FAIL); + // 2. Synchronously delete any data which might have been left orphaned in the range + // being moved, and wait for completion + + auto footprint = ChunkRange(min, max); + Status status = _notePending(opCtx, _nss, epoch, footprint); + if (!status.isOK()) { + setStateFail(status.reason()); return; } - Status status = _notePending(opCtx, _nss, min, max, epoch); + _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. + + status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint); if (!status.isOK()) { - _errmsg = status.reason(); - setState(FAIL); + setStateFail(status.reason()); return; } @@ -646,10 +632,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, if (!conn->runCommand("admin", migrateCloneRequest, res)) { // gets array of objects to copy, in disk order - setState(FAIL); - _errmsg = "_migrateClone failed: "; - _errmsg += redact(res.toString()); - log() << _errmsg; + setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString())); conn.done(); return; } @@ -736,10 +719,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, while (true) { BSONObj res; if (!conn->runCommand("admin", xferModsRequest, res)) { - setState(FAIL); - _errmsg = "_transferMods failed: "; - _errmsg += redact(res); - log() << "_transferMods failed: " << redact(res); + setStateFail(str::stream() << "_transferMods failed: " << redact(res)); conn.done(); return; } @@ -772,10 +752,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (i == maxIterations) { - _errmsg = "secondary can't keep up with migrate"; - log() << _errmsg; + setStateFail("secondary can't keep up with migrate"); conn.done(); - setState(FAIL); return; } } @@ -806,9 +784,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (t.minutes() >= 600) { - setState(FAIL); - _errmsg = "Cannot go to critical section because secondaries cannot keep up"; - log() << _errmsg; + setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } } @@ -831,9 +807,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, BSONObj res; if (!conn->runCommand("admin", xferModsRequest, res)) { - log() << "_transferMods failed in STEADY state: " << redact(res); - _errmsg = res.toString(); - setState(FAIL); + setStateFail(str::stream() << "_transferMods failed in STEADY state: " + << redact(res)); conn.done(); return; } @@ -863,8 +838,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } if (getState() == FAIL) { - _errmsg = "timed out waiting for commit"; - log() << _errmsg; + setStateFail("timed out waiting for commit"); return; } @@ -992,12 +966,10 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, } Status MigrationDestinationManager::_notePending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { + NamespaceString const& nss, + OID const& epoch, + ChunkRange const& range) { AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); @@ -1005,66 +977,44 @@ Status MigrationDestinationManager::_notePending(OperationContext* opCtx, // for checking this here is that in the future we shouldn't have this problem. if (!metadata || metadata->getCollVersion().epoch() != epoch) { return {ErrorCodes::StaleShardVersion, - str::stream() << "could not note chunk [" << min << "," << max << ")" - << " as pending because the epoch for " + str::stream() << "not noting chunk " << redact(range.toString()) + << " as pending because the epoch of " << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; + << " changed"}; } - css->beginReceive(ChunkRange(min, max)); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_chunkMarkedPending); - _chunkMarkedPending = true; - + // start clearing any leftovers that would be in the new chunk + if (!css->beginReceive(range)) { + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString()) + << " migration aborted; documents in range may still be in use on the" + " destination shard."}; + } return Status::OK(); } -Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const OID& epoch) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_chunkMarkedPending) { - return Status::OK(); - } +void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, + const NamespaceString& nss, + OID const& epoch, + ChunkRange const& range) { - _chunkMarkedPending = false; + if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.) + return; // no documents can have been moved in, so there is nothing to clean up. } AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); - // This can currently happen because drops aren't synchronized with in-migrations. The idea - // for checking this here is that in the future we shouldn't have this problem. + // This can currently happen because drops aren't synchronized with in-migrations. The idea for + // checking this here is that in the future we shouldn't have this problem. if (!metadata || metadata->getCollVersion().epoch() != epoch) { - return {ErrorCodes::StaleShardVersion, - str::stream() << "no need to forget pending chunk " - << "[" - << min - << "," - << max - << ")" - << " because the epoch for " - << nss.ns() - << " has changed from " - << epoch - << " to " - << (metadata ? metadata->getCollVersion().epoch() - : ChunkVersion::UNSHARDED().epoch())}; + log() << "no need to forget pending chunk " << redact(range.toString()) + << " because the epoch for " << nss.ns() << " changed"; + return; } - css->forgetReceive(ChunkRange(min, max)); - - return Status::OK(); + css->forgetReceive(range); } } // namespace mongo |