Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp  211
1 file changed, 106 insertions(+), 105 deletions(-)
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 277df2f9a9d..8738b47d027 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -255,8 +255,8 @@ void ApplyBatchFinalizerForJournal::_run() {
_latestOpTime = OpTime();
}
- auto txn = cc().makeOperationContext();
- txn->recoveryUnit()->waitUntilDurable();
+ auto opCtx = cc().makeOperationContext();
+ opCtx->recoveryUnit()->waitUntilDurable();
_recordDurable(latestOpTime);
}
}
@@ -276,19 +276,19 @@ std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() {
return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker ");
}
-bool SyncTail::peek(OperationContext* txn, BSONObj* op) {
- return _networkQueue->peek(txn, op);
+bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) {
+ return _networkQueue->peek(opCtx, op);
}
// static
-Status SyncTail::syncApply(OperationContext* txn,
+Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
bool inSteadyStateReplication,
ApplyOperationInLockFn applyOperationInLock,
ApplyCommandInLockFn applyCommandInLock,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
// Count each log op application as a separate operation, for reporting purposes
- CurOp individualOp(txn);
+ CurOp individualOp(opCtx);
const char* ns = op.getStringField("ns");
verify(ns);
@@ -312,24 +312,24 @@ Status SyncTail::syncApply(OperationContext* txn,
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
- Lock::GlobalWrite globalWriteLock(txn->lockState());
+ Lock::GlobalWrite globalWriteLock(opCtx->lockState());
// special case apply for commands to avoid implicit database creation
- Status status = applyCommandInLock(txn, op, inSteadyStateReplication);
+ Status status = applyCommandInLock(opCtx, op, inSteadyStateReplication);
incrementOpsAppliedStats();
return status;
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "syncApply_command", ns);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "syncApply_command", ns);
}
auto applyOp = [&](Database* db) {
// For non-initial-sync, we convert updates to upserts
// to suppress errors when replaying oplog entries.
- txn->setReplicatedWrites(false);
- DisableDocumentValidation validationDisabler(txn);
+ opCtx->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(opCtx);
Status status =
- applyOperationInLock(txn, db, op, inSteadyStateReplication, incrementOpsAppliedStats);
+ applyOperationInLock(opCtx, db, op, inSteadyStateReplication, incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
@@ -339,11 +339,11 @@ Status SyncTail::syncApply(OperationContext* txn,
if (isNoOp || (opType[0] == 'i' && nsToCollectionSubstring(ns) == "system.indexes")) {
auto opStr = isNoOp ? "syncApply_noop" : "syncApply_indexBuild";
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
- OldClientContext ctx(txn, ns);
+ Lock::DBLock dbLock(opCtx->lockState(), nsToDatabaseSubstring(ns), MODE_X);
+ OldClientContext ctx(opCtx, ns);
return applyOp(ctx.db());
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, opStr, ns);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, opStr, ns);
}
if (isCrudOpType(opType)) {
@@ -361,29 +361,29 @@ Status SyncTail::syncApply(OperationContext* txn,
// drop the DB lock before acquiring
// the upgraded one.
dbLock.reset();
- dbLock.reset(new Lock::DBLock(txn->lockState(), dbName, mode));
- collectionLock.reset(new Lock::CollectionLock(txn->lockState(), ns, mode));
+ dbLock.reset(new Lock::DBLock(opCtx->lockState(), dbName, mode));
+ collectionLock.reset(new Lock::CollectionLock(opCtx->lockState(), ns, mode));
};
resetLocks(MODE_IX);
- if (!dbHolder().get(txn, dbName)) {
+ if (!dbHolder().get(opCtx, dbName)) {
// Need to create database, so reset lock to stronger mode.
resetLocks(MODE_X);
- ctx.reset(new OldClientContext(txn, ns));
+ ctx.reset(new OldClientContext(opCtx, ns));
} else {
- ctx.reset(new OldClientContext(txn, ns));
+ ctx.reset(new OldClientContext(opCtx, ns));
if (!ctx->db()->getCollection(ns)) {
// Need to implicitly create collection. This occurs for 'u' opTypes,
// but not for 'i' nor 'd'.
ctx.reset();
resetLocks(MODE_X);
- ctx.reset(new OldClientContext(txn, ns));
+ ctx.reset(new OldClientContext(opCtx, ns));
}
}
return applyOp(ctx->db());
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "syncApply_CRUD", ns);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "syncApply_CRUD", ns);
}
// unknown opType
@@ -393,10 +393,10 @@ Status SyncTail::syncApply(OperationContext* txn,
return Status(ErrorCodes::BadValue, ss);
}
-Status SyncTail::syncApply(OperationContext* txn,
+Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
bool inSteadyStateReplication) {
- return SyncTail::syncApply(txn,
+ return SyncTail::syncApply(opCtx,
op,
inSteadyStateReplication,
applyOperation_inlock,
@@ -416,12 +416,12 @@ void prefetchOp(const BSONObj& op) {
try {
// one possible tweak here would be to stay in the read lock for this database
// for multiple prefetches if they are for the same database.
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
- AutoGetCollectionForRead ctx(&txn, NamespaceString(ns));
+ const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
+ OperationContext& opCtx = *opCtxPtr;
+ AutoGetCollectionForRead ctx(&opCtx, NamespaceString(ns));
Database* db = ctx.getDb();
if (db) {
- prefetchPagesForReplicatedOp(&txn, db, op);
+ prefetchPagesForReplicatedOp(&opCtx, db, op);
}
} catch (const DBException& e) {
LOG(2) << "ignoring exception in prefetchOp(): " << redact(e) << endl;
@@ -468,7 +468,7 @@ void initializeWriterThread() {
// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops'
// stays valid until all scheduled work in the thread pool completes.
-void scheduleWritesToOplog(OperationContext* txn,
+void scheduleWritesToOplog(OperationContext* opCtx,
OldThreadPool* threadPool,
const MultiApplier::Operations& ops) {
@@ -479,9 +479,9 @@ void scheduleWritesToOplog(OperationContext* txn,
return [&ops, begin, end] {
initializeWriterThread();
const auto txnHolder = cc().makeOperationContext();
- const auto txn = txnHolder.get();
- txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
- txn->setReplicatedWrites(false);
+ const auto opCtx = txnHolder.get();
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ opCtx->setReplicatedWrites(false);
std::vector<BSONObj> docs;
docs.reserve(end - begin);
@@ -492,8 +492,8 @@ void scheduleWritesToOplog(OperationContext* txn,
}
fassertStatusOK(40141,
- StorageInterface::get(txn)->insertDocuments(
- txn, NamespaceString(rsOplogName), docs));
+ StorageInterface::get(opCtx)->insertDocuments(
+ opCtx, NamespaceString(rsOplogName), docs));
};
};
@@ -509,7 +509,7 @@ void scheduleWritesToOplog(OperationContext* txn,
// there would be no way to take advantage of multiple threads if a storage engine doesn't
// support document locking.
if (!enoughToMultiThread ||
- !txn->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) {
+ !opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) {
threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
return;
@@ -536,24 +536,24 @@ public:
const CollatorInterface* collator = nullptr;
};
- CollectionProperties getCollectionProperties(OperationContext* txn,
+ CollectionProperties getCollectionProperties(OperationContext* opCtx,
const StringMapTraits::HashedKey& ns) {
auto it = _cache.find(ns);
if (it != _cache.end()) {
return it->second;
}
- auto collProperties = getCollectionPropertiesImpl(txn, ns.key());
+ auto collProperties = getCollectionPropertiesImpl(opCtx, ns.key());
_cache[ns] = collProperties;
return collProperties;
}
private:
- CollectionProperties getCollectionPropertiesImpl(OperationContext* txn, StringData ns) {
+ CollectionProperties getCollectionPropertiesImpl(OperationContext* opCtx, StringData ns) {
CollectionProperties collProperties;
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IS);
- auto db = dbHolder().get(txn, ns);
+ Lock::DBLock dbLock(opCtx->lockState(), nsToDatabaseSubstring(ns), MODE_IS);
+ auto db = dbHolder().get(opCtx, ns);
if (!db) {
return collProperties;
}
@@ -573,7 +573,7 @@ private:
// This only modifies the isForCappedCollection field on each op. It does not alter the ops vector
// in any other way.
-void fillWriterVectors(OperationContext* txn,
+void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors) {
const bool supportsDocLocking =
@@ -587,7 +587,7 @@ void fillWriterVectors(OperationContext* txn,
uint32_t hash = hashedNs.hash();
if (op.isCrudOpType()) {
- auto collProperties = collPropertiesCache.getCollectionProperties(txn, hashedNs);
+ auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
// For doc locking engines, include the _id of the document in the hash so we get
// parallelism even if all writes are to a single collection.
@@ -620,7 +620,7 @@ void fillWriterVectors(OperationContext* txn,
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
-OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) {
+OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
_applyFunc(ops, this);
// This function is used by 3.2 initial sync and steady state data replication.
@@ -628,11 +628,11 @@ OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops)
return Status::OK();
};
return fassertStatusOK(
- 34437, repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation));
+ 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation));
}
namespace {
-void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) {
+void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord) {
if (replCoord->isInPrimaryOrSecondaryState()) {
return;
}
@@ -640,8 +640,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
// This needs to happen after the attempt so readers can be sure we've already tried.
ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); });
- ScopedTransaction transaction(txn, MODE_S);
- Lock::GlobalRead readLock(txn->lockState());
+ ScopedTransaction transaction(opCtx, MODE_S);
+ Lock::GlobalRead readLock(opCtx->lockState());
if (replCoord->getMaintenanceMode()) {
LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as maintenance mode is active.";
@@ -657,7 +657,7 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
}
// We can't go to SECONDARY until we reach minvalid.
- if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(txn)->getMinValid(txn)) {
+ if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(opCtx)->getMinValid(opCtx)) {
return;
}
@@ -697,13 +697,13 @@ public:
private:
void run() {
Client::initThread("ReplBatcher");
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
- const auto replCoord = ReplicationCoordinator::get(&txn);
- const auto fastClockSource = txn.getServiceContext()->getFastClockSource();
+ const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
+ OperationContext& opCtx = *opCtxPtr;
+ const auto replCoord = ReplicationCoordinator::get(&opCtx);
+ const auto fastClockSource = opCtx.getServiceContext()->getFastClockSource();
const auto oplogMaxSize = fassertStatusOK(
40301,
- StorageInterface::get(&txn)->getOplogMaxSize(&txn, NamespaceString(rsOplogName)));
+ StorageInterface::get(&opCtx)->getOplogMaxSize(&opCtx, NamespaceString(rsOplogName)));
// Batches are limited to 10% of the oplog.
BatchLimits batchLimits;
@@ -720,7 +720,7 @@ private:
OpQueue ops;
// tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early.
- while (!_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) {
+ while (!_syncTail->tryPopAndWaitForMore(&opCtx, &ops, batchLimits)) {
}
if (ops.empty() && !ops.mustShutdown()) {
@@ -755,8 +755,8 @@ private:
void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
OpQueueBatcher batcher(this);
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
+ const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
+ OperationContext& opCtx = *opCtxPtr;
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
@@ -774,7 +774,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
sleepmillis(10);
}
- tryToGoLiveAsASecondary(&txn, replCoord);
+ tryToGoLiveAsASecondary(&opCtx, replCoord);
long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
@@ -788,7 +788,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
continue;
}
// Signal drain complete if we're in Draining state and the buffer is empty.
- replCoord->signalDrainComplete(&txn, termWhenBufferIsEmpty);
+ replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty);
continue; // Try again.
}
@@ -813,13 +813,13 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
// Do the work.
- multiApply(&txn, ops.releaseBatch());
+ multiApply(&opCtx, ops.releaseBatch());
// Update various things that care about our last applied optime. Tests rely on 2 happening
// before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
- setNewTimestamp(txn.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1
- StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch); // 2
- finalizer->record(lastOpTimeInBatch); // 3
+ setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1
+ StorageInterface::get(&opCtx)->setAppliedThrough(&opCtx, lastOpTimeInBatch); // 2
+ finalizer->record(lastOpTimeInBatch); // 3
}
}
@@ -830,13 +830,13 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
// This function also blocks 1 second waiting for new ops to appear in the bgsync
// queue. We don't block forever so that we can periodically check for things like shutdown or
// reconfigs.
-bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
+bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
- bool peek_success = peek(txn, &op);
+ bool peek_success = peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
@@ -908,7 +908,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
(!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
- _networkQueue->consume(txn);
+ _networkQueue->consume(opCtx);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
@@ -921,7 +921,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
// We are going to apply this Op.
- _networkQueue->consume(txn);
+ _networkQueue->consume(opCtx);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;
@@ -935,7 +935,7 @@ OldThreadPool* SyncTail::getWriterPool() {
return _writerPool.get();
}
-BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) {
+BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, Database* db, const BSONObj& o) {
OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query?
const char* ns = o.getStringField("ns");
@@ -1004,18 +1004,18 @@ BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONO
str::stream() << "Can no longer connect to initial sync source: " << _hostname);
}
-bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) {
+bool SyncTail::shouldRetry(OperationContext* opCtx, const BSONObj& o) {
const NamespaceString nss(o.getStringField("ns"));
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
// Take an X lock on the database in order to preclude other modifications.
// Also, the database might not exist yet, so create it.
- AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X);
+ AutoGetOrCreateDb autoDb(opCtx, nss.db(), MODE_X);
Database* const db = autoDb.getDb();
// we don't have the object yet, which is possible on initial sync. get it.
log() << "adding missing object" << endl; // rare enough we can log
- BSONObj missingObj = getMissingDoc(txn, db, o);
+ BSONObj missingObj = getMissingDoc(opCtx, db, o);
if (missingObj.isEmpty()) {
log() << "missing object not found on source."
@@ -1025,13 +1025,13 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) {
return false;
} else {
- WriteUnitOfWork wunit(txn);
+ WriteUnitOfWork wunit(opCtx);
- Collection* const coll = db->getOrCreateCollection(txn, nss.toString());
+ Collection* const coll = db->getOrCreateCollection(opCtx, nss.toString());
invariant(coll);
OpDebug* const nullOpDebug = nullptr;
- Status status = coll->insertDocument(txn, missingObj, nullOpDebug, true);
+ Status status = coll->insertDocument(opCtx, missingObj, nullOpDebug, true);
uassert(15917,
str::stream() << "failed to insert missing doc: " << status.toString(),
status.isOK());
@@ -1042,7 +1042,7 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) {
return true;
}
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "InsertRetry", nss.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "InsertRetry", nss.ns());
// fixes compile errors on GCC - see SERVER-18219 for details
MONGO_UNREACHABLE;
@@ -1051,22 +1051,22 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) {
// This free function is used by the writer threads to apply each op
void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) {
initializeWriterThread();
- auto txn = cc().makeOperationContext();
- auto syncApply = [](OperationContext* txn, const BSONObj& op, bool inSteadyStateReplication) {
- return SyncTail::syncApply(txn, op, inSteadyStateReplication);
+ auto opCtx = cc().makeOperationContext();
+ auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) {
+ return SyncTail::syncApply(opCtx, op, inSteadyStateReplication);
};
- fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply));
+ fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply));
}
-Status multiSyncApply_noAbort(OperationContext* txn,
+Status multiSyncApply_noAbort(OperationContext* opCtx,
MultiApplier::OperationPtrs* oplogEntryPointers,
SyncApplyFn syncApply) {
- txn->setReplicatedWrites(false);
- DisableDocumentValidation validationDisabler(txn);
+ opCtx->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(opCtx);
// allow us to get through the magic barrier
- txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
if (oplogEntryPointers->size() > 1) {
std::stable_sort(oplogEntryPointers->begin(),
@@ -1125,7 +1125,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
try {
// Apply the group of inserts.
uassertStatusOK(
- syncApply(txn, groupedInsertBuilder.done(), inSteadyStateReplication));
+ syncApply(opCtx, groupedInsertBuilder.done(), inSteadyStateReplication));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
oplogEntriesIterator = endOfGroupableOpsIterator - 1;
@@ -1145,7 +1145,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
try {
// Apply an individual (non-grouped) op.
- const Status status = syncApply(txn, entry->raw, inSteadyStateReplication);
+ const Status status = syncApply(opCtx, entry->raw, inSteadyStateReplication);
if (!status.isOK()) {
severe() << "Error applying operation (" << redact(entry->raw)
@@ -1165,28 +1165,28 @@ Status multiSyncApply_noAbort(OperationContext* txn,
// This free function is used by the initial sync writer threads to apply each op
void multiInitialSyncApply_abortOnFailure(MultiApplier::OperationPtrs* ops, SyncTail* st) {
initializeWriterThread();
- auto txn = cc().makeOperationContext();
+ auto opCtx = cc().makeOperationContext();
AtomicUInt32 fetchCount(0);
- fassertNoTrace(15915, multiInitialSyncApply_noAbort(txn.get(), ops, st, &fetchCount));
+ fassertNoTrace(15915, multiInitialSyncApply_noAbort(opCtx.get(), ops, st, &fetchCount));
}
Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
SyncTail* st,
AtomicUInt32* fetchCount) {
initializeWriterThread();
- auto txn = cc().makeOperationContext();
- return multiInitialSyncApply_noAbort(txn.get(), ops, st, fetchCount);
+ auto opCtx = cc().makeOperationContext();
+ return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount);
}
-Status multiInitialSyncApply_noAbort(OperationContext* txn,
+Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
AtomicUInt32* fetchCount) {
- txn->setReplicatedWrites(false);
- DisableDocumentValidation validationDisabler(txn);
+ opCtx->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(opCtx);
// allow us to get through the magic barrier
- txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
// This function is only called in initial sync, as its name suggests.
const bool inSteadyStateReplication = false;
@@ -1194,7 +1194,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
for (auto it = ops->begin(); it != ops->end(); ++it) {
auto& entry = **it;
try {
- const Status s = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication);
+ const Status s = SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication);
if (!s.isOK()) {
// Don't retry on commands.
if (entry.isCommand()) {
@@ -1205,8 +1205,9 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
// We might need to fetch the missing docs from the sync source.
fetchCount->fetchAndAdd(1);
- if (st->shouldRetry(txn, entry.raw)) {
- const Status s2 = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication);
+ if (st->shouldRetry(opCtx, entry.raw)) {
+ const Status s2 =
+ SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication);
if (!s2.isOK()) {
severe() << "Error applying operation (" << redact(entry.raw)
<< "): " << redact(s2);
@@ -1234,11 +1235,11 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
return Status::OK();
}
-StatusWith<OpTime> multiApply(OperationContext* txn,
+StatusWith<OpTime> multiApply(OperationContext* opCtx,
OldThreadPool* workerPool,
MultiApplier::Operations ops,
MultiApplier::ApplyOperationFn applyOperation) {
- if (!txn) {
+ if (!opCtx) {
return {ErrorCodes::BadValue, "invalid operation context"};
}
@@ -1259,14 +1260,14 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
prefetchOps(ops, workerPool);
}
- auto storage = StorageInterface::get(txn);
+ auto storage = StorageInterface::get(opCtx);
LOG(2) << "replication batch size is " << ops.size();
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
// entries from the oplog until we finish writing.
- Lock::ParallelBatchWriterMode pbwm(txn->lockState());
+ Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
- auto replCoord = ReplicationCoordinator::get(txn);
+ auto replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
severe() << "attempting to replicate ops while primary";
return {ErrorCodes::CannotApplyOplogWhilePrimary,
@@ -1280,14 +1281,14 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
ON_BLOCK_EXIT([&] { workerPool->join(); });
- storage->setOplogDeleteFromPoint(txn, ops.front().ts.timestamp());
- scheduleWritesToOplog(txn, workerPool, ops);
- fillWriterVectors(txn, &ops, &writerVectors);
+ storage->setOplogDeleteFromPoint(opCtx, ops.front().ts.timestamp());
+ scheduleWritesToOplog(opCtx, workerPool, ops);
+ fillWriterVectors(opCtx, &ops, &writerVectors);
workerPool->join();
- storage->setOplogDeleteFromPoint(txn, Timestamp());
- storage->setMinValidToAtLeast(txn, ops.back().getOpTime());
+ storage->setOplogDeleteFromPoint(opCtx, Timestamp());
+ storage->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
applyOps(writerVectors, workerPool, applyOperation, &statusVector);
}