diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 211 |
1 files changed, 106 insertions, 105 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 277df2f9a9d..8738b47d027 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -255,8 +255,8 @@ void ApplyBatchFinalizerForJournal::_run() { _latestOpTime = OpTime(); } - auto txn = cc().makeOperationContext(); - txn->recoveryUnit()->waitUntilDurable(); + auto opCtx = cc().makeOperationContext(); + opCtx->recoveryUnit()->waitUntilDurable(); _recordDurable(latestOpTime); } } @@ -276,19 +276,19 @@ std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() { return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker "); } -bool SyncTail::peek(OperationContext* txn, BSONObj* op) { - return _networkQueue->peek(txn, op); +bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) { + return _networkQueue->peek(opCtx, op); } // static -Status SyncTail::syncApply(OperationContext* txn, +Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { // Count each log op application as a separate operation, for reporting purposes - CurOp individualOp(txn); + CurOp individualOp(opCtx); const char* ns = op.getStringField("ns"); verify(ns); @@ -312,24 +312,24 @@ Status SyncTail::syncApply(OperationContext* txn, MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // a command may need a global write lock. so we will conservatively go // ahead and grab one here. suboptimal. :-( - Lock::GlobalWrite globalWriteLock(txn->lockState()); + Lock::GlobalWrite globalWriteLock(opCtx->lockState()); // special case apply for commands to avoid implicit database creation - Status status = applyCommandInLock(txn, op, inSteadyStateReplication); + Status status = applyCommandInLock(opCtx, op, inSteadyStateReplication); incrementOpsAppliedStats(); return status; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "syncApply_command", ns); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "syncApply_command", ns); } auto applyOp = [&](Database* db) { // For non-initial-sync, we convert updates to upserts // to suppress errors when replaying oplog entries. - txn->setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(txn); + opCtx->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(opCtx); Status status = - applyOperationInLock(txn, db, op, inSteadyStateReplication, incrementOpsAppliedStats); + applyOperationInLock(opCtx, db, op, inSteadyStateReplication, incrementOpsAppliedStats); if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { throw WriteConflictException(); } @@ -339,11 +339,11 @@ Status SyncTail::syncApply(OperationContext* txn, if (isNoOp || (opType[0] == 'i' && nsToCollectionSubstring(ns) == "system.indexes")) { auto opStr = isNoOp ? "syncApply_noop" : "syncApply_indexBuild"; MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X); - OldClientContext ctx(txn, ns); + Lock::DBLock dbLock(opCtx->lockState(), nsToDatabaseSubstring(ns), MODE_X); + OldClientContext ctx(opCtx, ns); return applyOp(ctx.db()); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, opStr, ns); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, opStr, ns); } if (isCrudOpType(opType)) { @@ -361,29 +361,29 @@ Status SyncTail::syncApply(OperationContext* txn, // drop the DB lock before acquiring // the upgraded one. dbLock.reset(); - dbLock.reset(new Lock::DBLock(txn->lockState(), dbName, mode)); - collectionLock.reset(new Lock::CollectionLock(txn->lockState(), ns, mode)); + dbLock.reset(new Lock::DBLock(opCtx->lockState(), dbName, mode)); + collectionLock.reset(new Lock::CollectionLock(opCtx->lockState(), ns, mode)); }; resetLocks(MODE_IX); - if (!dbHolder().get(txn, dbName)) { + if (!dbHolder().get(opCtx, dbName)) { // Need to create database, so reset lock to stronger mode. resetLocks(MODE_X); - ctx.reset(new OldClientContext(txn, ns)); + ctx.reset(new OldClientContext(opCtx, ns)); } else { - ctx.reset(new OldClientContext(txn, ns)); + ctx.reset(new OldClientContext(opCtx, ns)); if (!ctx->db()->getCollection(ns)) { // Need to implicitly create collection. This occurs for 'u' opTypes, // but not for 'i' nor 'd'. ctx.reset(); resetLocks(MODE_X); - ctx.reset(new OldClientContext(txn, ns)); + ctx.reset(new OldClientContext(opCtx, ns)); } } return applyOp(ctx->db()); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "syncApply_CRUD", ns); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "syncApply_CRUD", ns); } // unknown opType @@ -393,10 +393,10 @@ Status SyncTail::syncApply(OperationContext* txn, return Status(ErrorCodes::BadValue, ss); } -Status SyncTail::syncApply(OperationContext* txn, +Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { - return SyncTail::syncApply(txn, + return SyncTail::syncApply(opCtx, op, inSteadyStateReplication, applyOperation_inlock, @@ -416,12 +416,12 @@ void prefetchOp(const BSONObj& op) { try { // one possible tweak here would be to stay in the read lock for this database // for multiple prefetches if they are for the same database. - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; - AutoGetCollectionForRead ctx(&txn, NamespaceString(ns)); + const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); + OperationContext& opCtx = *opCtxPtr; + AutoGetCollectionForRead ctx(&opCtx, NamespaceString(ns)); Database* db = ctx.getDb(); if (db) { - prefetchPagesForReplicatedOp(&txn, db, op); + prefetchPagesForReplicatedOp(&opCtx, db, op); } } catch (const DBException& e) { LOG(2) << "ignoring exception in prefetchOp(): " << redact(e) << endl; @@ -468,7 +468,7 @@ void initializeWriterThread() { // Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops' // stays valid until all scheduled work in the thread pool completes. -void scheduleWritesToOplog(OperationContext* txn, +void scheduleWritesToOplog(OperationContext* opCtx, OldThreadPool* threadPool, const MultiApplier::Operations& ops) { @@ -479,9 +479,9 @@ void scheduleWritesToOplog(OperationContext* txn, return [&ops, begin, end] { initializeWriterThread(); const auto txnHolder = cc().makeOperationContext(); - const auto txn = txnHolder.get(); - txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - txn->setReplicatedWrites(false); + const auto opCtx = txnHolder.get(); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + opCtx->setReplicatedWrites(false); std::vector<BSONObj> docs; docs.reserve(end - begin); @@ -492,8 +492,8 @@ void scheduleWritesToOplog(OperationContext* txn, } fassertStatusOK(40141, - StorageInterface::get(txn)->insertDocuments( - txn, NamespaceString(rsOplogName), docs)); + StorageInterface::get(opCtx)->insertDocuments( + opCtx, NamespaceString(rsOplogName), docs)); }; }; @@ -509,7 +509,7 @@ void scheduleWritesToOplog(OperationContext* txn, // there would be no way to take advantage of multiple threads if a storage engine doesn't // support document locking. if (!enoughToMultiThread || - !txn->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) { + !opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) { threadPool->schedule(makeOplogWriterForRange(0, ops.size())); return; @@ -536,24 +536,24 @@ public: const CollatorInterface* collator = nullptr; }; - CollectionProperties getCollectionProperties(OperationContext* txn, + CollectionProperties getCollectionProperties(OperationContext* opCtx, const StringMapTraits::HashedKey& ns) { auto it = _cache.find(ns); if (it != _cache.end()) { return it->second; } - auto collProperties = getCollectionPropertiesImpl(txn, ns.key()); + auto collProperties = getCollectionPropertiesImpl(opCtx, ns.key()); _cache[ns] = collProperties; return collProperties; } private: - CollectionProperties getCollectionPropertiesImpl(OperationContext* txn, StringData ns) { + CollectionProperties getCollectionPropertiesImpl(OperationContext* opCtx, StringData ns) { CollectionProperties collProperties; - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IS); - auto db = dbHolder().get(txn, ns); + Lock::DBLock dbLock(opCtx->lockState(), nsToDatabaseSubstring(ns), MODE_IS); + auto db = dbHolder().get(opCtx, ns); if (!db) { return collProperties; } @@ -573,7 +573,7 @@ private: // This only modifies the isForCappedCollection field on each op. It does not alter the ops vector // in any other way. -void fillWriterVectors(OperationContext* txn, +void fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors) { const bool supportsDocLocking = @@ -587,7 +587,7 @@ void fillWriterVectors(OperationContext* txn, uint32_t hash = hashedNs.hash(); if (op.isCrudOpType()) { - auto collProperties = collPropertiesCache.getCollectionProperties(txn, hashedNs); + auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); // For doc locking engines, include the _id of the document in the hash so we get // parallelism even if all writes are to a single collection. @@ -620,7 +620,7 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. -OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) { +OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status { _applyFunc(ops, this); // This function is used by 3.2 initial sync and steady state data replication. @@ -628,11 +628,11 @@ OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) return Status::OK(); }; return fassertStatusOK( - 34437, repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation)); + 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation)); } namespace { -void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) { +void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord) { if (replCoord->isInPrimaryOrSecondaryState()) { return; } @@ -640,8 +640,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl // This needs to happen after the attempt so readers can be sure we've already tried. ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); }); - ScopedTransaction transaction(txn, MODE_S); - Lock::GlobalRead readLock(txn->lockState()); + ScopedTransaction transaction(opCtx, MODE_S); + Lock::GlobalRead readLock(opCtx->lockState()); if (replCoord->getMaintenanceMode()) { LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as maintenance mode is active."; @@ -657,7 +657,7 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl } // We can't go to SECONDARY until we reach minvalid. - if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(txn)->getMinValid(txn)) { + if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(opCtx)->getMinValid(opCtx)) { return; } @@ -697,13 +697,13 @@ public: private: void run() { Client::initThread("ReplBatcher"); - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; - const auto replCoord = ReplicationCoordinator::get(&txn); - const auto fastClockSource = txn.getServiceContext()->getFastClockSource(); + const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); + OperationContext& opCtx = *opCtxPtr; + const auto replCoord = ReplicationCoordinator::get(&opCtx); + const auto fastClockSource = opCtx.getServiceContext()->getFastClockSource(); const auto oplogMaxSize = fassertStatusOK( 40301, - StorageInterface::get(&txn)->getOplogMaxSize(&txn, NamespaceString(rsOplogName))); + StorageInterface::get(&opCtx)->getOplogMaxSize(&opCtx, NamespaceString(rsOplogName))); // Batches are limited to 10% of the oplog. BatchLimits batchLimits; @@ -720,7 +720,7 @@ private: OpQueue ops; // tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early. - while (!_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) { + while (!_syncTail->tryPopAndWaitForMore(&opCtx, &ops, batchLimits)) { } if (ops.empty() && !ops.mustShutdown()) { @@ -755,8 +755,8 @@ private: void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { OpQueueBatcher batcher(this); - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; + const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); + OperationContext& opCtx = *opCtxPtr; std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getGlobalStorageEngine()->isDurable() ? new ApplyBatchFinalizerForJournal(replCoord) @@ -774,7 +774,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { sleepmillis(10); } - tryToGoLiveAsASecondary(&txn, replCoord); + tryToGoLiveAsASecondary(&opCtx, replCoord); long long termWhenBufferIsEmpty = replCoord->getTerm(); // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become @@ -788,7 +788,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { continue; } // Signal drain complete if we're in Draining state and the buffer is empty. - replCoord->signalDrainComplete(&txn, termWhenBufferIsEmpty); + replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty); continue; // Try again. } @@ -813,13 +813,13 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); // Do the work. - multiApply(&txn, ops.releaseBatch()); + multiApply(&opCtx, ops.releaseBatch()); // Update various things that care about our last applied optime. Tests rely on 2 happening // before 3 even though it isn't strictly necessary. The order of 1 doesn't matter. - setNewTimestamp(txn.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1 - StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch); // 2 - finalizer->record(lastOpTimeInBatch); // 3 + setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1 + StorageInterface::get(&opCtx)->setAppliedThrough(&opCtx, lastOpTimeInBatch); // 2 + finalizer->record(lastOpTimeInBatch); // 3 } } @@ -830,13 +830,13 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { // This function also blocks 1 second waiting for new ops to appear in the bgsync // queue. We don't block forever so that we can periodically check for things like shutdown or // reconfigs. -bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, +bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, SyncTail::OpQueue* ops, const BatchLimits& limits) { { BSONObj op; // Check to see if there are ops waiting in the bgsync queue - bool peek_success = peek(txn, &op); + bool peek_success = peek(opCtx, &op); if (!peek_success) { // If we don't have anything in the queue, wait a bit for something to appear. if (ops->empty()) { @@ -908,7 +908,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { if (ops->getCount() == 1) { // apply commands one-at-a-time - _networkQueue->consume(txn); + _networkQueue->consume(opCtx); } else { // This op must be processed alone, but we already had ops in the queue so we can't // include it in this batch. Since we didn't call consume(), we'll see this again next @@ -921,7 +921,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } // We are going to apply this Op. - _networkQueue->consume(txn); + _networkQueue->consume(opCtx); // Go back for more ops, unless we've hit the limit. return ops->getCount() >= limits.ops; @@ -935,7 +935,7 @@ OldThreadPool* SyncTail::getWriterPool() { return _writerPool.get(); } -BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) { +BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, Database* db, const BSONObj& o) { OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query? const char* ns = o.getStringField("ns"); @@ -1004,18 +1004,18 @@ BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONO str::stream() << "Can no longer connect to initial sync source: " << _hostname); } -bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { +bool SyncTail::shouldRetry(OperationContext* opCtx, const BSONObj& o) { const NamespaceString nss(o.getStringField("ns")); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // Take an X lock on the database in order to preclude other modifications. // Also, the database might not exist yet, so create it. - AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X); + AutoGetOrCreateDb autoDb(opCtx, nss.db(), MODE_X); Database* const db = autoDb.getDb(); // we don't have the object yet, which is possible on initial sync. get it. log() << "adding missing object" << endl; // rare enough we can log - BSONObj missingObj = getMissingDoc(txn, db, o); + BSONObj missingObj = getMissingDoc(opCtx, db, o); if (missingObj.isEmpty()) { log() << "missing object not found on source." @@ -1025,13 +1025,13 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { return false; } else { - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); - Collection* const coll = db->getOrCreateCollection(txn, nss.toString()); + Collection* const coll = db->getOrCreateCollection(opCtx, nss.toString()); invariant(coll); OpDebug* const nullOpDebug = nullptr; - Status status = coll->insertDocument(txn, missingObj, nullOpDebug, true); + Status status = coll->insertDocument(opCtx, missingObj, nullOpDebug, true); uassert(15917, str::stream() << "failed to insert missing doc: " << status.toString(), status.isOK()); @@ -1042,7 +1042,7 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { return true; } } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "InsertRetry", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "InsertRetry", nss.ns()); // fixes compile errors on GCC - see SERVER-18219 for details MONGO_UNREACHABLE; @@ -1051,22 +1051,22 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { // This free function is used by the writer threads to apply each op void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) { initializeWriterThread(); - auto txn = cc().makeOperationContext(); - auto syncApply = [](OperationContext* txn, const BSONObj& op, bool inSteadyStateReplication) { - return SyncTail::syncApply(txn, op, inSteadyStateReplication); + auto opCtx = cc().makeOperationContext(); + auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { + return SyncTail::syncApply(opCtx, op, inSteadyStateReplication); }; - fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply)); + fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply)); } -Status multiSyncApply_noAbort(OperationContext* txn, +Status multiSyncApply_noAbort(OperationContext* opCtx, MultiApplier::OperationPtrs* oplogEntryPointers, SyncApplyFn syncApply) { - txn->setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(txn); + opCtx->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(opCtx); // allow us to get through the magic barrier - txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); if (oplogEntryPointers->size() > 1) { std::stable_sort(oplogEntryPointers->begin(), @@ -1125,7 +1125,7 @@ Status multiSyncApply_noAbort(OperationContext* txn, try { // Apply the group of inserts. uassertStatusOK( - syncApply(txn, groupedInsertBuilder.done(), inSteadyStateReplication)); + syncApply(opCtx, groupedInsertBuilder.done(), inSteadyStateReplication)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. oplogEntriesIterator = endOfGroupableOpsIterator - 1; @@ -1145,7 +1145,7 @@ Status multiSyncApply_noAbort(OperationContext* txn, try { // Apply an individual (non-grouped) op. - const Status status = syncApply(txn, entry->raw, inSteadyStateReplication); + const Status status = syncApply(opCtx, entry->raw, inSteadyStateReplication); if (!status.isOK()) { severe() << "Error applying operation (" << redact(entry->raw) @@ -1165,28 +1165,28 @@ Status multiSyncApply_noAbort(OperationContext* txn, // This free function is used by the initial sync writer threads to apply each op void multiInitialSyncApply_abortOnFailure(MultiApplier::OperationPtrs* ops, SyncTail* st) { initializeWriterThread(); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); AtomicUInt32 fetchCount(0); - fassertNoTrace(15915, multiInitialSyncApply_noAbort(txn.get(), ops, st, &fetchCount)); + fassertNoTrace(15915, multiInitialSyncApply_noAbort(opCtx.get(), ops, st, &fetchCount)); } Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st, AtomicUInt32* fetchCount) { initializeWriterThread(); - auto txn = cc().makeOperationContext(); - return multiInitialSyncApply_noAbort(txn.get(), ops, st, fetchCount); + auto opCtx = cc().makeOperationContext(); + return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount); } -Status multiInitialSyncApply_noAbort(OperationContext* txn, +Status multiInitialSyncApply_noAbort(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, AtomicUInt32* fetchCount) { - txn->setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(txn); + opCtx->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(opCtx); // allow us to get through the magic barrier - txn->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); // This function is only called in initial sync, as its name suggests. const bool inSteadyStateReplication = false; @@ -1194,7 +1194,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, for (auto it = ops->begin(); it != ops->end(); ++it) { auto& entry = **it; try { - const Status s = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication); + const Status s = SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication); if (!s.isOK()) { // Don't retry on commands. if (entry.isCommand()) { @@ -1205,8 +1205,9 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, // We might need to fetch the missing docs from the sync source. fetchCount->fetchAndAdd(1); - if (st->shouldRetry(txn, entry.raw)) { - const Status s2 = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication); + if (st->shouldRetry(opCtx, entry.raw)) { + const Status s2 = + SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication); if (!s2.isOK()) { severe() << "Error applying operation (" << redact(entry.raw) << "): " << redact(s2); @@ -1234,11 +1235,11 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, return Status::OK(); } -StatusWith<OpTime> multiApply(OperationContext* txn, +StatusWith<OpTime> multiApply(OperationContext* opCtx, OldThreadPool* workerPool, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) { - if (!txn) { + if (!opCtx) { return {ErrorCodes::BadValue, "invalid operation context"}; } @@ -1259,14 +1260,14 @@ StatusWith<OpTime> multiApply(OperationContext* txn, prefetchOps(ops, workerPool); } - auto storage = StorageInterface::get(txn); + auto storage = StorageInterface::get(opCtx); LOG(2) << "replication batch size is " << ops.size(); // Stop all readers until we're done. This also prevents doc-locking engines from deleting old // entries from the oplog until we finish writing. - Lock::ParallelBatchWriterMode pbwm(txn->lockState()); + Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); - auto replCoord = ReplicationCoordinator::get(txn); + auto replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) { severe() << "attempting to replicate ops while primary"; return {ErrorCodes::CannotApplyOplogWhilePrimary, @@ -1280,14 +1281,14 @@ StatusWith<OpTime> multiApply(OperationContext* txn, std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads()); ON_BLOCK_EXIT([&] { workerPool->join(); }); - storage->setOplogDeleteFromPoint(txn, ops.front().ts.timestamp()); - scheduleWritesToOplog(txn, workerPool, ops); - fillWriterVectors(txn, &ops, &writerVectors); + storage->setOplogDeleteFromPoint(opCtx, ops.front().ts.timestamp()); + scheduleWritesToOplog(opCtx, workerPool, ops); + fillWriterVectors(opCtx, &ops, &writerVectors); workerPool->join(); - storage->setOplogDeleteFromPoint(txn, Timestamp()); - storage->setMinValidToAtLeast(txn, ops.back().getOpTime()); + storage->setOplogDeleteFromPoint(opCtx, Timestamp()); + storage->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); applyOps(writerVectors, workerPool, applyOperation, &statusVector); } |