diff options
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 22 |
2 files changed, 62 insertions, 62 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 45cec7065e3..f1cb2c28c9f 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -152,25 +152,25 @@ void runWithoutSession(OperationContext* opCtx, Callable&& callable) { */ std::string stateToString(MigrationDestinationManager::State state) { switch (state) { - case MigrationDestinationManager::READY: + case MigrationDestinationManager::kReady: return "ready"; - case MigrationDestinationManager::CLONE: + case MigrationDestinationManager::kClone: return "clone"; - case MigrationDestinationManager::CATCHUP: + case MigrationDestinationManager::kCatchup: return "catchup"; - case MigrationDestinationManager::STEADY: + case MigrationDestinationManager::kSteady: return "steady"; - case MigrationDestinationManager::COMMIT_START: + case MigrationDestinationManager::kCommitStart: return "commitStart"; - case MigrationDestinationManager::ENTERED_CRIT_SEC: + case MigrationDestinationManager::kEnteredCritSec: return "enteredCriticalSection"; - case MigrationDestinationManager::EXIT_CRIT_SEC: + case MigrationDestinationManager::kExitCritSec: return "exitCriticalSection"; - case MigrationDestinationManager::DONE: + case MigrationDestinationManager::kDone: return "done"; - case MigrationDestinationManager::FAIL: + case MigrationDestinationManager::kFail: return "fail"; - case MigrationDestinationManager::ABORT: + case MigrationDestinationManager::kAbort: return "abort"; default: MONGO_UNREACHABLE; @@ -328,7 +328,7 @@ void MigrationDestinationManager::_setStateFail(StringData msg) { { stdx::lock_guard<Latch> sl(_mutex); _errmsg = msg.toString(); - _state = FAIL; + _state = kFail; _stateChangedCV.notify_all(); } @@ -345,7 +345,7 @@ void MigrationDestinationManager::_setStateFailWarn(StringData msg) { { stdx::lock_guard<Latch> sl(_mutex); _errmsg = msg.toString(); - _state = FAIL; + _state = kFail; _stateChangedCV.notify_all(); } @@ -370,7 +370,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, stdx::unique_lock<Latch> lock(_mutex); try { opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool { - return _state != READY && _state != CLONE && _state != CATCHUP; + return _state != kReady && _state != kClone && _state != kCatchup; }); } catch (...) { // Ignoring this error because this is an optional parameter and we catch timeout @@ -396,7 +396,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, b.append("state", stateToString(_state)); - if (_state == FAIL) { + if (_state == kFail) { invariant(!_errmsg.empty()); b.append("errmsg", _errmsg); } @@ -429,7 +429,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, invariant(!_sessionId); invariant(!_scopedReceiveChunk); - _state = READY; + _state = kReady; _stateChangedCV.notify_all(); _errmsg = ""; @@ -491,7 +491,7 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState( _max = recoveryDoc.getRange().getMax(); _lsid = recoveryDoc.getLsid(); _txnNumber = recoveryDoc.getTxnNumber(); - _state = COMMIT_START; + _state = kCommitStart; _acquireCSOnRecipient = true; LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId); @@ -504,7 +504,7 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState( "Reacquired migration recipient critical section", "sessionId"_attr = *_sessionId); - _state = ENTERED_CRIT_SEC; + _state = kEnteredCritSec; if (_migrateThreadHandle.joinable()) { _migrateThreadHandle.join(); @@ -605,7 +605,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { << _sessionId->toString()}; } - _state = ABORT; + _state = kAbort; _stateChangedCV.notify_all(); _errmsg = "aborted"; @@ -614,7 +614,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { void MigrationDestinationManager::abortWithoutSessionIdCheck() { stdx::lock_guard<Latch> sl(_mutex); - _state = ABORT; + _state = kAbort; _stateChangedCV.notify_all(); _errmsg = "aborted without session id check"; } @@ -633,7 +633,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio // the last batch of mods sent in the catch up phase. Allow some time for synching up. auto deadline = Date_t::now() + convergenceTimeout; - while (_state == CATCHUP) { + while (_state == kCatchup) { if (stdx::cv_status::timeout == _stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) { return {ErrorCodes::CommandFailed, @@ -644,7 +644,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio } } - if (_state != STEADY) { + if (_state != kSteady) { return {ErrorCodes::CommandFailed, str::stream() << "Migration startCommit attempted when not in STEADY state." << " Sender's session is " << sessionId.toString() @@ -666,7 +666,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio } _sessionMigration->finish(); - _state = COMMIT_START; + _state = kCommitStart; _stateChangedCV.notify_all(); // Assigning a timeout slightly higher than the one used for network requests to the config @@ -678,26 +678,26 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString(); - _state = FAIL; + _state = kFail; _stateChangedCV.notify_all(); return {ErrorCodes::CommandFailed, _errmsg}; } } - if (_state != DONE) { + if (_state != kDone) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } } else { - while (_state == COMMIT_START) { + while (_state == kCommitStart) { if (stdx::cv_status::timeout == _stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) { _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString(); - _state = FAIL; + _state = kFail; _stateChangedCV.notify_all(); return {ErrorCodes::CommandFailed, _errmsg}; } } - if (_state != ENTERED_CRIT_SEC) { + if (_state != kEnteredCritSec) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer or failed to enter critical " "section"}; @@ -731,21 +731,21 @@ Status MigrationDestinationManager::exitCriticalSection(OperationContext* opCtx, return Status::OK(); } - if (_state < ENTERED_CRIT_SEC) { + if (_state < kEnteredCritSec) { return {ErrorCodes::CommandFailed, "recipient critical section has not yet been entered"}; } // If the thread is waiting to be signaled to release the CS, signal it by transitioning // _state to EXIT_CRIT_SEC - if (_state == ENTERED_CRIT_SEC) { - _state = EXIT_CRIT_SEC; + if (_state == kEnteredCritSec) { + _state = kExitCritSec; _stateChangedCV.notify_all(); } // Wait for the thread to finish opCtx->waitForConditionOrInterrupt(_isActiveCV, lock, [&]() { return !_sessionId; }); - if (_state != DONE) { + if (_state != kDone) { return {ErrorCodes::CommandFailed, "exitCriticalSection failed"}; } @@ -1113,14 +1113,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, const auto initialState = getState(); - if (initialState == ABORT) { + if (initialState == kAbort) { LOGV2_ERROR(22013, "Migration abort requested before the migration started", "migrationId"_attr = _migrationId->toBSON()); return; } - invariant(initialState == READY); + invariant(initialState == kReady); auto donorCollectionOptionsAndIndexes = [&]() -> CollectionOptionsAndIndexes { auto [collOptions, uuid] = @@ -1263,7 +1263,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, repl::OpTime lastOpApplied; { // 4. Initial bulk clone - _setState(CLONE); + _setState(kClone); _sessionMigration->start(opCtx->getServiceContext()); @@ -1274,7 +1274,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, auto assertNotAborted = [&](OperationContext* opCtx) { opCtx->checkForInterrupt(); outerOpCtx->checkForInterrupt(); - uassert(50748, "Migration aborted while copying documents", getState() != ABORT); + uassert(50748, "Migration aborted while copying documents", getState() != kAbort); }; auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) { @@ -1384,7 +1384,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, { // 5. Do bulk of mods - _setState(CATCHUP); + _setState(kCatchup); auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) { auto commandResponse = uassertStatusOKWithContext( @@ -1425,7 +1425,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, str::stream() << "Migration aborted while waiting for replication at catch up stage, " << _migrationId->toBSON(), - getState() != ABORT); + getState() != kAbort); if (runWithoutSession(outerOpCtx, [&] { return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern); @@ -1489,10 +1489,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, { // 6. Wait for commit - _setState(STEADY); + _setState(kSteady); bool transferAfterCommit = false; - while (getState() == STEADY || getState() == COMMIT_START) { + while (getState() == kSteady || getState() == kCommitStart) { opCtx->checkForInterrupt(); outerOpCtx->checkForInterrupt(); @@ -1500,7 +1500,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, // aren't sure that at least one transfer happens *after* our state changes to // COMMIT_START, there could be mods still on the FROM shard that got logged // *after* our _transferMods but *before* the critical section. - if (getState() == COMMIT_START) { + if (getState() == kCommitStart) { transferAfterCommit = true; } @@ -1522,7 +1522,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, continue; } - if (getState() == ABORT) { + if (getState() == kAbort) { LOGV2(22006, "Migration aborted while transferring mods", "migrationId"_attr = _migrationId->toBSON()); @@ -1532,7 +1532,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, // We know we're finished when: // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods - if (getState() == COMMIT_START && transferAfterCommit == true) { + if (getState() == kCommitStart && transferAfterCommit == true) { if (runWithoutSession(outerOpCtx, [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) { @@ -1541,11 +1541,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, } // Only sleep if we aren't committing - if (getState() == STEADY) + if (getState() == kSteady) sleepmillis(10); } - if (getState() == FAIL || getState() == ABORT) { + if (getState() == kFail || getState() == kAbort) { _setStateFail("timed out waiting for commit"); return; } @@ -1589,15 +1589,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, timeInCriticalSection.emplace(); }); - if (getState() == FAIL || getState() == ABORT) { + if (getState() == kFail || getState() == kAbort) { _setStateFail("timed out waiting for critical section acquisition"); } { // Make sure we don't overwrite a FAIL or ABORT state. stdx::lock_guard<Latch> sl(_mutex); - if (_state != FAIL && _state != ABORT) { - _state = ENTERED_CRIT_SEC; + if (_state != kFail && _state != kAbort) { + _state = kEnteredCritSec; _stateChangedCV.notify_all(); } } @@ -1629,7 +1629,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, }); } - _setState(DONE); + _setState(kDone); if (timing) { timing->done(8); @@ -1786,13 +1786,13 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi { stdx::unique_lock<Latch> lock(_mutex); opCtx->waitForConditionOrInterrupt( - _stateChangedCV, lock, [&]() { return _state != ENTERED_CRIT_SEC; }); + _stateChangedCV, lock, [&]() { return _state != kEnteredCritSec; }); } - invariant(_state == EXIT_CRIT_SEC || _state == FAIL || _state == ABORT); + invariant(_state == kExitCritSec || _state == kFail || _state == kAbort); // Refresh the filtering metadata - if (_state == EXIT_CRIT_SEC) { + if (_state == kExitCritSec) { LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section"); try { diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index c9584279429..cf5bf351f83 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -76,16 +76,16 @@ class MigrationDestinationManager { public: enum State { - READY, - CLONE, - CATCHUP, - STEADY, - COMMIT_START, - ENTERED_CRIT_SEC, - EXIT_CRIT_SEC, - DONE, - FAIL, - ABORT + kReady, + kClone, + kCatchup, + kSteady, + kCommitStart, + kEnteredCritSec, + kExitCritSec, + kDone, + kFail, + kAbort }; MigrationDestinationManager(); @@ -293,7 +293,7 @@ private: long long _numCatchup{0}; long long _numSteady{0}; - State _state{READY}; + State _state{kReady}; std::string _errmsg; std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; |