summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp102
-rw-r--r--src/mongo/db/s/migration_destination_manager.h22
2 files changed, 62 insertions, 62 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 45cec7065e3..f1cb2c28c9f 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -152,25 +152,25 @@ void runWithoutSession(OperationContext* opCtx, Callable&& callable) {
*/
std::string stateToString(MigrationDestinationManager::State state) {
switch (state) {
- case MigrationDestinationManager::READY:
+ case MigrationDestinationManager::kReady:
return "ready";
- case MigrationDestinationManager::CLONE:
+ case MigrationDestinationManager::kClone:
return "clone";
- case MigrationDestinationManager::CATCHUP:
+ case MigrationDestinationManager::kCatchup:
return "catchup";
- case MigrationDestinationManager::STEADY:
+ case MigrationDestinationManager::kSteady:
return "steady";
- case MigrationDestinationManager::COMMIT_START:
+ case MigrationDestinationManager::kCommitStart:
return "commitStart";
- case MigrationDestinationManager::ENTERED_CRIT_SEC:
+ case MigrationDestinationManager::kEnteredCritSec:
return "enteredCriticalSection";
- case MigrationDestinationManager::EXIT_CRIT_SEC:
+ case MigrationDestinationManager::kExitCritSec:
return "exitCriticalSection";
- case MigrationDestinationManager::DONE:
+ case MigrationDestinationManager::kDone:
return "done";
- case MigrationDestinationManager::FAIL:
+ case MigrationDestinationManager::kFail:
return "fail";
- case MigrationDestinationManager::ABORT:
+ case MigrationDestinationManager::kAbort:
return "abort";
default:
MONGO_UNREACHABLE;
@@ -328,7 +328,7 @@ void MigrationDestinationManager::_setStateFail(StringData msg) {
{
stdx::lock_guard<Latch> sl(_mutex);
_errmsg = msg.toString();
- _state = FAIL;
+ _state = kFail;
_stateChangedCV.notify_all();
}
@@ -345,7 +345,7 @@ void MigrationDestinationManager::_setStateFailWarn(StringData msg) {
{
stdx::lock_guard<Latch> sl(_mutex);
_errmsg = msg.toString();
- _state = FAIL;
+ _state = kFail;
_stateChangedCV.notify_all();
}
@@ -370,7 +370,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
stdx::unique_lock<Latch> lock(_mutex);
try {
opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool {
- return _state != READY && _state != CLONE && _state != CATCHUP;
+ return _state != kReady && _state != kClone && _state != kCatchup;
});
} catch (...) {
// Ignoring this error because this is an optional parameter and we catch timeout
@@ -396,7 +396,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
b.append("state", stateToString(_state));
- if (_state == FAIL) {
+ if (_state == kFail) {
invariant(!_errmsg.empty());
b.append("errmsg", _errmsg);
}
@@ -429,7 +429,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
invariant(!_sessionId);
invariant(!_scopedReceiveChunk);
- _state = READY;
+ _state = kReady;
_stateChangedCV.notify_all();
_errmsg = "";
@@ -491,7 +491,7 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState(
_max = recoveryDoc.getRange().getMax();
_lsid = recoveryDoc.getLsid();
_txnNumber = recoveryDoc.getTxnNumber();
- _state = COMMIT_START;
+ _state = kCommitStart;
_acquireCSOnRecipient = true;
LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId);
@@ -504,7 +504,7 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState(
"Reacquired migration recipient critical section",
"sessionId"_attr = *_sessionId);
- _state = ENTERED_CRIT_SEC;
+ _state = kEnteredCritSec;
if (_migrateThreadHandle.joinable()) {
_migrateThreadHandle.join();
@@ -605,7 +605,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
<< _sessionId->toString()};
}
- _state = ABORT;
+ _state = kAbort;
_stateChangedCV.notify_all();
_errmsg = "aborted";
@@ -614,7 +614,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
void MigrationDestinationManager::abortWithoutSessionIdCheck() {
stdx::lock_guard<Latch> sl(_mutex);
- _state = ABORT;
+ _state = kAbort;
_stateChangedCV.notify_all();
_errmsg = "aborted without session id check";
}
@@ -633,7 +633,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
// the last batch of mods sent in the catch up phase. Allow some time for synching up.
auto deadline = Date_t::now() + convergenceTimeout;
- while (_state == CATCHUP) {
+ while (_state == kCatchup) {
if (stdx::cv_status::timeout ==
_stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) {
return {ErrorCodes::CommandFailed,
@@ -644,7 +644,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
}
}
- if (_state != STEADY) {
+ if (_state != kSteady) {
return {ErrorCodes::CommandFailed,
str::stream() << "Migration startCommit attempted when not in STEADY state."
<< " Sender's session is " << sessionId.toString()
@@ -666,7 +666,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
}
_sessionMigration->finish();
- _state = COMMIT_START;
+ _state = kCommitStart;
_stateChangedCV.notify_all();
// Assigning a timeout slightly higher than the one used for network requests to the config
@@ -678,26 +678,26 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
_errmsg = str::stream()
<< "startCommit timed out waiting, " << _sessionId->toString();
- _state = FAIL;
+ _state = kFail;
_stateChangedCV.notify_all();
return {ErrorCodes::CommandFailed, _errmsg};
}
}
- if (_state != DONE) {
+ if (_state != kDone) {
return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"};
}
} else {
- while (_state == COMMIT_START) {
+ while (_state == kCommitStart) {
if (stdx::cv_status::timeout ==
_stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) {
_errmsg = str::stream()
<< "startCommit timed out waiting, " << _sessionId->toString();
- _state = FAIL;
+ _state = kFail;
_stateChangedCV.notify_all();
return {ErrorCodes::CommandFailed, _errmsg};
}
}
- if (_state != ENTERED_CRIT_SEC) {
+ if (_state != kEnteredCritSec) {
return {ErrorCodes::CommandFailed,
"startCommit failed, final data failed to transfer or failed to enter critical "
"section"};
@@ -731,21 +731,21 @@ Status MigrationDestinationManager::exitCriticalSection(OperationContext* opCtx,
return Status::OK();
}
- if (_state < ENTERED_CRIT_SEC) {
+ if (_state < kEnteredCritSec) {
return {ErrorCodes::CommandFailed, "recipient critical section has not yet been entered"};
}
// If the thread is waiting to be signaled to release the CS, signal it by transitioning
// _state to EXIT_CRIT_SEC
- if (_state == ENTERED_CRIT_SEC) {
- _state = EXIT_CRIT_SEC;
+ if (_state == kEnteredCritSec) {
+ _state = kExitCritSec;
_stateChangedCV.notify_all();
}
// Wait for the thread to finish
opCtx->waitForConditionOrInterrupt(_isActiveCV, lock, [&]() { return !_sessionId; });
- if (_state != DONE) {
+ if (_state != kDone) {
return {ErrorCodes::CommandFailed, "exitCriticalSection failed"};
}
@@ -1113,14 +1113,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
const auto initialState = getState();
- if (initialState == ABORT) {
+ if (initialState == kAbort) {
LOGV2_ERROR(22013,
"Migration abort requested before the migration started",
"migrationId"_attr = _migrationId->toBSON());
return;
}
- invariant(initialState == READY);
+ invariant(initialState == kReady);
auto donorCollectionOptionsAndIndexes = [&]() -> CollectionOptionsAndIndexes {
auto [collOptions, uuid] =
@@ -1263,7 +1263,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
repl::OpTime lastOpApplied;
{
// 4. Initial bulk clone
- _setState(CLONE);
+ _setState(kClone);
_sessionMigration->start(opCtx->getServiceContext());
@@ -1274,7 +1274,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
auto assertNotAborted = [&](OperationContext* opCtx) {
opCtx->checkForInterrupt();
outerOpCtx->checkForInterrupt();
- uassert(50748, "Migration aborted while copying documents", getState() != ABORT);
+ uassert(50748, "Migration aborted while copying documents", getState() != kAbort);
};
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) {
@@ -1384,7 +1384,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
{
// 5. Do bulk of mods
- _setState(CATCHUP);
+ _setState(kCatchup);
auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) {
auto commandResponse = uassertStatusOKWithContext(
@@ -1425,7 +1425,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
str::stream()
<< "Migration aborted while waiting for replication at catch up stage, "
<< _migrationId->toBSON(),
- getState() != ABORT);
+ getState() != kAbort);
if (runWithoutSession(outerOpCtx, [&] {
return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern);
@@ -1489,10 +1489,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
{
// 6. Wait for commit
- _setState(STEADY);
+ _setState(kSteady);
bool transferAfterCommit = false;
- while (getState() == STEADY || getState() == COMMIT_START) {
+ while (getState() == kSteady || getState() == kCommitStart) {
opCtx->checkForInterrupt();
outerOpCtx->checkForInterrupt();
@@ -1500,7 +1500,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
// aren't sure that at least one transfer happens *after* our state changes to
// COMMIT_START, there could be mods still on the FROM shard that got logged
// *after* our _transferMods but *before* the critical section.
- if (getState() == COMMIT_START) {
+ if (getState() == kCommitStart) {
transferAfterCommit = true;
}
@@ -1522,7 +1522,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
continue;
}
- if (getState() == ABORT) {
+ if (getState() == kAbort) {
LOGV2(22006,
"Migration aborted while transferring mods",
"migrationId"_attr = _migrationId->toBSON());
@@ -1532,7 +1532,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
// We know we're finished when:
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
- if (getState() == COMMIT_START && transferAfterCommit == true) {
+ if (getState() == kCommitStart && transferAfterCommit == true) {
if (runWithoutSession(outerOpCtx, [&] {
return _flushPendingWrites(opCtx, lastOpApplied);
})) {
@@ -1541,11 +1541,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
}
// Only sleep if we aren't committing
- if (getState() == STEADY)
+ if (getState() == kSteady)
sleepmillis(10);
}
- if (getState() == FAIL || getState() == ABORT) {
+ if (getState() == kFail || getState() == kAbort) {
_setStateFail("timed out waiting for commit");
return;
}
@@ -1589,15 +1589,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
timeInCriticalSection.emplace();
});
- if (getState() == FAIL || getState() == ABORT) {
+ if (getState() == kFail || getState() == kAbort) {
_setStateFail("timed out waiting for critical section acquisition");
}
{
// Make sure we don't overwrite a FAIL or ABORT state.
stdx::lock_guard<Latch> sl(_mutex);
- if (_state != FAIL && _state != ABORT) {
- _state = ENTERED_CRIT_SEC;
+ if (_state != kFail && _state != kAbort) {
+ _state = kEnteredCritSec;
_stateChangedCV.notify_all();
}
}
@@ -1629,7 +1629,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
});
}
- _setState(DONE);
+ _setState(kDone);
if (timing) {
timing->done(8);
@@ -1786,13 +1786,13 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi
{
stdx::unique_lock<Latch> lock(_mutex);
opCtx->waitForConditionOrInterrupt(
- _stateChangedCV, lock, [&]() { return _state != ENTERED_CRIT_SEC; });
+ _stateChangedCV, lock, [&]() { return _state != kEnteredCritSec; });
}
- invariant(_state == EXIT_CRIT_SEC || _state == FAIL || _state == ABORT);
+ invariant(_state == kExitCritSec || _state == kFail || _state == kAbort);
// Refresh the filtering metadata
- if (_state == EXIT_CRIT_SEC) {
+ if (_state == kExitCritSec) {
LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section");
try {
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index c9584279429..cf5bf351f83 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -76,16 +76,16 @@ class MigrationDestinationManager {
public:
enum State {
- READY,
- CLONE,
- CATCHUP,
- STEADY,
- COMMIT_START,
- ENTERED_CRIT_SEC,
- EXIT_CRIT_SEC,
- DONE,
- FAIL,
- ABORT
+ kReady,
+ kClone,
+ kCatchup,
+ kSteady,
+ kCommitStart,
+ kEnteredCritSec,
+ kExitCritSec,
+ kDone,
+ kFail,
+ kAbort
};
MigrationDestinationManager();
@@ -293,7 +293,7 @@ private:
long long _numCatchup{0};
long long _numSteady{0};
- State _state{READY};
+ State _state{kReady};
std::string _errmsg;
std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;