summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp257
1 files changed, 129 insertions, 128 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 70c74cc8942..36d8a84fd06 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -195,46 +195,46 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl
}
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
-bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationContext* txn) {
- return _storageInterface->getInitialSyncFlag(txn);
+bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationContext* opCtx) {
+ return _storageInterface->getInitialSyncFlag(opCtx);
}
void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) {
- _initialSyncRunner.schedule([finished, this](OperationContext* txn, const Status& status) {
+ _initialSyncRunner.schedule([finished, this](OperationContext* opCtx, const Status& status) {
if (status == ErrorCodes::CallbackCanceled) {
return TaskRunner::NextAction::kDisposeOperationContext;
}
// Do initial sync.
- syncDoInitialSync(txn, this);
- finished(txn);
+ syncDoInitialSync(opCtx, this);
+ finished(opCtx);
return TaskRunner::NextAction::kDisposeOperationContext;
});
}
void ReplicationCoordinatorExternalStateImpl::runOnInitialSyncThread(
- stdx::function<void(OperationContext* txn)> run) {
+ stdx::function<void(OperationContext* opCtx)> run) {
_initialSyncRunner.cancel();
_initialSyncRunner.join();
- _initialSyncRunner.schedule([run, this](OperationContext* txn, const Status& status) {
+ _initialSyncRunner.schedule([run, this](OperationContext* opCtx, const Status& status) {
if (status == ErrorCodes::CallbackCanceled) {
return TaskRunner::NextAction::kDisposeOperationContext;
}
- invariant(txn);
- invariant(txn->getClient());
- run(txn);
+ invariant(opCtx);
+ invariant(opCtx->getClient());
+ run(opCtx);
return TaskRunner::NextAction::kDisposeOperationContext;
});
}
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
- OperationContext* txn, ReplicationCoordinator* replCoord) {
+ OperationContext* opCtx, ReplicationCoordinator* replCoord) {
LockGuard lk(_threadMutex);
invariant(replCoord);
invariant(!_bgSync);
log() << "Starting replication fetcher thread";
- _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(txn));
- _bgSync->startup(txn);
+ _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(opCtx));
+ _bgSync->startup(opCtx);
log() << "Starting replication applier thread";
invariant(!_applierThread);
@@ -246,12 +246,12 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
&SyncSourceFeedback::run, &_syncSourceFeedback, _taskExecutor.get(), _bgSync.get())));
}
-void ReplicationCoordinatorExternalStateImpl::stopDataReplication(OperationContext* txn) {
+void ReplicationCoordinatorExternalStateImpl::stopDataReplication(OperationContext* opCtx) {
UniqueLock lk(_threadMutex);
- _stopDataReplication_inlock(txn, &lk);
+ _stopDataReplication_inlock(opCtx, &lk);
}
-void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(OperationContext* txn,
+void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(OperationContext* opCtx,
UniqueLock* lock) {
// Make sue no other _stopDataReplication calls are in progress.
_dataReplicationStopped.wait(*lock, [this]() { return !_stoppingDataReplication; });
@@ -270,7 +270,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat
if (oldBgSync) {
log() << "Stopping replication fetcher thread";
- oldBgSync->shutdown(txn);
+ oldBgSync->shutdown(opCtx);
}
if (oldApplier) {
@@ -279,7 +279,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat
}
if (oldBgSync) {
- oldBgSync->join(txn);
+ oldBgSync->join(opCtx);
}
_initialSyncRunner.cancel();
@@ -320,25 +320,25 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
_startedThreads = true;
}
-void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
- repl::startMasterSlave(txn);
+void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* opCtx) {
+ repl::startMasterSlave(opCtx);
}
-void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
+void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) {
UniqueLock lk(_threadMutex);
if (_startedThreads) {
- _stopDataReplication_inlock(txn, &lk);
+ _stopDataReplication_inlock(opCtx, &lk);
if (_snapshotThread) {
log() << "Stopping replication snapshot thread";
_snapshotThread->shutdown();
}
- if (_storageInterface->getOplogDeleteFromPoint(txn).isNull() &&
- loadLastOpTime(txn) == _storageInterface->getAppliedThrough(txn)) {
+ if (_storageInterface->getOplogDeleteFromPoint(opCtx).isNull() &&
+ loadLastOpTime(opCtx) == _storageInterface->getAppliedThrough(opCtx)) {
// Clear the appliedThrough marker to indicate we are consistent with the top of the
// oplog.
- _storageInterface->setAppliedThrough(txn, {});
+ _storageInterface->setAppliedThrough(opCtx, {});
}
if (_noopWriter) {
@@ -361,95 +361,95 @@ OldThreadPool* ReplicationCoordinatorExternalStateImpl::getDbWorkThreadPool() co
return _writerPool.get();
}
-Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* txn) {
+Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationContext* opCtx) {
try {
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::GlobalWrite globalWrite(txn->lockState());
+ ScopedTransaction scopedXact(opCtx, MODE_X);
+ Lock::GlobalWrite globalWrite(opCtx->lockState());
StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine();
if (!engine->isMmapV1()) {
return Status::OK();
}
- txn->setReplicatedWrites(false);
- Status status = repairDatabase(txn, engine, localDbName, false, false);
+ opCtx->setReplicatedWrites(false);
+ Status status = repairDatabase(opCtx, engine, localDbName, false, false);
// Open database before returning
- dbHolder().openDb(txn, localDbName);
+ dbHolder().openDb(opCtx, localDbName);
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
-Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(OperationContext* txn,
+Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(OperationContext* opCtx,
const BSONObj& config) {
try {
- createOplog(txn);
+ createOplog(opCtx);
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::GlobalWrite globalWrite(txn->lockState());
+ ScopedTransaction scopedXact(opCtx, MODE_X);
+ Lock::GlobalWrite globalWrite(opCtx->lockState());
- WriteUnitOfWork wuow(txn);
- Helpers::putSingleton(txn, configCollectionName, config);
+ WriteUnitOfWork wuow(opCtx);
+ Helpers::putSingleton(opCtx, configCollectionName, config);
const auto msgObj = BSON("msg"
<< "initiating set");
- getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, msgObj);
+ getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs");
// This initializes the minvalid document with a null "ts" because older versions (<=3.2)
// get angry if the minValid document is present but doesn't have a "ts" field.
// Consider removing this once we no longer need to support downgrading to 3.2.
- _storageInterface->setMinValidToAtLeast(txn, {});
+ _storageInterface->setMinValidToAtLeast(opCtx, {});
- FeatureCompatibilityVersion::setIfCleanStartup(txn, _storageInterface);
+ FeatureCompatibilityVersion::setIfCleanStartup(opCtx, _storageInterface);
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
-void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
// If this is a config server node becoming a primary, ensure the balancer is ready to start.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// We must ensure the balancer has stopped because it may still be in the process of
// stopping if this node was previously primary.
- Balancer::get(txn)->waitForBalancerToStop();
+ Balancer::get(opCtx)->waitForBalancerToStop();
}
}
-OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn,
+OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* opCtx,
bool isV1ElectionProtocol) {
- invariant(txn->lockState()->isW());
+ invariant(opCtx->lockState()->isW());
// Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
// done before we add anything to our oplog.
- invariant(_storageInterface->getOplogDeleteFromPoint(txn).isNull());
- _storageInterface->setAppliedThrough(txn, {});
+ invariant(_storageInterface->getOplogDeleteFromPoint(opCtx).isNull());
+ _storageInterface->setAppliedThrough(opCtx, {});
if (isV1ElectionProtocol) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_X);
- WriteUnitOfWork wuow(txn);
- txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
- txn,
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
+ opCtx,
BSON("msg"
<< "new primary"));
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "logging transition to primary to oplog", "local.oplog.rs");
+ opCtx, "logging transition to primary to oplog", "local.oplog.rs");
}
- const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn));
+ const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(opCtx));
- _shardingOnTransitionToPrimaryHook(txn);
- _dropAllTempCollections(txn);
+ _shardingOnTransitionToPrimaryHook(opCtx);
+ _dropAllTempCollections(opCtx);
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(true);
@@ -460,28 +460,28 @@ void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
_syncSourceFeedback.forwardSlaveProgress();
}
-OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) {
+OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* opCtx) {
std::string myname = getHostName();
OID myRID;
{
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock lock(opCtx->lockState(), meDatabaseName, MODE_X);
BSONObj me;
// local.me is an identifier for a server for getLastError w:2+
// TODO: handle WriteConflictExceptions below
- if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") ||
+ if (!Helpers::getSingleton(opCtx, meCollectionName, me) || !me.hasField("host") ||
me["host"].String() != myname) {
myRID = OID::gen();
// clean out local.me
- Helpers::emptyCollection(txn, meCollectionName);
+ Helpers::emptyCollection(opCtx, meCollectionName);
// repopulate
BSONObjBuilder b;
b.append("_id", myRID);
b.append("host", myname);
- Helpers::putSingleton(txn, meCollectionName, b.done());
+ Helpers::putSingleton(opCtx, meCollectionName, b.done());
} else {
myRID = me["_id"].OID();
}
@@ -490,11 +490,11 @@ OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) {
}
StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument(
- OperationContext* txn) {
+ OperationContext* opCtx) {
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
BSONObj config;
- if (!Helpers::getSingleton(txn, configCollectionName, config)) {
+ if (!Helpers::getSingleton(opCtx, configCollectionName, config)) {
return StatusWith<BSONObj>(
ErrorCodes::NoMatchingDocument,
str::stream() << "Did not find replica set configuration document in "
@@ -502,33 +502,33 @@ StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocu
}
return StatusWith<BSONObj>(config);
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "load replica set config", configCollectionName);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "load replica set config", configCollectionName);
} catch (const DBException& ex) {
return StatusWith<BSONObj>(ex.toStatus());
}
}
-Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* txn,
+Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* opCtx,
const BSONObj& config) {
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X);
- Helpers::putSingleton(txn, configCollectionName, config);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock dbWriteLock(opCtx->lockState(), configDatabaseName, MODE_X);
+ Helpers::putSingleton(opCtx, configCollectionName, config);
return Status::OK();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "save replica set config", configCollectionName);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "save replica set config", configCollectionName);
} catch (const DBException& ex) {
return ex.toStatus();
}
}
StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument(
- OperationContext* txn) {
+ OperationContext* opCtx) {
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
BSONObj lastVoteObj;
- if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) {
+ if (!Helpers::getSingleton(opCtx, lastVoteCollectionName, lastVoteObj)) {
return StatusWith<LastVote>(ErrorCodes::NoMatchingDocument,
str::stream()
<< "Did not find replica set lastVote document in "
@@ -537,41 +537,41 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteD
return LastVote::readFromLastVote(lastVoteObj);
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "load replica set lastVote", lastVoteCollectionName);
+ opCtx, "load replica set lastVote", lastVoteCollectionName);
} catch (const DBException& ex) {
return StatusWith<LastVote>(ex.toStatus());
}
}
Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
- OperationContext* txn, const LastVote& lastVote) {
+ OperationContext* opCtx, const LastVote& lastVote) {
BSONObj lastVoteObj = lastVote.toBSON();
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock dbWriteLock(opCtx->lockState(), lastVoteDatabaseName, MODE_X);
// If there is no last vote document, we want to store one. Otherwise, we only want to
// replace it if the new last vote document would have a higher term. We both check
// the term of the current last vote document and insert the new document under the
// DBLock to synchronize the two operations.
BSONObj result;
- bool exists = Helpers::getSingleton(txn, lastVoteCollectionName, result);
+ bool exists = Helpers::getSingleton(opCtx, lastVoteCollectionName, result);
if (!exists) {
- Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj);
+ Helpers::putSingleton(opCtx, lastVoteCollectionName, lastVoteObj);
} else {
StatusWith<LastVote> oldLastVoteDoc = LastVote::readFromLastVote(result);
if (!oldLastVoteDoc.isOK()) {
return oldLastVoteDoc.getStatus();
}
if (lastVote.getTerm() > oldLastVoteDoc.getValue().getTerm()) {
- Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj);
+ Helpers::putSingleton(opCtx, lastVoteCollectionName, lastVoteObj);
}
}
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "save replica set lastVote", lastVoteCollectionName);
- txn->recoveryUnit()->waitUntilDurable();
+ opCtx, "save replica set lastVote", lastVoteCollectionName);
+ opCtx->recoveryUnit()->waitUntilDurable();
return Status::OK();
} catch (const DBException& ex) {
return ex.toStatus();
@@ -583,18 +583,18 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
setNewTimestamp(ctx, newTime);
}
-void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) {
- if (_storageInterface->getInitialSyncFlag(txn)) {
+void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) {
+ if (_storageInterface->getInitialSyncFlag(opCtx)) {
return; // Initial Sync will take over so no cleanup is needed.
}
// This initializes the minvalid document with a null "ts" because older versions (<=3.2)
// get angry if the minValid document is present but doesn't have a "ts" field.
// Consider removing this once we no longer need to support downgrading to 3.2.
- _storageInterface->setMinValidToAtLeast(txn, {});
+ _storageInterface->setMinValidToAtLeast(opCtx, {});
- const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(txn);
- const auto appliedThrough = _storageInterface->getAppliedThrough(txn);
+ const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(opCtx);
+ const auto appliedThrough = _storageInterface->getAppliedThrough(opCtx);
const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() &&
// This version should never have a non-null deleteFromPoint with a null appliedThrough.
@@ -609,9 +609,9 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
!(appliedThrough.getTimestamp() >= deleteFromPoint);
if (needToDeleteEndOfOplog) {
log() << "Removing unapplied entries starting at: " << deleteFromPoint;
- truncateOplogTo(txn, deleteFromPoint);
+ truncateOplogTo(opCtx, deleteFromPoint);
}
- _storageInterface->setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint
+ _storageInterface->setOplogDeleteFromPoint(opCtx, {}); // clear the deleteFromPoint
if (appliedThrough.isNull()) {
// No follow-up work to do.
@@ -620,7 +620,7 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
// Check if we have any unapplied ops in our oplog. It is important that this is done after
// deleting the ragged end of the oplog.
- const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn));
+ const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(opCtx));
if (appliedThrough == topOfOplog) {
return; // We've applied all the valid oplog we have.
} else if (appliedThrough > topOfOplog) {
@@ -632,7 +632,7 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
<< topOfOplog << " (inclusive).";
- DBDirectClient db(txn);
+ DBDirectClient db(opCtx);
auto cursor = db.query(rsOplogName,
QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
/*batchSize*/ 0,
@@ -658,28 +658,29 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
}
// Apply remaining ops one at at time, but don't log them because they are already logged.
- const bool wereWritesReplicated = txn->writesAreReplicated();
- ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); });
- txn->setReplicatedWrites(false);
+ const bool wereWritesReplicated = opCtx->writesAreReplicated();
+ ON_BLOCK_EXIT([&] { opCtx->setReplicatedWrites(wereWritesReplicated); });
+ opCtx->setReplicatedWrites(false);
while (cursor->more()) {
auto entry = cursor->nextSafe();
- fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true));
+ fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
_storageInterface->setAppliedThrough(
- txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
+ opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
}
-StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
+StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
+ OperationContext* opCtx) {
// TODO: handle WriteConflictExceptions below
try {
// If we are doing an initial sync do not read from the oplog.
- if (_storageInterface->getInitialSyncFlag(txn)) {
+ if (_storageInterface->getInitialSyncFlag(opCtx)) {
return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."};
}
BSONObj oplogEntry;
- if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) {
+ if (!Helpers::getLast(opCtx, rsOplogName.c_str(), oplogEntry)) {
return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument,
str::stream() << "Did not find any entries in "
<< rsOplogName);
@@ -711,17 +712,17 @@ bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host, Se
}
HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
- const OperationContext* txn) {
- return HostAndPort(txn->getClient()->clientAddress(true));
+ const OperationContext* opCtx) {
+ return HostAndPort(opCtx->getClient()->clientAddress(true));
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
}
-void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
- ServiceContext* environment = txn->getServiceContext();
- environment->killAllUserOperations(txn, ErrorCodes::InterruptedDueToReplStateChange);
+void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) {
+ ServiceContext* environment = opCtx->getServiceContext();
+ environment->killAllUserOperations(opCtx, ErrorCodes::InterruptedDueToReplStateChange);
}
void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
@@ -733,8 +734,8 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
}
void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook(
- OperationContext* txn) {
- auto status = ShardingStateRecovery::recover(txn);
+ OperationContext* opCtx) {
+ auto status = ShardingStateRecovery::recover(opCtx);
if (ErrorCodes::isShutdownError(status.code())) {
// Note: callers of this method don't expect exceptions, so throw only unexpected fatal
@@ -745,7 +746,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
fassertStatusOK(40107, status);
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- status = Grid::get(txn)->catalogManager()->initializeConfigDatabaseIfNeeded(txn);
+ status = Grid::get(opCtx)->catalogManager()->initializeConfigDatabaseIfNeeded(opCtx);
if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) {
if (ErrorCodes::isShutdownError(status.code())) {
// Don't fassert if we're mid-shutdown, let the shutdown happen gracefully.
@@ -768,8 +769,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// Since we *just* wrote the cluster ID to the config.version document (via
// ShardingCatalogManager::initializeConfigDatabaseIfNeeded), this should always
// succeed.
- status = ClusterIdentityLoader::get(txn)->loadClusterId(
- txn, repl::ReadConcernLevel::kLocalReadConcern);
+ status = ClusterIdentityLoader::get(opCtx)->loadClusterId(
+ opCtx, repl::ReadConcernLevel::kLocalReadConcern);
if (ErrorCodes::isShutdownError(status.code())) {
// Don't fassert if we're mid-shutdown, let the shutdown happen gracefully.
@@ -780,20 +781,20 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
}
// Free any leftover locks from previous instantiations.
- auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
- distLockManager->unlockAll(txn, distLockManager->getProcessID());
+ auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager();
+ distLockManager->unlockAll(opCtx, distLockManager->getProcessID());
// If this is a config server node becoming a primary, start the balancer
- Balancer::get(txn)->initiateBalancer(txn);
+ Balancer::get(opCtx)->initiateBalancer(opCtx);
// Generate and upsert random 20 byte key for the LogicalClock's TimeProofService.
// TODO: SERVER-27768
- } else if (ShardingState::get(txn)->enabled()) {
+ } else if (ShardingState::get(opCtx)->enabled()) {
const auto configsvrConnStr =
- Grid::get(txn)->shardRegistry()->getConfigShard()->getConnString();
- auto status = ShardingState::get(txn)->updateShardIdentityConfigString(
- txn, configsvrConnStr.toString());
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString();
+ auto status = ShardingState::get(opCtx)->updateShardIdentityConfigString(
+ opCtx, configsvrConnStr.toString());
if (!status.isOK()) {
warning() << "error encountered while trying to update config connection string to "
<< configsvrConnStr << causedBy(status);
@@ -802,7 +803,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// There is a slight chance that some stale metadata might have been loaded before the latest
// optime has been recovered, so throw out everything that we have up to now
- ShardingState::get(txn)->markCollectionsNotShardedAtStepdown();
+ ShardingState::get(opCtx)->markCollectionsNotShardedAtStepdown();
}
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
@@ -826,7 +827,7 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
}
}
-void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* txn) {
+void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* opCtx) {
std::vector<std::string> dbNames;
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->listDatabases(&dbNames);
@@ -837,12 +838,12 @@ void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationC
if (*it == "local")
continue;
LOG(2) << "Removing temporary collections from " << *it;
- Database* db = dbHolder().get(txn, *it);
+ Database* db = dbHolder().get(opCtx, *it);
// Since we must be holding the global lock during this function, if listDatabases
// returned this dbname, we should be able to get a reference to it - it can't have
// been dropped.
invariant(db);
- db->clearTmpCollections(txn);
+ db->clearTmpCollections(opCtx);
}
}
@@ -857,11 +858,11 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotNa
manager->setCommittedSnapshot(newCommitPoint);
}
-void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* txn,
+void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx,
SnapshotName name) {
auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager();
invariant(manager); // This should never be called if there is no SnapshotManager.
- manager->createSnapshot(txn, name);
+ manager->createSnapshot(opCtx, name);
}
void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() {
@@ -882,18 +883,18 @@ double ReplicationCoordinatorExternalStateImpl::getElectionTimeoutOffsetLimitFra
}
bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageEngine(
- OperationContext* txn) const {
- auto storageEngine = txn->getServiceContext()->getGlobalStorageEngine();
+ OperationContext* opCtx) const {
+ auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
// This should never be called if the storage engine has not been initialized.
invariant(storageEngine);
return storageEngine->getSnapshotManager();
}
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply(
- OperationContext* txn,
+ OperationContext* opCtx,
MultiApplier::Operations ops,
MultiApplier::ApplyOperationFn applyOperation) {
- return repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation);
+ return repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation);
}
Status ReplicationCoordinatorExternalStateImpl::multiSyncApply(MultiApplier::OperationPtrs* ops) {
@@ -915,20 +916,20 @@ Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
}
std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer(
- OperationContext* txn) const {
+ OperationContext* opCtx) const {
if (initialSyncOplogBuffer == kCollectionOplogBufferName) {
invariant(initialSyncOplogBufferPeekCacheSize >= 0);
OplogBufferCollection::Options options;
options.peekCacheSize = std::size_t(initialSyncOplogBufferPeekCacheSize);
return stdx::make_unique<OplogBufferProxy>(
- stdx::make_unique<OplogBufferCollection>(StorageInterface::get(txn), options));
+ stdx::make_unique<OplogBufferCollection>(StorageInterface::get(opCtx), options));
} else {
return stdx::make_unique<OplogBufferBlockingQueue>();
}
}
std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeSteadyStateOplogBuffer(
- OperationContext* txn) const {
+ OperationContext* opCtx) const {
return stdx::make_unique<OplogBufferBlockingQueue>();
}