diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 389 |
1 files changed, 0 insertions, 389 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index a8c66775369..18b3d7e0495 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -47,7 +47,6 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" -#include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -103,109 +102,6 @@ ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply. TimerStats applyBatchStats; ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); -class ApplyBatchFinalizer { -public: - ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {} - virtual ~ApplyBatchFinalizer(){}; - - virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - _recordApplied(newOpTimeAndWallTime, consistency); - }; - -protected: - void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with - // ReplicationExternalStateImpl::onTransitionToPrimary. - _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency); - } - - void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) { - // We have to use setMyLastDurableOpTimeForward since this thread races with - // ReplicationExternalStateImpl::onTransitionToPrimary. - _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime); - } - -private: - // Used to update the replication system's progress. - ReplicationCoordinator* _replCoord; -}; - -class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer { -public: - ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord) - : ApplyBatchFinalizer(replCoord), - _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {}; - ~ApplyBatchFinalizerForJournal(); - - void record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) override; - -private: - /** - * Loops continuously, waiting for writes to be flushed to disk and then calls - * ReplicationCoordinator::setMyLastOptime with _latestOpTime. - * Terminates once _shutdownSignaled is set true. - */ - void _run(); - - // Protects _cond, _shutdownSignaled, and _latestOpTime. - Mutex _mutex = MONGO_MAKE_LATCH("ApplyBatchFinalizerForJournal::_mutex"); - // Used to alert our thread of a new OpTime. - stdx::condition_variable _cond; - // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing. - OpTimeAndWallTime _latestOpTimeAndWallTime; - // Once this is set to true the _run method will terminate. - bool _shutdownSignaled = false; - // Thread that will _run(). Must be initialized last as it depends on the other variables. - stdx::thread _waiterThread; -}; - -ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() { - stdx::unique_lock<Latch> lock(_mutex); - _shutdownSignaled = true; - _cond.notify_all(); - lock.unlock(); - - _waiterThread.join(); -} - -void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - _recordApplied(newOpTimeAndWallTime, consistency); - - stdx::unique_lock<Latch> lock(_mutex); - _latestOpTimeAndWallTime = newOpTimeAndWallTime; - _cond.notify_all(); -} - -void ApplyBatchFinalizerForJournal::_run() { - Client::initThread("ApplyBatchFinalizerForJournal"); - - while (true) { - OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()}; - - { - stdx::unique_lock<Latch> lock(_mutex); - while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) { - _cond.wait(lock); - } - - if (_shutdownSignaled) { - return; - } - - latestOpTimeAndWallTime = _latestOpTimeAndWallTime; - _latestOpTimeAndWallTime = {OpTime(), Date_t()}; - } - - auto opCtx = cc().makeOperationContext(); - opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); - _recordDurable(latestOpTimeAndWallTime); - } -} - NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { auto optionalUuid = oplogEntry.getUuid(); if (!optionalUuid) { @@ -470,291 +366,6 @@ void addDerivedOps(OperationContext* opCtx, } // namespace -class SyncTail::OpQueueBatcher { - OpQueueBatcher(const OpQueueBatcher&) = delete; - OpQueueBatcher& operator=(const OpQueueBatcher&) = delete; - -public: - OpQueueBatcher(SyncTail* syncTail, - StorageInterface* storageInterface, - OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) - : _syncTail(syncTail), - _storageInterface(storageInterface), - _oplogBuffer(oplogBuffer), - _getNextApplierBatchFn(getNextApplierBatchFn), - _ops(0), - _thread([this] { run(); }) {} - ~OpQueueBatcher() { - invariant(_isDead); - _thread.join(); - } - - OpQueue getNextBatch(Seconds maxWaitTime) { - stdx::unique_lock<Latch> lk(_mutex); - // _ops can indicate the following cases: - // 1. A new batch is ready to consume. - // 2. Shutdown. - // 3. The batch has (or had) exhausted the buffer in draining mode. - // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode, - // so there could be new oplog entries coming. - // 5. Empty batch since the batcher is still running. - // - // In case (4) and (5), we wait for up to "maxWaitTime". - if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) { - // We intentionally don't care about whether this returns due to signaling or timeout - // since we do the same thing either way: return whatever is in _ops. - (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration()); - } - - OpQueue ops = std::move(_ops); - _ops = OpQueue(0); - _cv.notify_all(); - return ops; - } - -private: - /** - * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog - * entries that can be be returned in a batch. - */ - boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp() { - auto service = cc().getServiceContext(); - auto replCoord = ReplicationCoordinator::get(service); - auto slaveDelay = replCoord->getSlaveDelaySecs(); - if (slaveDelay <= Seconds(0)) { - return {}; - } - auto fastClockSource = service->getFastClockSource(); - return fastClockSource->now() - slaveDelay; - } - - void run() { - Client::initThread("ReplBatcher"); - - BatchLimits batchLimits; - - while (true) { - rsSyncApplyStop.pauseWhileSet(); - - batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp(); - - // Check the limits once per batch since users can change them at runtime. - batchLimits.ops = getBatchLimitOplogEntries(); - - OpQueue ops(batchLimits.ops); - { - auto opCtx = cc().makeOperationContext(); - - // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an - // UninterruptibleLockGuard in client operations because stepdown requires the - // ability to interrupt client operations. However, it is acceptable to use an - // UninterruptibleLockGuard in batch application because the only cause of - // interruption would be shutdown, and the ReplBatcher thread has its own shutdown - // handling. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - // Locks the oplog to check its max size, do this in the UninterruptibleLockGuard. - batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface); - - auto oplogEntries = - fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits)); - for (const auto& oplogEntry : oplogEntries) { - ops.emplace_back(oplogEntry); - } - - // If we don't have anything in the queue, wait a bit for something to appear. - if (oplogEntries.empty()) { - if (_syncTail->inShutdown()) { - ops.setMustShutdownFlag(); - } else { - // Block up to 1 second. - _oplogBuffer->waitForData(Seconds(1)); - } - } - } - - if (ops.empty() && !ops.mustShutdown()) { - // Check whether we have drained the oplog buffer. The states checked here can be - // stale when it's used by the applier. signalDrainComplete() needs to check the - // applier is still draining in the same term to make sure these states have not - // changed. - auto replCoord = ReplicationCoordinator::get(cc().getServiceContext()); - // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling - // drain complete. - // - // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer - // and blocks when it's about to notify the applier to signal drain complete, the - // node may step down and fetch new data into the buffer and then step up again. - // Now the batcher will resume and let the applier signal drain complete even if - // the buffer has new data. Checking the term before and after ensures nothing - // changed in between. - auto termWhenBufferIsEmpty = replCoord->getTerm(); - // Draining state guarantees the producer has already been fully stopped and no more - // operations will be pushed in to the oplog buffer until the applier state changes. - auto isDraining = - replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; - // Check the oplog buffer after the applier state to ensure the producer is stopped. - if (isDraining && _oplogBuffer->isEmpty()) { - ops.setTermWhenExhausted(termWhenBufferIsEmpty); - log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty; - } else { - // Don't emit empty batches. - continue; - } - } - - stdx::unique_lock<Latch> lk(_mutex); - // Block until the previous batch has been taken. - _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); - _ops = std::move(ops); - _cv.notify_all(); - if (_ops.mustShutdown()) { - _isDead = true; - return; - } - } - } - - SyncTail* const _syncTail; - StorageInterface* const _storageInterface; - OplogBuffer* const _oplogBuffer; - OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; - - Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); // Guards _ops. - stdx::condition_variable _cv; - OpQueue _ops; - - // This only exists so the destructor invariants rather than deadlocking. - // TODO remove once we trust noexcept enough to mark oplogApplication() as noexcept. - bool _isDead = false; - - stdx::thread _thread; // Must be last so all other members are initialized before starting. -}; - -void SyncTail::runLoop(OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn, - ReplicationCoordinator* replCoord) { - // We don't start data replication for arbiters at all and it's not allowed to reconfig - // arbiterOnly field for any member. - invariant(!replCoord->getMemberState().arbiter()); - - OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn); - - std::unique_ptr<ApplyBatchFinalizer> finalizer{ - getGlobalServiceContext()->getStorageEngine()->isDurable() - ? new ApplyBatchFinalizerForJournal(replCoord) - : new ApplyBatchFinalizer(replCoord)}; - - while (true) { // Exits on message from OpQueueBatcher. - // Use a new operation context each iteration, as otherwise we may appear to use a single - // collection name to refer to collections with different UUIDs. - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; - - // This code path gets used during elections, so it should not be subject to Flow Control. - // It is safe to exclude this operation context from Flow Control here because this code - // path only gets used on secondaries or on a node transitioning to primary. - opCtx.setShouldParticipateInFlowControl(false); - - // For pausing replication in tests. - if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is " - "disabled."; - while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - // Tests should not trigger clean shutdown while that failpoint is active. If we - // think we need this, we need to think hard about what the behavior should be. - if (inShutdown()) { - severe() << "Turn off rsSyncApplyStop before attempting clean shutdown"; - fassertFailedNoTrace(40304); - } - sleepmillis(10); - } - } - - // Transition to SECONDARY state, if possible. - replCoord->finishRecoveryIfEligible(&opCtx); - - // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become - // ready in time, we'll loop again so we can do the above checks periodically. - OpQueue ops = batcher.getNextBatch(Seconds(1)); - if (ops.empty()) { - if (ops.mustShutdown()) { - // Shut down and exit oplog application loop. - return; - } - if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - continue; - } - if (ops.termWhenExhausted()) { - // Signal drain complete if we're in Draining state and the buffer is empty. - // Since we check the states of batcher and oplog buffer without synchronization, - // they can be stale. We make sure the applier is still draining in the given term - // before and after the check, so that if the oplog buffer was exhausted, then - // it still will be. - replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted()); - } - continue; // Try again. - } - - // Extract some info from ops that we'll need after releasing the batch below. - const auto firstOpTimeInBatch = ops.front().getOpTime(); - const auto lastOpInBatch = ops.back(); - const auto lastOpTimeInBatch = lastOpInBatch.getOpTime(); - const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime(); - const auto lastAppliedOpTimeAtStartOfBatch = replCoord->getMyLastAppliedOpTime(); - - // Make sure the oplog doesn't go back in time or repeat an entry. - if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) { - fassert(34361, - Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Attempted to apply an oplog entry (" - << firstOpTimeInBatch.toString() - << ") which is not greater than our last applied OpTime (" - << lastAppliedOpTimeAtStartOfBatch.toString() << ").")); - } - - // Don't allow the fsync+lock thread to see intermediate states of batch application. - stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); - - // Apply the operations in this batch. 'multiApply' returns the optime of the last op that - // was applied, which should be the last optime in the batch. - auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch())); - invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); - - // Update various things that care about our last applied optime. Tests rely on 1 happening - // before 2 even though it isn't strictly necessary. - - // 1. Persist our "applied through" optime to disk. - _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch); - - // 2. Ensure that the last applied op time hasn't changed since the start of this batch. - const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime(); - invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch, - str::stream() << "the last known applied OpTime has changed from " - << lastAppliedOpTimeAtStartOfBatch.toString() << " to " - << lastAppliedOpTimeAtEndOfBatch.toString() - << " in the middle of batch application"); - - // 3. Update oplog visibility by notifying the storage engine of the new oplog entries. - const bool orderedCommit = true; - _storageInterface->oplogDiskLocRegister( - &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit); - - // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the - // current 'minValid' optime. In case we crash while applying a batch, multiApply advances - // minValid to the last opTime in the batch, so check minValid *after* calling multiApply. - const auto minValid = _consistencyMarkers->getMinValid(&opCtx); - auto consistency = (lastOpTimeInBatch >= minValid) - ? ReplicationCoordinator::DataConsistency::Consistent - : ReplicationCoordinator::DataConsistency::Inconsistent; - - // The finalizer advances the global timestamp to lastOpTimeInBatch. - finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency); - } -} - void SyncTail::shutdown() { stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; |