diff options
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 92 |
1 files changed, 69 insertions, 23 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 27e767cd6a9..38d563023d5 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -161,6 +161,32 @@ bool opReplicatedEnough(OperationContext* txn, return majorityStatus.isOK() && userStatus.isOK(); } +/** + * Create the migration clone request BSON object to send to the source + * shard. + * + * 'sessionId' unique identifier for this migration. + */ +BSONObj createMigrateCloneRequest(const MigrationSessionId& sessionId) { + BSONObjBuilder builder; + builder.append("_migrateClone", 1); + sessionId.append(&builder); + return builder.obj(); +} + +/** + * Create the migration transfer mods request BSON object to send to the source + * shard. + * + * 'sessionId' unique identifier for this migration. + */ +BSONObj createTransferModsRequest(const MigrationSessionId& sessionId) { + BSONObjBuilder builder; + builder.append("_transferMods", 1); + sessionId.append(&builder); + return builder.obj(); +} + MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeDelete); } // namespace @@ -174,13 +200,7 @@ MONGO_FP_DECLARE(migrateThreadHangAtStep4); MONGO_FP_DECLARE(migrateThreadHangAtStep5); -MigrationDestinationManager::MigrationDestinationManager() - : _active(false), - _numCloned(0), - _clonedBytes(0), - _numCatchup(0), - _numSteady(0), - _state(READY) {} +MigrationDestinationManager::MigrationDestinationManager() = default; MigrationDestinationManager::~MigrationDestinationManager() = default; @@ -194,15 +214,19 @@ void MigrationDestinationManager::setState(State newState) { _state = newState; } -bool MigrationDestinationManager::getActive() const { +bool MigrationDestinationManager::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + return _sessionId.is_initialized(); } void MigrationDestinationManager::report(BSONObjBuilder& b) { stdx::lock_guard<stdx::mutex> sl(_mutex); - b.appendBool("active", _active); + b.appendBool("active", _sessionId.is_initialized()); + + if (_sessionId) { + b.append("sessionId", _sessionId->toString()); + } b.append("ns", _ns); b.append("from", _from); @@ -225,6 +249,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) { } Status MigrationDestinationManager::start(const string& ns, + const MigrationSessionId& sessionId, const string& fromShard, const BSONObj& min, const BSONObj& max, @@ -233,7 +258,7 @@ Status MigrationDestinationManager::start(const string& ns, const WriteConcernOptions& writeConcern) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { + if (_sessionId) { return Status(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Active migration already in progress " << "ns: " << _ns << ", from: " << _from << ", min: " << _min @@ -254,7 +279,7 @@ Status MigrationDestinationManager::start(const string& ns, _numCatchup = 0; _numSteady = 0; - _active = true; + _sessionId = sessionId; // TODO: If we are here, the migrate thread must have completed, otherwise _active above would // be false, so this would never block. There is no better place with the current implementation @@ -263,9 +288,10 @@ Status MigrationDestinationManager::start(const string& ns, _migrateThreadHandle.join(); } - _migrateThreadHandle = - stdx::thread([this, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() { - _migrateThread(ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); + _migrateThreadHandle = stdx::thread( + [this, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() { + _migrateThread( + ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern); }); return Status::OK(); @@ -277,18 +303,31 @@ void MigrationDestinationManager::abort() { _errmsg = "aborted"; } -bool MigrationDestinationManager::startCommit() { +bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (_state != STEADY) { return false; } + // In STEADY state we must have active migration + invariant(_sessionId); + + // This check guards against the (very unlikely) situation where the current donor shard has + // been stalled for some time, during which the recipient shard crashed or timed out and started + // serving as a recipient of chunks for another collection (note that it cannot be the same + // collection, because the old donor still holds the collection lock). + if (!_sessionId->matches(sessionId)) { + warning() << "startCommit received commit request from a stale session " << sessionId.toString() + << ". Current session is " << _sessionId->toString(); + return false; + } + _state = COMMIT_START; const auto deadline = stdx::chrono::system_clock::now() + Seconds(30); - while (_active) { + while (_sessionId) { if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline)) { _state = FAIL; log() << "startCommit never finished!" << migrateLog; @@ -305,6 +344,7 @@ bool MigrationDestinationManager::startCommit() { } void MigrationDestinationManager::_migrateThread(std::string ns, + MigrationSessionId sessionId, BSONObj min, BSONObj max, BSONObj shardKeyPattern, @@ -321,7 +361,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns, } try { - _migrateDriver(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); + _migrateDriver( + &txn, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern); } catch (std::exception& e) { { stdx::lock_guard<stdx::mutex> sl(_mutex); @@ -353,19 +394,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns, } stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; + _sessionId = boost::none; _isActiveCV.notify_all(); } void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const string& ns, + const MigrationSessionId& sessionId, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, const std::string& fromShard, const OID& epoch, const WriteConcernOptions& writeConcern) { - invariant(getActive()); + invariant(isActive()); invariant(getState() == READY); invariant(!min.isEmpty()); invariant(!max.isEmpty()); @@ -563,10 +605,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // 3. Initial bulk clone setState(CLONE); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId); + while (true) { BSONObj res; if (!conn->runCommand("admin", - BSON("_migrateClone" << 1), + migrateCloneRequest, res)) { // gets array of objects to copy, in disk order setState(FAIL); errmsg = "_migrateClone failed: "; @@ -645,13 +689,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // secondaries repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + const BSONObj xferModsRequest = createTransferModsRequest(sessionId); + { // 4. Do bulk of mods setState(CATCHUP); while (true) { BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + if (!conn->runCommand("admin", xferModsRequest, res)) { setState(FAIL); errmsg = "_transferMods failed: "; errmsg += res.toString(); @@ -752,7 +798,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + if (!conn->runCommand("admin", xferModsRequest, res)) { log() << "_transferMods failed in STEADY state: " << res << migrateLog; errmsg = res.toString(); setState(FAIL); |