Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 252
1 file changed, 1 insertion(+), 251 deletions(-)
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 18b3d7e0495..a5402fadfa7 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -85,23 +85,12 @@ namespace mongo {
 namespace repl {
 namespace {
 
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries);
 MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime);
 
 // The oplog entries applied
 Counter64 opsAppliedStats;
 ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats);
 
-// Tracks the oplog application batch size.
-Counter64 oplogApplicationBatchSize;
-ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize",
-                                                                    &oplogApplicationBatchSize);
-
-// Number and time of each ApplyOps worker pool round
-TimerStats applyBatchStats;
-ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
-
 NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
     auto optionalUuid = oplogEntry.getUuid();
     if (!optionalUuid) {
@@ -165,17 +154,9 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod
 }  // namespace
 
 SyncTail::SyncTail(OplogApplier::Observer* observer,
-                   ReplicationConsistencyMarkers* consistencyMarkers,
                    StorageInterface* storageInterface,
-                   MultiSyncApplyFunc func,
-                   ThreadPool* writerPool,
                    const OplogApplier::Options& options)
-    : _observer(observer),
-      _consistencyMarkers(consistencyMarkers),
-      _storageInterface(storageInterface),
-      _applyFunc(func),
-      _writerPool(writerPool),
-      _options(options) {}
+    : _observer(observer), _storageInterface(storageInterface), _options(options) {}
 
 SyncTail::~SyncTail() {}
 
@@ -185,74 +166,6 @@ const OplogApplier::Options& SyncTail::getOptions() const {
 
 namespace {
 
-// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops'
-// stays valid until all scheduled work in the thread pool completes.
-void scheduleWritesToOplog(OperationContext* opCtx,
-                           StorageInterface* storageInterface,
-                           ThreadPool* threadPool,
-                           const MultiApplier::Operations& ops) {
-
-    auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
-        // The returned function will be run in a separate thread after this returns. Therefore all
-        // captures other than 'ops' must be by value since they will not be available. The caller
-        // guarantees that 'ops' will stay in scope until the spawned threads complete.
-        return [storageInterface, &ops, begin, end](auto status) {
-            invariant(status);
-
-            auto opCtx = cc().makeOperationContext();
-
-            // This code path is only executed on secondaries and initial syncing nodes, so it is
-            // safe to exclude any writes from Flow Control.
-            opCtx->setShouldParticipateInFlowControl(false);
-
-            UnreplicatedWritesBlock uwb(opCtx.get());
-            ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
-                opCtx->lockState());
-
-            std::vector<InsertStatement> docs;
-            docs.reserve(end - begin);
-            for (size_t i = begin; i < end; i++) {
-                // Add as unowned BSON to avoid unnecessary ref-count bumps.
-                // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
-                docs.emplace_back(InsertStatement{ops[i].getRaw(),
-                                                  ops[i].getOpTime().getTimestamp(),
-                                                  ops[i].getOpTime().getTerm()});
-            }
-
-            fassert(40141,
-                    storageInterface->insertDocuments(
-                        opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
-        };
-    };
-
-    // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it
-    // would result too little work per thread. This also ensures that we can amortize the
-    // setup/teardown overhead across many writes.
-    const size_t kMinOplogEntriesPerThread = 16;
-    const bool enoughToMultiThread =
-        ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads;
-
-    // Only doc-locking engines support parallel writes to the oplog because they are required to
-    // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
-    // there would be no way to take advantage of multiple threads if a storage engine doesn't
-    // support document locking.
-    if (!enoughToMultiThread ||
-        !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
-
-        threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
-        return;
-    }
-
-
-    const size_t numOplogThreads = threadPool->getStats().numThreads;
-    const size_t numOpsPerThread = ops.size() / numOplogThreads;
-    for (size_t thread = 0; thread < numOplogThreads; thread++) {
-        size_t begin = thread * numOpsPerThread;
-        size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
-        threadPool->schedule(makeOplogWriterForRange(begin, end));
-    }
-}
-
 /**
  * Caches per-collection properties which are relevant for oplog application, so that they don't
  * have to be retrieved repeatedly for each op.
@@ -366,16 +279,6 @@ void addDerivedOps(OperationContext* opCtx,
 
 }  // namespace
 
-void SyncTail::shutdown() {
-    stdx::lock_guard<Latch> lock(_mutex);
-    _inShutdown = true;
-}
-
-bool SyncTail::inShutdown() const {
-    stdx::lock_guard<Latch> lock(_mutex);
-    return _inShutdown;
-}
-
 Status syncApply(OperationContext* opCtx,
                  const OplogEntryBatch& batch,
                  OplogApplication::Mode oplogApplicationMode) {
@@ -691,158 +594,5 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx,
     }
 }
 
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
-    invariant(!ops.empty());
-
-    LOG(2) << "replication batch size is " << ops.size();
-
-    // Stop all readers until we're done. This also prevents doc-locking engines from deleting old
-    // entries from the oplog until we finish writing.
-    Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
-
-    auto replCoord = ReplicationCoordinator::get(opCtx);
-    if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
-        severe() << "attempting to replicate ops while primary";
-        return {ErrorCodes::CannotApplyOplogWhilePrimary,
-                "attempting to replicate ops while primary"};
-    }
-
-    // Increment the batch size stat.
-    oplogApplicationBatchSize.increment(ops.size());
-
-    std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
-    {
-        // Each node records cumulative batch application stats for itself using this timer.
-        TimerHolder timer(&applyBatchStats);
-
-        // We must wait for the all work we've dispatched to complete before leaving this block
-        // because the spawned threads refer to objects on the stack
-        ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); });
-
-        // Write batch of ops into oplog.
-        if (!_options.skipWritesToOplog) {
-            _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
-            scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
-        }
-
-        // Holds 'pseudo operations' generated by secondaries to aid in replication.
-        // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
-        // Pseudo operations include:
-        // - applyOps operations expanded to individual ops.
-        // - ops to update config.transactions. Normal writes to config.transactions in the
-        //   primary don't create an oplog entry, so extract info from writes with transactions
-        //   and create a pseudo oplog.
-        std::vector<MultiApplier::Operations> derivedOps;
-
-        std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
-        fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
-
-        // Wait for writes to finish before applying ops.
-        _writerPool->waitForIdle();
-
-        // Use this fail point to hold the PBWM lock after we have written the oplog entries but
-        // before we have applied them.
-        if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) {
-            log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking "
-                     "until fail point is disabled.";
-            pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx);
-        }
-
-        // Reset consistency markers in case the node fails while applying ops.
-        if (!_options.skipWritesToOplog) {
-            _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
-            _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
-        }
-
-        {
-            std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
-
-            // Doles out all the work to the writer pool threads. writerVectors is not modified,
-            // but multiSyncApply will modify the vectors that it contains.
-            invariant(writerVectors.size() == statusVector.size());
-            for (size_t i = 0; i < writerVectors.size(); i++) {
-                if (writerVectors[i].empty())
-                    continue;
-
-                _writerPool->schedule(
-                    [this,
-                     &writer = writerVectors.at(i),
-                     &status = statusVector.at(i),
-                     &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
-                        invariant(scheduleStatus);
-
-                        auto opCtx = cc().makeOperationContext();
-
-                        // This code path is only executed on secondaries and initial syncing nodes,
-                        // so it is safe to exclude any writes from Flow Control.
-                        opCtx->setShouldParticipateInFlowControl(false);
-
-                        status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
-                            return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
-                        });
-                    });
-            }
-
-            _writerPool->waitForIdle();
-
-            // If any of the statuses is not ok, return error.
-            for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) {
-                const auto& status = *it;
-                if (!status.isOK()) {
-                    severe()
-                        << "Failed to apply batch of operations. Number of operations in batch: "
-                        << ops.size() << ". First operation: " << redact(ops.front().toBSON())
-                        << ". Last operation: " << redact(ops.back().toBSON())
-                        << ". Oplog application failed in writer thread "
-                        << std::distance(statusVector.cbegin(), it) << ": " << redact(status);
-                    return status;
-                }
-            }
-        }
-    }
-
-    // Notify the storage engine that a replication batch has completed. This means that all the
-    // writes associated with the oplog entries in the batch are finished and no new writes with
-    // timestamps associated with those oplog entries will show up in the future.
-    const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
-    storageEngine->replicationBatchIsComplete();
-
-    // Use this fail point to hold the PBWM lock and prevent the batch from completing.
-    if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
-        log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
-                 "point is disabled.";
-        while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
-            if (inShutdown()) {
-                severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
-                            "clean shutdown";
-                fassertFailedNoTrace(50798);
-            }
-            sleepmillis(100);
-        }
-    }
-
-    Timestamp firstTimeInBatch = ops.front().getTimestamp();
-    // Set any indexes to multikey that this batch ignored. This must be done while holding the
-    // parallel batch writer mode lock.
-    for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
-        for (MultikeyPathInfo info : infoVector) {
-            // We timestamp every multikey write with the first timestamp in the batch. It is always
-            // safe to set an index as multikey too early, just not too late. We conservatively pick
-            // the first timestamp in the batch since we do not have enough information to find out
-            // the timestamp of the first write that set the given multikey path.
-            fassert(50686,
-                    _storageInterface->setIndexIsMultikey(
-                        opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
-        }
-    }
-
-    // Increment the counter for the number of ops applied during catchup if the node is in catchup
-    // mode.
-    replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size());
-
-    // We have now written all database writes and updated the oplog to match.
-    return ops.back().getOpTime();
-}
-
 }  // namespace repl
 }  // namespace mongo
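Note on the removed scheduleWritesToOplog: it fans oplog writes out across the writer pool only when every thread would receive at least kMinOplogEntriesPerThread (16) entries and the storage engine supports document-level locking; otherwise it schedules a single bulk insert, with the last thread absorbing any remainder of the division. The standalone sketch below reproduces just that partitioning arithmetic; it is illustrative only, and computeOplogWriteRanges plus the main driver are hypothetical names, not MongoDB APIs.

#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Illustrative helper (not in the MongoDB tree): compute the [begin, end) ranges
// that the removed scheduleWritesToOplog handed to the writer-pool threads.
std::vector<std::pair<std::size_t, std::size_t>> computeOplogWriteRanges(std::size_t numOps,
                                                                         std::size_t numThreads,
                                                                         bool supportsDocLocking) {
    const std::size_t kMinOplogEntriesPerThread = 16;  // same threshold as the removed code
    const bool enoughToMultiThread = numOps >= kMinOplogEntriesPerThread * numThreads;

    // Fall back to a single bulk insert when the batch is small or the engine cannot
    // take parallel oplog writes; this keeps per-insert setup amortized over many entries.
    if (!enoughToMultiThread || !supportsDocLocking) {
        return {{0, numOps}};
    }

    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    const std::size_t numOpsPerThread = numOps / numThreads;
    for (std::size_t thread = 0; thread < numThreads; thread++) {
        std::size_t begin = thread * numOpsPerThread;
        // The last thread also takes the remainder of the division.
        std::size_t end = (thread == numThreads - 1) ? numOps : begin + numOpsPerThread;
        ranges.emplace_back(begin, end);
    }
    return ranges;
}

int main() {
    // 100 entries over 4 threads: 25 each, since 100 >= 16 * 4.
    for (auto [begin, end] : computeOplogWriteRanges(100, 4, true)) {
        std::cout << "[" << begin << ", " << end << ")\n";
    }
    // 30 entries over 4 threads falls below the threshold, so one range covers everything.
    for (auto [begin, end] : computeOplogWriteRanges(30, 4, true)) {
        std::cout << "[" << begin << ", " << end << ")\n";
    }
    return 0;
}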
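Similarly, the removed multiApply hands one writer vector and one Status slot to each pool thread, waits for the pool to go idle, and then surfaces the first non-OK status. A minimal sketch of that collect-then-check pattern follows, assuming std::thread in place of MongoDB's ThreadPool and a plain struct in place of mongo::Status; applyAndFindFirstFailure is a hypothetical name used only for illustration.

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <thread>
#include <vector>

// Stand-in for mongo::Status, just for this sketch.
struct Status {
    bool ok = true;
    std::string reason;
};

// Run each unit of work on its own thread, recording one Status per worker,
// then report the first failure (mirroring the removed multiApply's statusVector loop).
std::optional<std::size_t> applyAndFindFirstFailure(const std::vector<int>& workloads,
                                                    std::vector<Status>& statuses) {
    statuses.assign(workloads.size(), Status{});
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < workloads.size(); i++) {
        workers.emplace_back([i, &workloads, &statuses] {
            // Pretend negative workloads fail to apply.
            if (workloads[i] < 0) {
                statuses[i] = {false, "failed to apply batch in worker " + std::to_string(i)};
            }
        });
    }
    for (auto& t : workers) {
        t.join();  // the real code waited on ThreadPool::waitForIdle() instead
    }
    for (std::size_t i = 0; i < statuses.size(); i++) {
        if (!statuses[i].ok) {
            return i;  // index of the first failing writer thread
        }
    }
    return std::nullopt;
}

int main() {
    std::vector<Status> statuses;
    auto firstFailure = applyAndFindFirstFailure({3, 1, -2, 4}, statuses);
    if (firstFailure) {
        std::cout << "first failure in worker " << *firstFailure << ": "
                  << statuses[*firstFailure].reason << "\n";
    }
    return 0;
}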