summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp205
1 files changed, 103 insertions, 102 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 8ad99d7e24a..a238cee74de 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -411,8 +411,8 @@ void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolS
_replExecutor.appendConnectionStats(stats);
}
-bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
- StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn);
+bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) {
+ StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(opCtx);
if (!lastVote.isOK()) {
if (lastVote.getStatus() == ErrorCodes::NoMatchingDocument) {
log() << "Did not find local voted for document at startup.";
@@ -426,7 +426,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
_topCoord->loadLastVote(lastVote.getValue());
}
- StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn);
+ StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx);
if (!cfg.isOK()) {
log() << "Did not find local replica set configuration document at startup; "
<< cfg.getStatus();
@@ -443,8 +443,8 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
}
// Read the last op from the oplog after cleaning up any partially applied batches.
- _externalState->cleanUpLastApplyBatch(txn);
- auto lastOpTimeStatus = _externalState->loadLastOpTime(txn);
+ _externalState->cleanUpLastApplyBatch(opCtx);
+ auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
// that the server's networking layer be up and running and accepting connections, which
@@ -546,12 +546,12 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
_performPostMemberStateUpdateAction(action);
if (!isArbiter) {
_externalState->startThreads(_settings);
- invariant(cbData.txn);
- _startDataReplication(cbData.txn);
+ invariant(cbData.opCtx);
+ _startDataReplication(cbData.opCtx);
}
}
-void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) {
+void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) {
std::shared_ptr<DataReplicator> drCopy;
{
LockGuard lk(_mutex);
@@ -569,19 +569,20 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) {
}
LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling "
"ReplCoordExtState::stopDataReplication.";
- _externalState->stopDataReplication(txn);
+ _externalState->stopDataReplication(opCtx);
}
-void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
+void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
stdx::function<void()> startCompleted) {
// Check to see if we need to do an initial sync.
const auto lastOpTime = getMyLastAppliedOpTime();
- const auto needsInitialSync = lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(txn);
+ const auto needsInitialSync =
+ lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx);
if (!needsInitialSync) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_inShutdown) {
// Start steady replication, since we already have data.
- _externalState->startSteadyStateReplication(txn, this);
+ _externalState->startSteadyStateReplication(opCtx, this);
}
return;
}
@@ -624,9 +625,9 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
startCompleted();
}
// Repair local db (to compact it).
- auto txn = cc().makeOperationContext();
- uassertStatusOK(_externalState->runRepairOnLocalDB(txn.get()));
- _externalState->startSteadyStateReplication(txn.get(), this);
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(_externalState->runRepairOnLocalDB(opCtx.get()));
+ _externalState->startSteadyStateReplication(opCtx.get(), this);
};
std::shared_ptr<DataReplicator> drCopy;
@@ -644,7 +645,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
}
// DataReplicator::startup() must be called outside lock because it uses features (eg.
// setting the initial sync flag) which depend on the ReplicationCoordinatorImpl.
- uassertStatusOK(drCopy->startup(txn, numInitialSyncAttempts.load()));
+ uassertStatusOK(drCopy->startup(opCtx, numInitialSyncAttempts.load()));
} catch (...) {
auto status = exceptionToStatus();
log() << "Initial Sync failed to start: " << status;
@@ -655,19 +656,19 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
fassertFailedWithStatusNoTrace(40354, status);
}
} else {
- _externalState->startInitialSync([this, startCompleted](OperationContext* txn) {
+ _externalState->startInitialSync([this, startCompleted](OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_inShutdown) {
if (startCompleted) {
startCompleted();
}
- _externalState->startSteadyStateReplication(txn, this);
+ _externalState->startSteadyStateReplication(opCtx, this);
}
});
}
}
-void ReplicationCoordinatorImpl::startup(OperationContext* txn) {
+void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
if (!isReplEnabled()) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_setConfigState_inlock(kConfigReplicationDisabled);
@@ -675,7 +676,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) {
}
{
- OID rid = _externalState->ensureMe(txn);
+ OID rid = _externalState->ensureMe(opCtx);
stdx::lock_guard<stdx::mutex> lk(_mutex);
fassert(18822, !_inShutdown);
@@ -687,16 +688,16 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) {
if (!_settings.usingReplSets()) {
// Must be Master/Slave
invariant(_settings.isMaster() || _settings.isSlave());
- _externalState->startMasterSlave(txn);
+ _externalState->startMasterSlave(opCtx);
return;
}
_replExecutor.startup();
_topCoord->setStorageEngineSupportsReadCommitted(
- _externalState->isReadCommittedSupportedByStorageEngine(txn));
+ _externalState->isReadCommittedSupportedByStorageEngine(opCtx));
- bool doneLoadingConfig = _startLoadLocalConfig(txn);
+ bool doneLoadingConfig = _startLoadLocalConfig(opCtx);
if (doneLoadingConfig) {
// If we're not done loading the config, then the config state will be set by
// _finishLoadLocalConfig.
@@ -706,7 +707,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) {
}
}
-void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
+void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
// Shutdown must:
// * prevent new threads from blocking in awaitReplication
// * wake up all existing threads blocking in awaitReplication
@@ -759,7 +760,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
drCopy->join();
drCopy.reset();
}
- _externalState->shutdown(txn);
+ _externalState->shutdown(opCtx);
_replExecutor.shutdown();
_replExecutor.join();
}
@@ -892,7 +893,7 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState
return _applierState;
}
-void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
+void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) {
// This logic is a little complicated in order to avoid acquiring the global exclusive lock
// unnecessarily. This is important because the applier may call signalDrainComplete()
@@ -918,7 +919,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
// temp collection isn't introduced on the new primary before we drop all the temp collections.
// When we go to drop all temp collections, we must replicate the drops.
- invariant(txn->writesAreReplicated());
+ invariant(opCtx->writesAreReplicated());
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_applierState != ApplierState::Draining) {
@@ -926,7 +927,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
}
lk.unlock();
- _externalState->onDrainComplete(txn);
+ _externalState->onDrainComplete(opCtx);
if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
log() << "transition to primary - "
@@ -943,8 +944,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
}
}
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite globalWriteLock(txn->lockState());
+ ScopedTransaction transaction(opCtx, MODE_X);
+ Lock::GlobalWrite globalWriteLock(opCtx->lockState());
lk.lock();
// Exit drain mode when the buffer is empty in the current term and we're in Draining mode.
@@ -959,7 +960,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
_canAcceptNonLocalWrites = true;
lk.unlock();
- _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol()));
+ _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol()));
lk.lock();
// Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
@@ -1232,11 +1233,11 @@ OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
return _getMyLastDurableOpTime_inlock();
}
-Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,
+Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx,
const ReadConcernArgs& settings) {
// We should never wait for replication if we are holding any locks, because this can
// potentially block for long time while doing network activity.
- if (txn->lockState()->isLocked()) {
+ if (opCtx->lockState()->isLocked()) {
return {ErrorCodes::IllegalOperation,
"Waiting for replication not allowed while holding a lock"};
}
@@ -1291,10 +1292,10 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,
// If we are doing a majority read concern we only need to wait for a new snapshot.
if (isMajorityReadConcern) {
// Wait for a snapshot that meets our needs (< targetOpTime).
- LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << txn->getDeadline();
+ LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
auto waitStatus =
- txn->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
+ opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
if (!waitStatus.isOK()) {
return waitStatus;
}
@@ -1305,12 +1306,12 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
WaiterInfoGuard waitInfo(
- &_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar);
+ &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar);
LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until "
- << txn->getDeadline();
+ << opCtx->getDeadline();
- auto waitStatus = txn->waitForConditionOrInterruptNoAssert(condVar, lock);
+ auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
if (!waitStatus.isOK()) {
return waitStatus;
}
@@ -1591,37 +1592,37 @@ bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock(
}
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
- OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto status =
- _awaitReplication_inlock(&lock, txn, opTime, SnapshotName::min(), fixedWriteConcern);
+ _awaitReplication_inlock(&lock, opCtx, opTime, SnapshotName::min(), fixedWriteConcern);
return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
ReplicationCoordinator::StatusAndDuration
ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient(
- OperationContext* txn, const WriteConcernOptions& writeConcern) {
+ OperationContext* opCtx, const WriteConcernOptions& writeConcern) {
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
stdx::unique_lock<stdx::mutex> lock(_mutex);
- const auto& clientInfo = ReplClientInfo::forClient(txn->getClient());
+ const auto& clientInfo = ReplClientInfo::forClient(opCtx->getClient());
auto status = _awaitReplication_inlock(
- &lock, txn, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern);
+ &lock, opCtx, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern);
return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
stdx::unique_lock<stdx::mutex>* lock,
- OperationContext* txn,
+ OperationContext* opCtx,
const OpTime& opTime,
SnapshotName minSnapshot,
const WriteConcernOptions& writeConcern) {
// We should never wait for replication if we are holding any locks, because this can
// potentially block for long time while doing network activity.
- if (txn->lockState()->isLocked()) {
+ if (opCtx->lockState()->isLocked()) {
return {ErrorCodes::IllegalOperation,
"Waiting for replication not allowed while holding a lock"};
}
@@ -1668,7 +1669,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
return stepdownStatus;
}
- auto interruptStatus = txn->checkForInterruptNoAssert();
+ auto interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
}
@@ -1681,7 +1682,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
}
}
- auto clockSource = txn->getServiceContext()->getFastClockSource();
+ auto clockSource = opCtx->getServiceContext()->getFastClockSource();
const auto wTimeoutDate = [&]() -> const Date_t {
if (writeConcern.wDeadline != Date_t::max()) {
return writeConcern.wDeadline;
@@ -1696,14 +1697,14 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
WaiterInfoGuard waitInfo(
- &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
+ &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (_inShutdown) {
return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
}
- auto status = txn->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
+ auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
if (!status.isOK()) {
return status.getStatus();
}
@@ -1729,7 +1730,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}
-Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
+Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
const bool force,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime) {
@@ -1745,12 +1746,12 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
return {ErrorCodes::NotMaster, "not primary so can't step down"};
}
- Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly());
+ Lock::GlobalLock globalReadLock(opCtx->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly());
// We've requested the global shared lock which will stop new writes from coming in,
// but existing writes could take a long time to finish, so kill all user operations
// to help us get the global lock faster.
- _externalState->killAllUserOperations(txn);
+ _externalState->killAllUserOperations(opCtx);
globalReadLock.waitForLock(durationCount<Milliseconds>(stepdownTime));
@@ -1763,7 +1764,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
try {
stdx::unique_lock<stdx::mutex> topoLock(_topoMutex);
bool restartHeartbeats = true;
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
while (!_tryToStepDown(waitUntil, stepDownUntil, force)) {
if (restartHeartbeats) {
// We send out a fresh round of heartbeats because stepping down successfully
@@ -1773,7 +1774,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
_restartHeartbeats_inlock();
restartHeartbeats = false;
}
- txn->waitForConditionOrInterruptUntil(
+ opCtx->waitForConditionOrInterruptUntil(
_stepDownWaiters, topoLock, std::min(stepDownUntil, waitUntil));
}
} catch (const DBException& ex) {
@@ -1864,14 +1865,14 @@ bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
return false;
}
-bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* txn,
+bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx,
StringData dbName) {
// The answer isn't meaningful unless we hold the global lock.
- invariant(txn->lockState()->isLocked());
- return canAcceptWritesForDatabase_UNSAFE(txn, dbName);
+ invariant(opCtx->lockState()->isLocked());
+ return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
}
-bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* txn,
+bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx,
StringData dbName) {
// _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes
// started with --slave, and adjusted based on primary+drain state in replica sets.
@@ -1889,32 +1890,32 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont
return !replAllDead && _settings.isMaster();
}
-bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* txn,
+bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx,
const NamespaceString& ns) {
- invariant(txn->lockState()->isLocked());
- return canAcceptWritesFor_UNSAFE(txn, ns);
+ invariant(opCtx->lockState()->isLocked());
+ return canAcceptWritesFor_UNSAFE(opCtx, ns);
}
-bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* txn,
+bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opCtx,
const NamespaceString& ns) {
if (_memberState.rollback() && ns.isOplog()) {
return false;
}
StringData dbName = ns.db();
- return canAcceptWritesForDatabase_UNSAFE(txn, dbName);
+ return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
}
-Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* txn,
+Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* opCtx,
const NamespaceString& ns,
bool slaveOk) {
- invariant(txn->lockState()->isLocked());
- return checkCanServeReadsFor_UNSAFE(txn, ns, slaveOk);
+ invariant(opCtx->lockState()->isLocked());
+ return checkCanServeReadsFor_UNSAFE(opCtx, ns, slaveOk);
}
-Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* txn,
+Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* opCtx,
const NamespaceString& ns,
bool slaveOk) {
- auto client = txn->getClient();
+ auto client = opCtx->getClient();
// Oplog reads are not allowed during STARTUP state, but we make an exception for internal
// reads and master-slave replication. Internal reads are required for cleaning up unfinished
// apply batches. Master-slave never sets the state so we make an exception for it as well.
@@ -1928,7 +1929,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext
if (client->isInDirectClient()) {
return Status::OK();
}
- if (canAcceptWritesFor_UNSAFE(txn, ns)) {
+ if (canAcceptWritesFor_UNSAFE(opCtx, ns)) {
return Status::OK();
}
if (_settings.isSlave() || _settings.isMaster()) {
@@ -1948,9 +1949,9 @@ bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const {
return _canServeNonLocalReads.loadRelaxed();
}
-bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* txn,
+bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* opCtx,
const NamespaceString& ns) {
- return !canAcceptWritesFor(txn, ns);
+ return !canAcceptWritesFor(opCtx, ns);
}
OID ReplicationCoordinatorImpl::getElectionId() {
@@ -1977,8 +1978,8 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
return self.getId();
}
-Status ReplicationCoordinatorImpl::resyncData(OperationContext* txn, bool waitUntilCompleted) {
- _stopDataReplication(txn);
+Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) {
+ _stopDataReplication(opCtx);
auto finishedEvent = uassertStatusOK(_replExecutor.makeEvent());
stdx::function<void()> f;
if (waitUntilCompleted)
@@ -1987,7 +1988,7 @@ Status ReplicationCoordinatorImpl::resyncData(OperationContext* txn, bool waitUn
stdx::unique_lock<stdx::mutex> lk(_mutex);
_resetMyLastOpTimes_inlock();
lk.unlock(); // unlock before calling into replCoordExtState.
- _startDataReplication(txn, f);
+ _startDataReplication(opCtx, f);
if (waitUntilCompleted) {
_replExecutor.waitForEvent(finishedEvent);
}
@@ -2212,7 +2213,7 @@ Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
return Status::OK();
}
-Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn,
+Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCtx,
const HostAndPort& target,
BSONObjBuilder* resultObj) {
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
@@ -2227,7 +2228,7 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn,
}
if (doResync) {
- return resyncData(txn, false);
+ return resyncData(opCtx, false);
}
return result;
@@ -2292,7 +2293,7 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
return result;
}
-Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
+Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCtx,
const ReplSetReconfigArgs& args,
BSONObjBuilder* resultObj) {
log() << "replSetReconfig admin command received from client";
@@ -2363,7 +2364,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
}
StatusWith<int> myIndex = validateConfigForReconfig(
- _externalState.get(), oldConfig, newConfig, txn->getServiceContext(), args.force);
+ _externalState.get(), oldConfig, newConfig, opCtx->getServiceContext(), args.force);
if (!myIndex.isOK()) {
error() << "replSetReconfig got " << myIndex.getStatus() << " while validating "
<< newConfigObj;
@@ -2382,7 +2383,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
}
}
- status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON());
+ status = _externalState->storeLocalConfigDocument(opCtx, newConfig.toBSON());
if (!status.isOK()) {
error() << "replSetReconfig failed to store config document; " << status;
return status;
@@ -2465,7 +2466,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(
}
}
-Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
+Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx,
const BSONObj& configObj,
BSONObjBuilder* resultObj) {
log() << "replSetInitiate admin command received from client";
@@ -2508,7 +2509,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
}
StatusWith<int> myIndex =
- validateConfigForInitiate(_externalState.get(), newConfig, txn->getServiceContext());
+ validateConfigForInitiate(_externalState.get(), newConfig, opCtx->getServiceContext());
if (!myIndex.isOK()) {
error() << "replSet initiate got " << myIndex.getStatus() << " while validating "
<< configObj;
@@ -2525,7 +2526,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
return status;
}
- status = _externalState->initializeReplSetStorage(txn, newConfig.toBSON());
+ status = _externalState->initializeReplSetStorage(opCtx, newConfig.toBSON());
if (!status.isOK()) {
error() << "replSetInitiate failed to store config document or create the oplog; "
<< status;
@@ -2545,7 +2546,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
// will fail validation with a "replSet initiate got ... while validating" reason.
invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter());
_externalState->startThreads(_settings);
- _startDataReplication(txn);
+ _startDataReplication(opCtx);
configStateGuard.Dismiss();
return Status::OK();
@@ -2949,7 +2950,7 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi
return status;
}
-Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn,
+Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx,
const HandshakeArgs& handshake) {
LOG(2) << "Received handshake " << handshake.toBSON();
@@ -2968,7 +2969,7 @@ Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn,
SlaveInfo newSlaveInfo;
newSlaveInfo.rid = handshake.getRid();
newSlaveInfo.memberId = -1;
- newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(txn);
+ newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(opCtx);
// Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily.
_slaveInfo.push_back(newSlaveInfo);
@@ -3121,8 +3122,8 @@ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Da
host));
}
-void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn) {
- StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
+void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx) {
+ StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
OpTime lastOpTime;
if (!lastOpTimeStatus.isOK()) {
warning() << "Failed to load timestamp of most recently applied operation; "
@@ -3137,7 +3138,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
_reportUpstream_inlock(std::move(lock));
// Unlocked below.
- _externalState->setGlobalTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp());
+ _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp());
}
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
@@ -3244,14 +3245,14 @@ OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
}
Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
- OperationContext* txn,
+ OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) {
if (!isV1ElectionProtocol()) {
return {ErrorCodes::BadValue, "not using election protocol v1"};
}
- auto termStatus = updateTerm(txn, args.getTerm());
+ auto termStatus = updateTerm(opCtx, args.getTerm());
if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
return termStatus;
@@ -3264,7 +3265,7 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
if (!args.isADryRun() && response->getVoteGranted()) {
LastVote lastVote{args.getTerm(), args.getCandidateIndex()};
- Status status = _externalState->storeLocalLastVoteDocument(txn, lastVote);
+ Status status = _externalState->storeLocalLastVoteDocument(opCtx, lastVote);
if (!status.isOK()) {
error() << "replSetRequestVotes failed to store LastVote document; " << status;
return status;
@@ -3405,7 +3406,7 @@ EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
return finishEvh;
}
-Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long term) {
+Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long term) {
// Term is only valid if we are replicating.
if (getReplicationMode() != modeReplSet) {
return {ErrorCodes::BadValue, "cannot supply 'term' without active replication"};
@@ -3417,7 +3418,7 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* txn, long long t
}
// Check we haven't acquired any lock, because potential stepdown needs global lock.
- dassert(!txn->lockState()->isLocked());
+ dassert(!opCtx->lockState()->isLocked());
TopologyCoordinator::UpdateTermResult updateTermResult;
EventHandle finishEvh;
@@ -3469,12 +3470,12 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
return EventHandle();
}
-SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* txn) {
+SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* opCtx) {
auto reservedName = SnapshotName(_snapshotNameGenerator.addAndFetch(1));
dassert(reservedName > SnapshotName::min());
dassert(reservedName < SnapshotName::max());
- if (txn) {
- ReplClientInfo::forClient(txn->getClient()).setLastSnapshot(reservedName);
+ if (opCtx) {
+ ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName);
}
return reservedName;
}
@@ -3483,12 +3484,12 @@ void ReplicationCoordinatorImpl::forceSnapshotCreation() {
_externalState->forceSnapshotCreation();
}
-void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* txn,
+void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx,
const SnapshotName& untilSnapshot) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) {
- txn->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
+ opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
}
}
@@ -3496,11 +3497,11 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
return _uncommittedSnapshotsSize.load();
}
-void ReplicationCoordinatorImpl::createSnapshot(OperationContext* txn,
+void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx,
OpTime timeOfSnapshot,
SnapshotName name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _externalState->createSnapshot(txn, name);
+ _externalState->createSnapshot(opCtx, name);
auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name};
if (timeOfSnapshot <= _lastCommittedOpTime) {
@@ -3588,10 +3589,10 @@ EventHandle ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgra
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
- invariant(cbData.txn);
+ invariant(cbData.opCtx);
LastVote lastVote{OpTime::kInitialTerm, -1};
- auto status = _externalState->storeLocalLastVoteDocument(cbData.txn, lastVote);
+ auto status = _externalState->storeLocalLastVoteDocument(cbData.opCtx, lastVote);
invariant(status.isOK());
_replExecutor.signalEvent(evh);
});