author    Mihai Andrei <mihai.andrei@mongodb.com>    2019-09-23 18:51:11 +0000
committer evergreen <evergreen@mongodb.com>          2019-09-23 18:51:11 +0000
commit    a9b6247ed5d02bd546255b3c8974f7eefd1a0a17 (patch)
tree      8c9d418827db090f53714609822e385607cdb43a /src/mongo/db/repl/sync_tail.cpp
parent    ebb647b0b494b65df4cec1cd7b41f87a4b8f16f7 (diff)
download  mongo-a9b6247ed5d02bd546255b3c8974f7eefd1a0a17.tar.gz
SERVER-43327 Move runLoop from SyncTail to OplogApplierImpl and move OpQueueBatcher to its own file
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 389
1 file changed, 0 insertions, 389 deletions
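
The deleted OpQueueBatcher in the diff below is built around a single-slot handoff between a background batching thread and the applier thread: the batcher parks at most one finished batch, and the applier takes it with a bounded wait. As a minimal standalone sketch of that pattern only (not MongoDB's API and not part of this commit), the following uses plain std::thread and std::condition_variable instead of MongoDB's stdx and latch wrappers; the SingleSlotBatcher class, its Batch alias, and the fake batch contents are invented for illustration.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class SingleSlotBatcher {
public:
    using Batch = std::vector<int>;  // stand-in for a queue of oplog entries

    SingleSlotBatcher() : _producer([this] { _run(); }) {}

    ~SingleSlotBatcher() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cv.notify_all();
        _producer.join();
    }

    // Returns the pending batch, waiting up to 'maxWait' if none is ready yet.
    // Timeout and signal are handled the same way: return whatever is parked.
    Batch getNextBatch(std::chrono::milliseconds maxWait) {
        std::unique_lock<std::mutex> lk(_mutex);
        if (_slot.empty()) {
            _cv.wait_for(lk, maxWait);
        }
        Batch out = std::move(_slot);
        _slot.clear();
        _cv.notify_all();  // let the producer publish its next batch
        return out;
    }

private:
    void _run() {
        int next = 0;
        while (true) {
            Batch batch{next, next + 1, next + 2};  // pretend we gathered some ops
            next += 3;

            std::unique_lock<std::mutex> lk(_mutex);
            // Block until the previous batch has been taken or shutdown begins.
            _cv.wait(lk, [&] { return _slot.empty() || _shutdown; });
            if (_shutdown) {
                return;
            }
            _slot = std::move(batch);
            _cv.notify_all();
        }
    }

    std::mutex _mutex;               // guards _slot and _shutdown
    std::condition_variable _cv;
    Batch _slot;                     // at most one finished batch parked here
    bool _shutdown = false;
    std::thread _producer;           // started last, after the members it uses
};

The real batcher layers oplog-specific concerns on top of this shape: batch size and byte limits, slaveDelay, drain-complete detection, and shutdown signalling, all visible in the removed code in the diff below.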
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index a8c66775369..18b3d7e0495 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
-#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -103,109 +102,6 @@ ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.
TimerStats applyBatchStats;
ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
-class ApplyBatchFinalizer {
-public:
- ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
- virtual ~ApplyBatchFinalizer(){};
-
- virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- _recordApplied(newOpTimeAndWallTime, consistency);
- };
-
-protected:
- void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with
- // ReplicationExternalStateImpl::onTransitionToPrimary.
- _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency);
- }
-
- void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) {
- // We have to use setMyLastDurableOpTimeForward since this thread races with
- // ReplicationExternalStateImpl::onTransitionToPrimary.
- _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime);
- }
-
-private:
- // Used to update the replication system's progress.
- ReplicationCoordinator* _replCoord;
-};
-
-class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer {
-public:
- ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord)
- : ApplyBatchFinalizer(replCoord),
- _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
- ~ApplyBatchFinalizerForJournal();
-
- void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) override;
-
-private:
- /**
- * Loops continuously, waiting for writes to be flushed to disk, and then calls
- * ReplicationCoordinator::setMyLastDurableOpTimeAndWallTimeForward with _latestOpTimeAndWallTime.
- * Terminates once _shutdownSignaled is set to true.
- */
- void _run();
-
- // Protects _cond, _shutdownSignaled, and _latestOpTime.
- Mutex _mutex = MONGO_MAKE_LATCH("ApplyBatchFinalizerForJournal::_mutex");
- // Used to alert our thread of a new OpTime.
- stdx::condition_variable _cond;
- // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing.
- OpTimeAndWallTime _latestOpTimeAndWallTime;
- // Once this is set to true the _run method will terminate.
- bool _shutdownSignaled = false;
- // Thread that will _run(). Must be initialized last as it depends on the other variables.
- stdx::thread _waiterThread;
-};
-
-ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
- stdx::unique_lock<Latch> lock(_mutex);
- _shutdownSignaled = true;
- _cond.notify_all();
- lock.unlock();
-
- _waiterThread.join();
-}
-
-void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- _recordApplied(newOpTimeAndWallTime, consistency);
-
- stdx::unique_lock<Latch> lock(_mutex);
- _latestOpTimeAndWallTime = newOpTimeAndWallTime;
- _cond.notify_all();
-}
-
-void ApplyBatchFinalizerForJournal::_run() {
- Client::initThread("ApplyBatchFinalizerForJournal");
-
- while (true) {
- OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()};
-
- {
- stdx::unique_lock<Latch> lock(_mutex);
- while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) {
- _cond.wait(lock);
- }
-
- if (_shutdownSignaled) {
- return;
- }
-
- latestOpTimeAndWallTime = _latestOpTimeAndWallTime;
- _latestOpTimeAndWallTime = {OpTime(), Date_t()};
- }
-
- auto opCtx = cc().makeOperationContext();
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
- _recordDurable(latestOpTimeAndWallTime);
- }
-}
-
NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
if (!optionalUuid) {
@@ -470,291 +366,6 @@ void addDerivedOps(OperationContext* opCtx,
} // namespace
-class SyncTail::OpQueueBatcher {
- OpQueueBatcher(const OpQueueBatcher&) = delete;
- OpQueueBatcher& operator=(const OpQueueBatcher&) = delete;
-
-public:
- OpQueueBatcher(SyncTail* syncTail,
- StorageInterface* storageInterface,
- OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn)
- : _syncTail(syncTail),
- _storageInterface(storageInterface),
- _oplogBuffer(oplogBuffer),
- _getNextApplierBatchFn(getNextApplierBatchFn),
- _ops(0),
- _thread([this] { run(); }) {}
- ~OpQueueBatcher() {
- invariant(_isDead);
- _thread.join();
- }
-
- OpQueue getNextBatch(Seconds maxWaitTime) {
- stdx::unique_lock<Latch> lk(_mutex);
- // _ops can indicate the following cases:
- // 1. A new batch is ready to consume.
- // 2. Shutdown.
- // 3. The batch has (or had) exhausted the buffer in draining mode.
- // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode,
- // so there could be new oplog entries coming.
- // 5. Empty batch since the batcher is still running.
- //
- // In case (4) and (5), we wait for up to "maxWaitTime".
- if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) {
- // We intentionally don't care about whether this returns due to signaling or timeout
- // since we do the same thing either way: return whatever is in _ops.
- (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
- }
-
- OpQueue ops = std::move(_ops);
- _ops = OpQueue(0);
- _cv.notify_all();
- return ops;
- }
-
-private:
- /**
- * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
- * entries that can be returned in a batch.
- */
- boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp() {
- auto service = cc().getServiceContext();
- auto replCoord = ReplicationCoordinator::get(service);
- auto slaveDelay = replCoord->getSlaveDelaySecs();
- if (slaveDelay <= Seconds(0)) {
- return {};
- }
- auto fastClockSource = service->getFastClockSource();
- return fastClockSource->now() - slaveDelay;
- }
-
- void run() {
- Client::initThread("ReplBatcher");
-
- BatchLimits batchLimits;
-
- while (true) {
- rsSyncApplyStop.pauseWhileSet();
-
- batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp();
-
- // Check the limits once per batch since users can change them at runtime.
- batchLimits.ops = getBatchLimitOplogEntries();
-
- OpQueue ops(batchLimits.ops);
- {
- auto opCtx = cc().makeOperationContext();
-
- // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an
- // UninterruptibleLockGuard in client operations because stepdown requires the
- // ability to interrupt client operations. However, it is acceptable to use an
- // UninterruptibleLockGuard in batch application because the only cause of
- // interruption would be shutdown, and the ReplBatcher thread has its own shutdown
- // handling.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- // Locks the oplog to check its max size; this must happen inside the UninterruptibleLockGuard.
- batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface);
-
- auto oplogEntries =
- fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
- for (const auto& oplogEntry : oplogEntries) {
- ops.emplace_back(oplogEntry);
- }
-
- // If we don't have anything in the queue, wait a bit for something to appear.
- if (oplogEntries.empty()) {
- if (_syncTail->inShutdown()) {
- ops.setMustShutdownFlag();
- } else {
- // Block up to 1 second.
- _oplogBuffer->waitForData(Seconds(1));
- }
- }
- }
-
- if (ops.empty() && !ops.mustShutdown()) {
- // Check whether we have drained the oplog buffer. The states checked here can be
- // stale when it's used by the applier. signalDrainComplete() needs to check the
- // applier is still draining in the same term to make sure these states have not
- // changed.
- auto replCoord = ReplicationCoordinator::get(cc().getServiceContext());
- // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling
- // drain complete.
- //
- // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer
- // and blocks when it's about to notify the applier to signal drain complete, the
- // node may step down and fetch new data into the buffer and then step up again.
- // Now the batcher will resume and let the applier signal drain complete even if
- // the buffer has new data. Checking the term before and after ensures nothing
- // changed in between.
- auto termWhenBufferIsEmpty = replCoord->getTerm();
- // Draining state guarantees the producer has already been fully stopped and no more
- // operations will be pushed into the oplog buffer until the applier state changes.
- auto isDraining =
- replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
- // Check the oplog buffer after the applier state to ensure the producer is stopped.
- if (isDraining && _oplogBuffer->isEmpty()) {
- ops.setTermWhenExhausted(termWhenBufferIsEmpty);
- log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty;
- } else {
- // Don't emit empty batches.
- continue;
- }
- }
-
- stdx::unique_lock<Latch> lk(_mutex);
- // Block until the previous batch has been taken.
- _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); });
- _ops = std::move(ops);
- _cv.notify_all();
- if (_ops.mustShutdown()) {
- _isDead = true;
- return;
- }
- }
- }
-
- SyncTail* const _syncTail;
- StorageInterface* const _storageInterface;
- OplogBuffer* const _oplogBuffer;
- OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn;
-
- Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); // Guards _ops.
- stdx::condition_variable _cv;
- OpQueue _ops;
-
- // This only exists so the destructor invariants rather than deadlocking.
- // TODO remove once we trust noexcept enough to mark oplogApplication() as noexcept.
- bool _isDead = false;
-
- stdx::thread _thread; // Must be last so all other members are initialized before starting.
-};
-
-void SyncTail::runLoop(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord) {
- // We never start data replication for arbiters, and reconfiguring the arbiterOnly field
- // of any member is not allowed.
- invariant(!replCoord->getMemberState().arbiter());
-
- OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
-
- std::unique_ptr<ApplyBatchFinalizer> finalizer{
- getGlobalServiceContext()->getStorageEngine()->isDurable()
- ? new ApplyBatchFinalizerForJournal(replCoord)
- : new ApplyBatchFinalizer(replCoord)};
-
- while (true) { // Exits on message from OpQueueBatcher.
- // Use a new operation context each iteration, as otherwise we may appear to use a single
- // collection name to refer to collections with different UUIDs.
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr;
-
- // This code path gets used during elections, so it should not be subject to Flow Control.
- // It is safe to exclude this operation context from Flow Control here because this code
- // path only gets used on secondaries or on a node transitioning to primary.
- opCtx.setShouldParticipateInFlowControl(false);
-
- // For pausing replication in tests.
- if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is "
- "disabled.";
- while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- // Tests should not trigger clean shutdown while that failpoint is active. If we
- // think we need this, we need to think hard about what the behavior should be.
- if (inShutdown()) {
- severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
- fassertFailedNoTrace(40304);
- }
- sleepmillis(10);
- }
- }
-
- // Transition to SECONDARY state, if possible.
- replCoord->finishRecoveryIfEligible(&opCtx);
-
- // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
- // ready in time, we'll loop again so we can do the above checks periodically.
- OpQueue ops = batcher.getNextBatch(Seconds(1));
- if (ops.empty()) {
- if (ops.mustShutdown()) {
- // Shut down and exit oplog application loop.
- return;
- }
- if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- continue;
- }
- if (ops.termWhenExhausted()) {
- // Signal drain complete if we're in Draining state and the buffer is empty.
- // Since we check the states of batcher and oplog buffer without synchronization,
- // they can be stale. We make sure the applier is still draining in the given term
- // before and after the check, so that if the oplog buffer was exhausted, then
- // it still will be.
- replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted());
- }
- continue; // Try again.
- }
-
- // Extract some info from ops that we'll need after releasing the batch below.
- const auto firstOpTimeInBatch = ops.front().getOpTime();
- const auto lastOpInBatch = ops.back();
- const auto lastOpTimeInBatch = lastOpInBatch.getOpTime();
- const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime();
- const auto lastAppliedOpTimeAtStartOfBatch = replCoord->getMyLastAppliedOpTime();
-
- // Make sure the oplog doesn't go back in time or repeat an entry.
- if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) {
- fassert(34361,
- Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Attempted to apply an oplog entry ("
- << firstOpTimeInBatch.toString()
- << ") which is not greater than our last applied OpTime ("
- << lastAppliedOpTimeAtStartOfBatch.toString() << ")."));
- }
-
- // Don't allow the fsync+lock thread to see intermediate states of batch application.
- stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
-
- // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
- // was applied, which should be the last optime in the batch.
- auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
- invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
-
- // Update various things that care about our last applied optime. Tests rely on 1 happening
- // before 2 even though it isn't strictly necessary.
-
- // 1. Persist our "applied through" optime to disk.
- _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
-
- // 2. Ensure that the last applied op time hasn't changed since the start of this batch.
- const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime();
- invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch,
- str::stream() << "the last known applied OpTime has changed from "
- << lastAppliedOpTimeAtStartOfBatch.toString() << " to "
- << lastAppliedOpTimeAtEndOfBatch.toString()
- << " in the middle of batch application");
-
- // 3. Update oplog visibility by notifying the storage engine of the new oplog entries.
- const bool orderedCommit = true;
- _storageInterface->oplogDiskLocRegister(
- &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
-
- // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the
- // current 'minValid' optime. In case we crash while applying a batch, multiApply advances
- // minValid to the last opTime in the batch, so check minValid *after* calling multiApply.
- const auto minValid = _consistencyMarkers->getMinValid(&opCtx);
- auto consistency = (lastOpTimeInBatch >= minValid)
- ? ReplicationCoordinator::DataConsistency::Consistent
- : ReplicationCoordinator::DataConsistency::Inconsistent;
-
- // The finalizer advances the global timestamp to lastOpTimeInBatch.
- finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency);
- }
-}
-
void SyncTail::shutdown() {
stdx::lock_guard<Latch> lock(_mutex);
_inShutdown = true;