diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2019-09-23 18:51:11 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-23 18:51:11 +0000 |
commit | a9b6247ed5d02bd546255b3c8974f7eefd1a0a17 (patch) | |
tree | 8c9d418827db090f53714609822e385607cdb43a | |
parent | ebb647b0b494b65df4cec1cd7b41f87a4b8f16f7 (diff) | |
download | mongo-a9b6247ed5d02bd546255b3c8974f7eefd1a0a17.tar.gz |
SERVER-43327 Move runLoop from SyncTail to OplogApplierImpl and move OpQueueBatcher to its own file
-rw-r--r-- | src/mongo/db/repl/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 237 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/opqueue_batcher.cpp | 181 | ||||
-rw-r--r-- | src/mongo/db/repl/opqueue_batcher.h | 163 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 389 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 90 |
7 files changed, 599 insertions, 481 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e683788d1f9..cd06cba18bc 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -523,8 +523,9 @@ env.Library( source=[ 'insert_group.cpp', 'oplog_applier_impl.cpp', + 'opqueue_batcher.cpp', 'session_update_tracker.cpp', - 'sync_tail.cpp', + 'sync_tail.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authorization_manager_global', diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 860505e3e7c..1c094f6d077 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -27,13 +27,124 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + #include "mongo/platform/basic.h" +#include "mongo/util/log.h" #include "mongo/db/repl/oplog_applier_impl.h" namespace mongo { namespace repl { + +namespace { + +class ApplyBatchFinalizer { +public: + ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {} + virtual ~ApplyBatchFinalizer(){}; + + virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime, + ReplicationCoordinator::DataConsistency consistency) { + _recordApplied(newOpTimeAndWallTime, consistency); + }; + +protected: + void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime, + ReplicationCoordinator::DataConsistency consistency) { + // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with + // ReplicationExternalStateImpl::onTransitionToPrimary. + _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency); + } + + void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) { + // We have to use setMyLastDurableOpTimeAndWallTimeFoward since this thread races with + // ReplicationExternalStateImpl::onTransitionToPrimary. + _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime); + } + +private: + // Used to update the replication system's progress. + ReplicationCoordinator* _replCoord; +}; + +class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer { +public: + ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord) + : ApplyBatchFinalizer(replCoord), + _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {}; + ~ApplyBatchFinalizerForJournal(); + + void record(const OpTimeAndWallTime& newOpTimeAndWallTime, + ReplicationCoordinator::DataConsistency consistency) override; + +private: + /** + * Loops continuously, waiting for writes to be flushed to disk and then calls + * ReplicationCoordinator::setMyLastOptime with _latestOpTime. + * Terminates once _shutdownSignaled is set true. + */ + void _run(); + + // Protects _cond, _shutdownSignaled, and _latestOpTime. + Mutex _mutex = MONGO_MAKE_LATCH("OplogApplierImpl::_mutex"); + // Used to alert our thread of a new OpTime. + stdx::condition_variable _cond; + // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing. + OpTimeAndWallTime _latestOpTimeAndWallTime; + // Once this is set to true the _run method will terminate. + bool _shutdownSignaled = false; + // Thread that will _run(). Must be initialized last as it depends on the other variables. + stdx::thread _waiterThread; +}; + +ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() { + stdx::unique_lock<Latch> lock(_mutex); + _shutdownSignaled = true; + _cond.notify_all(); + lock.unlock(); + + _waiterThread.join(); +} + +void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime, + ReplicationCoordinator::DataConsistency consistency) { + _recordApplied(newOpTimeAndWallTime, consistency); + + stdx::unique_lock<Latch> lock(_mutex); + _latestOpTimeAndWallTime = newOpTimeAndWallTime; + _cond.notify_all(); +} + +void ApplyBatchFinalizerForJournal::_run() { + Client::initThread("ApplyBatchFinalizerForJournal"); + + while (true) { + OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()}; + + { + stdx::unique_lock<Latch> lock(_mutex); + while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) { + _cond.wait(lock); + } + + if (_shutdownSignaled) { + return; + } + + latestOpTimeAndWallTime = _latestOpTimeAndWallTime; + _latestOpTimeAndWallTime = {OpTime(), Date_t()}; + } + + auto opCtx = cc().makeOperationContext(); + opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); + _recordDurable(latestOpTimeAndWallTime); + } +} + +} // namespace + OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, @@ -44,6 +155,8 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, ThreadPool* writerPool) : OplogApplier(executor, oplogBuffer, observer, options), _replCoord(replCoord), + _storageInterface(storageInterface), + _consistencyMarkers(consistencyMarkers), _syncTail( observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options), _beginApplyingOpTime(options.beginApplyingOpTime) {} @@ -52,13 +165,135 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) { return getNextApplierBatch(opCtx, batchLimits); }; - _syncTail.runLoop(oplogBuffer, getNextApplierBatchFn, _replCoord); + _runLoop(oplogBuffer, getNextApplierBatchFn); } void OplogApplierImpl::_shutdown() { _syncTail.shutdown(); } +void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) { + // We don't start data replication for arbiters at all and it's not allowed to reconfig + // arbiterOnly field for any member. + invariant(!_replCoord->getMemberState().arbiter()); + + OpQueueBatcher batcher(&_syncTail, _storageInterface, oplogBuffer, getNextApplierBatchFn); + + std::unique_ptr<ApplyBatchFinalizer> finalizer{ + getGlobalServiceContext()->getStorageEngine()->isDurable() + ? new ApplyBatchFinalizerForJournal(_replCoord) + : new ApplyBatchFinalizer(_replCoord)}; + + while (true) { // Exits on message from OpQueueBatcher. + // Use a new operation context each iteration, as otherwise we may appear to use a single + // collection name to refer to collections with different UUIDs. + const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); + OperationContext& opCtx = *opCtxPtr; + + // This code path gets used during elections, so it should not be subject to Flow Control. + // It is safe to exclude this operation context from Flow Control here because this code + // path only gets used on secondaries or on a node transitioning to primary. + opCtx.setShouldParticipateInFlowControl(false); + + // For pausing replication in tests. + if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { + log() << "Oplog Applier - rsSyncApplyStop fail point enabled. Blocking until fail " + "point is disabled."; + while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { + // Tests should not trigger clean shutdown while that failpoint is active. If we + // think we need this, we need to think hard about what the behavior should be. + if (_syncTail.inShutdown()) { + severe() << "Turn off rsSyncApplyStop before attempting clean shutdown"; + fassertFailedNoTrace(40304); + } + sleepmillis(10); + } + } + + // Transition to SECONDARY state, if possible. + _replCoord->finishRecoveryIfEligible(&opCtx); + + // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become + // ready in time, we'll loop again so we can do the above checks periodically. + OpQueue ops = batcher.getNextBatch(Seconds(1)); + if (ops.empty()) { + if (ops.mustShutdown()) { + // Shut down and exit oplog application loop. + return; + } + if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { + continue; + } + if (ops.termWhenExhausted()) { + // Signal drain complete if we're in Draining state and the buffer is empty. + // Since we check the states of batcher and oplog buffer without synchronization, + // they can be stale. We make sure the applier is still draining in the given term + // before and after the check, so that if the oplog buffer was exhausted, then + // it still will be. + _replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted()); + } + continue; // Try again. + } + + // Extract some info from ops that we'll need after releasing the batch below. + const auto firstOpTimeInBatch = ops.front().getOpTime(); + const auto lastOpInBatch = ops.back(); + const auto lastOpTimeInBatch = lastOpInBatch.getOpTime(); + const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime(); + const auto lastAppliedOpTimeAtStartOfBatch = _replCoord->getMyLastAppliedOpTime(); + + // Make sure the oplog doesn't go back in time or repeat an entry. + if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) { + fassert(34361, + Status(ErrorCodes::OplogOutOfOrder, + str::stream() << "Attempted to apply an oplog entry (" + << firstOpTimeInBatch.toString() + << ") which is not greater than our last applied OpTime (" + << lastAppliedOpTimeAtStartOfBatch.toString() << ").")); + } + + // Don't allow the fsync+lock thread to see intermediate states of batch application. + stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); + + // Apply the operations in this batch. 'multiApply' returns the optime of the last op that + // was applied, which should be the last optime in the batch. + auto lastOpTimeAppliedInBatch = + fassertNoTrace(34437, _syncTail.multiApply(&opCtx, ops.releaseBatch())); + invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); + + // Update various things that care about our last applied optime. Tests rely on 1 happening + // before 2 even though it isn't strictly necessary. + + // 1. Persist our "applied through" optime to disk. + _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch); + + // 2. Ensure that the last applied op time hasn't changed since the start of this batch. + const auto lastAppliedOpTimeAtEndOfBatch = _replCoord->getMyLastAppliedOpTime(); + invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch, + str::stream() << "the last known applied OpTime has changed from " + << lastAppliedOpTimeAtStartOfBatch.toString() << " to " + << lastAppliedOpTimeAtEndOfBatch.toString() + << " in the middle of batch application"); + + // 3. Update oplog visibility by notifying the storage engine of the new oplog entries. + const bool orderedCommit = true; + _storageInterface->oplogDiskLocRegister( + &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit); + + // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the + // current 'minValid' optime. Note that recording the lastOpTime in the finalizer includes + // advancing the global timestamp to at least its timestamp. + const auto minValid = _consistencyMarkers->getMinValid(&opCtx); + auto consistency = (lastOpTimeInBatch >= minValid) + ? ReplicationCoordinator::DataConsistency::Consistent + : ReplicationCoordinator::DataConsistency::Inconsistent; + + // The finalizer advances the global timestamp to lastOpTimeInBatch. + finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency); + } +} + StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) { return _syncTail.multiApply(opCtx, std::move(ops)); } diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index d6e0e523220..087e5870e56 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -30,7 +30,12 @@ #pragma once +#include "mongo/db/commands/fsync.h" +#include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/concurrency/replication_state_transition_lock_guard.h" +#include "mongo/db/repl/initial_syncer.h" #include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/opqueue_batcher.h" #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/storage_interface.h" @@ -67,11 +72,23 @@ private: void _shutdown() override; + /** + * Runs oplog application in a loop until shutdown() is called. + * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using + * multiApply(). + */ + void _runLoop(OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn); + StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override; // Not owned by us. ReplicationCoordinator* const _replCoord; + StorageInterface* _storageInterface; + + ReplicationConsistencyMarkers* const _consistencyMarkers; + // Used to run oplog application loop. SyncTail _syncTail; diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp new file mode 100644 index 00000000000..8c3802c1e3f --- /dev/null +++ b/src/mongo/db/repl/opqueue_batcher.cpp @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/db/repl/opqueue_batcher.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + + +OpQueueBatcher::OpQueueBatcher(SyncTail* syncTail, + StorageInterface* storageInterface, + OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) + : _syncTail(syncTail), + _storageInterface(storageInterface), + _oplogBuffer(oplogBuffer), + _getNextApplierBatchFn(getNextApplierBatchFn), + _ops(0), + _thread([this] { run(); }) {} +OpQueueBatcher::~OpQueueBatcher() { + invariant(_isDead); + _thread.join(); +} + + +OpQueue OpQueueBatcher::getNextBatch(Seconds maxWaitTime) { + stdx::unique_lock<Latch> lk(_mutex); + // _ops can indicate the following cases: + // 1. A new batch is ready to consume. + // 2. Shutdown. + // 3. The batch has (or had) exhausted the buffer in draining mode. + // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode, + // so there could be new oplog entries coming. + // 5. Empty batch since the batcher is still running. + // + // In case (4) and (5), we wait for up to "maxWaitTime". + if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) { + // We intentionally don't care about whether this returns due to signaling or timeout + // since we do the same thing either way: return whatever is in _ops. + (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration()); + } + + OpQueue ops = std::move(_ops); + _ops = OpQueue(0); + _cv.notify_all(); + return ops; +} + +/** + * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog + * entries that can be be returned in a batch. + */ +boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() { + auto service = cc().getServiceContext(); + auto replCoord = ReplicationCoordinator::get(service); + auto slaveDelay = replCoord->getSlaveDelaySecs(); + if (slaveDelay <= Seconds(0)) { + return {}; + } + auto fastClockSource = service->getFastClockSource(); + return fastClockSource->now() - slaveDelay; +} + +void OpQueueBatcher::run() { + Client::initThread("ReplBatcher"); + + OplogApplier::BatchLimits batchLimits; + + while (true) { + rsSyncApplyStop.pauseWhileSet(); + + batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp(); + + // Check the limits once per batch since users can change them at runtime. + batchLimits.ops = getBatchLimitOplogEntries(); + + OpQueue ops(batchLimits.ops); + { + auto opCtx = cc().makeOperationContext(); + + // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an + // UninterruptibleLockGuard in client operations because stepdown requires the + // ability to interrupt client operations. However, it is acceptable to use an + // UninterruptibleLockGuard in batch application because the only cause of + // interruption would be shutdown, and the ReplBatcher thread has its own shutdown + // handling. + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + + // Locks the oplog to check its max size, do this in the UninterruptibleLockGuard. + batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface); + + auto oplogEntries = + fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits)); + for (const auto& oplogEntry : oplogEntries) { + ops.emplace_back(oplogEntry); + } + + // If we don't have anything in the queue, wait a bit for something to appear. + if (oplogEntries.empty()) { + if (_syncTail->inShutdown()) { + ops.setMustShutdownFlag(); + } else { + // Block up to 1 second. + _oplogBuffer->waitForData(Seconds(1)); + } + } + } + + if (ops.empty() && !ops.mustShutdown()) { + // Check whether we have drained the oplog buffer. The states checked here can be + // stale when it's used by the applier. signalDrainComplete() needs to check the + // applier is still draining in the same term to make sure these states have not + // changed. + auto replCoord = ReplicationCoordinator::get(cc().getServiceContext()); + // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling + // drain complete. + // + // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer + // and blocks when it's about to notify the applier to signal drain complete, the + // node may step down and fetch new data into the buffer and then step up again. + // Now the batcher will resume and let the applier signal drain complete even if + // the buffer has new data. Checking the term before and after ensures nothing + // changed in between. + auto termWhenBufferIsEmpty = replCoord->getTerm(); + // Draining state guarantees the producer has already been fully stopped and no more + // operations will be pushed in to the oplog buffer until the applier state changes. + auto isDraining = + replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; + // Check the oplog buffer after the applier state to ensure the producer is stopped. + if (isDraining && _oplogBuffer->isEmpty()) { + ops.setTermWhenExhausted(termWhenBufferIsEmpty); + log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty; + } else { + // Don't emit empty batches. + continue; + } + } + + stdx::unique_lock<Latch> lk(_mutex); + // Block until the previous batch has been taken. + _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); + _ops = std::move(ops); + _cv.notify_all(); + if (_ops.mustShutdown()) { + _isDead = true; + return; + } + } +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/opqueue_batcher.h new file mode 100644 index 00000000000..0cdad053ddd --- /dev/null +++ b/src/mongo/db/repl/opqueue_batcher.h @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/initial_syncer.h" +#include "mongo/db/repl/sync_tail.h" + +namespace mongo { +namespace repl { + +class OpQueue { +public: + explicit OpQueue(std::size_t batchLimitOps) : _bytes(0) { + _batch.reserve(batchLimitOps); + } + + size_t getBytes() const { + return _bytes; + } + size_t getCount() const { + return _batch.size(); + } + bool empty() const { + return _batch.empty(); + } + const OplogEntry& front() const { + invariant(!_batch.empty()); + return _batch.front(); + } + const OplogEntry& back() const { + invariant(!_batch.empty()); + return _batch.back(); + } + const std::vector<OplogEntry>& getBatch() const { + return _batch; + } + + void emplace_back(OplogEntry oplog) { + invariant(!_mustShutdown); + _bytes += oplog.getRawObjSizeBytes(); + _batch.emplace_back(std::move(oplog)); + } + void pop_back() { + _bytes -= back().getRawObjSizeBytes(); + _batch.pop_back(); + } + + /** + * A batch with this set indicates that the upstream stages of the pipeline are shutdown and + * no more batches will be coming. + * + * This can only happen with empty batches. + * + * TODO replace the empty object used to signal draining with this. + */ + bool mustShutdown() const { + return _mustShutdown; + } + void setMustShutdownFlag() { + invariant(empty()); + _mustShutdown = true; + } + + /** + * If the oplog buffer is exhausted, return the term before we learned that the buffer was + * empty. + */ + boost::optional<long long> termWhenExhausted() const { + return _termWhenExhausted; + } + void setTermWhenExhausted(long long term) { + invariant(empty()); + _termWhenExhausted = term; + } + + /** + * Leaves this object in an unspecified state. Only assignment and destruction are valid. + */ + std::vector<OplogEntry> releaseBatch() { + return std::move(_batch); + } + +private: + std::vector<OplogEntry> _batch; + size_t _bytes; + bool _mustShutdown = false; + boost::optional<long long> _termWhenExhausted; +}; + +class OpQueueBatcher { + OpQueueBatcher(const OpQueueBatcher&) = delete; + OpQueueBatcher& operator=(const OpQueueBatcher&) = delete; + +public: + /** + * Constructs an OpQueueBatcher + */ + OpQueueBatcher(SyncTail* syncTail, + StorageInterface* storageInterface, + OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn); + + virtual ~OpQueueBatcher(); + + /** + * Retrieves the next batch of ops that are ready to apply. + */ + OpQueue getNextBatch(Seconds maxWaitTime); + +private: + /** + * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog + * entries that can be be returned in a batch. + */ + boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp(); + + void run(); + + SyncTail* const _syncTail; + StorageInterface* const _storageInterface; + OplogBuffer* const _oplogBuffer; + OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; + + Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); + stdx::condition_variable _cv; + OpQueue _ops; + + // This only exists so the destructor invariants rather than deadlocking. + bool _isDead = false; + + stdx::thread _thread; // Must be last so all other members are initialized before starting. +}; + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index a8c66775369..18b3d7e0495 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -47,7 +47,6 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" -#include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -103,109 +102,6 @@ ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply. TimerStats applyBatchStats; ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); -class ApplyBatchFinalizer { -public: - ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {} - virtual ~ApplyBatchFinalizer(){}; - - virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - _recordApplied(newOpTimeAndWallTime, consistency); - }; - -protected: - void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with - // ReplicationExternalStateImpl::onTransitionToPrimary. - _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency); - } - - void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) { - // We have to use setMyLastDurableOpTimeForward since this thread races with - // ReplicationExternalStateImpl::onTransitionToPrimary. - _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime); - } - -private: - // Used to update the replication system's progress. - ReplicationCoordinator* _replCoord; -}; - -class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer { -public: - ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord) - : ApplyBatchFinalizer(replCoord), - _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {}; - ~ApplyBatchFinalizerForJournal(); - - void record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) override; - -private: - /** - * Loops continuously, waiting for writes to be flushed to disk and then calls - * ReplicationCoordinator::setMyLastOptime with _latestOpTime. - * Terminates once _shutdownSignaled is set true. - */ - void _run(); - - // Protects _cond, _shutdownSignaled, and _latestOpTime. - Mutex _mutex = MONGO_MAKE_LATCH("ApplyBatchFinalizerForJournal::_mutex"); - // Used to alert our thread of a new OpTime. - stdx::condition_variable _cond; - // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing. - OpTimeAndWallTime _latestOpTimeAndWallTime; - // Once this is set to true the _run method will terminate. - bool _shutdownSignaled = false; - // Thread that will _run(). Must be initialized last as it depends on the other variables. - stdx::thread _waiterThread; -}; - -ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() { - stdx::unique_lock<Latch> lock(_mutex); - _shutdownSignaled = true; - _cond.notify_all(); - lock.unlock(); - - _waiterThread.join(); -} - -void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime, - ReplicationCoordinator::DataConsistency consistency) { - _recordApplied(newOpTimeAndWallTime, consistency); - - stdx::unique_lock<Latch> lock(_mutex); - _latestOpTimeAndWallTime = newOpTimeAndWallTime; - _cond.notify_all(); -} - -void ApplyBatchFinalizerForJournal::_run() { - Client::initThread("ApplyBatchFinalizerForJournal"); - - while (true) { - OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()}; - - { - stdx::unique_lock<Latch> lock(_mutex); - while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) { - _cond.wait(lock); - } - - if (_shutdownSignaled) { - return; - } - - latestOpTimeAndWallTime = _latestOpTimeAndWallTime; - _latestOpTimeAndWallTime = {OpTime(), Date_t()}; - } - - auto opCtx = cc().makeOperationContext(); - opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); - _recordDurable(latestOpTimeAndWallTime); - } -} - NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { auto optionalUuid = oplogEntry.getUuid(); if (!optionalUuid) { @@ -470,291 +366,6 @@ void addDerivedOps(OperationContext* opCtx, } // namespace -class SyncTail::OpQueueBatcher { - OpQueueBatcher(const OpQueueBatcher&) = delete; - OpQueueBatcher& operator=(const OpQueueBatcher&) = delete; - -public: - OpQueueBatcher(SyncTail* syncTail, - StorageInterface* storageInterface, - OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) - : _syncTail(syncTail), - _storageInterface(storageInterface), - _oplogBuffer(oplogBuffer), - _getNextApplierBatchFn(getNextApplierBatchFn), - _ops(0), - _thread([this] { run(); }) {} - ~OpQueueBatcher() { - invariant(_isDead); - _thread.join(); - } - - OpQueue getNextBatch(Seconds maxWaitTime) { - stdx::unique_lock<Latch> lk(_mutex); - // _ops can indicate the following cases: - // 1. A new batch is ready to consume. - // 2. Shutdown. - // 3. The batch has (or had) exhausted the buffer in draining mode. - // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode, - // so there could be new oplog entries coming. - // 5. Empty batch since the batcher is still running. - // - // In case (4) and (5), we wait for up to "maxWaitTime". - if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) { - // We intentionally don't care about whether this returns due to signaling or timeout - // since we do the same thing either way: return whatever is in _ops. - (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration()); - } - - OpQueue ops = std::move(_ops); - _ops = OpQueue(0); - _cv.notify_all(); - return ops; - } - -private: - /** - * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog - * entries that can be be returned in a batch. - */ - boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp() { - auto service = cc().getServiceContext(); - auto replCoord = ReplicationCoordinator::get(service); - auto slaveDelay = replCoord->getSlaveDelaySecs(); - if (slaveDelay <= Seconds(0)) { - return {}; - } - auto fastClockSource = service->getFastClockSource(); - return fastClockSource->now() - slaveDelay; - } - - void run() { - Client::initThread("ReplBatcher"); - - BatchLimits batchLimits; - - while (true) { - rsSyncApplyStop.pauseWhileSet(); - - batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp(); - - // Check the limits once per batch since users can change them at runtime. - batchLimits.ops = getBatchLimitOplogEntries(); - - OpQueue ops(batchLimits.ops); - { - auto opCtx = cc().makeOperationContext(); - - // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an - // UninterruptibleLockGuard in client operations because stepdown requires the - // ability to interrupt client operations. However, it is acceptable to use an - // UninterruptibleLockGuard in batch application because the only cause of - // interruption would be shutdown, and the ReplBatcher thread has its own shutdown - // handling. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - - // Locks the oplog to check its max size, do this in the UninterruptibleLockGuard. - batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface); - - auto oplogEntries = - fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits)); - for (const auto& oplogEntry : oplogEntries) { - ops.emplace_back(oplogEntry); - } - - // If we don't have anything in the queue, wait a bit for something to appear. - if (oplogEntries.empty()) { - if (_syncTail->inShutdown()) { - ops.setMustShutdownFlag(); - } else { - // Block up to 1 second. - _oplogBuffer->waitForData(Seconds(1)); - } - } - } - - if (ops.empty() && !ops.mustShutdown()) { - // Check whether we have drained the oplog buffer. The states checked here can be - // stale when it's used by the applier. signalDrainComplete() needs to check the - // applier is still draining in the same term to make sure these states have not - // changed. - auto replCoord = ReplicationCoordinator::get(cc().getServiceContext()); - // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling - // drain complete. - // - // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer - // and blocks when it's about to notify the applier to signal drain complete, the - // node may step down and fetch new data into the buffer and then step up again. - // Now the batcher will resume and let the applier signal drain complete even if - // the buffer has new data. Checking the term before and after ensures nothing - // changed in between. - auto termWhenBufferIsEmpty = replCoord->getTerm(); - // Draining state guarantees the producer has already been fully stopped and no more - // operations will be pushed in to the oplog buffer until the applier state changes. - auto isDraining = - replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; - // Check the oplog buffer after the applier state to ensure the producer is stopped. - if (isDraining && _oplogBuffer->isEmpty()) { - ops.setTermWhenExhausted(termWhenBufferIsEmpty); - log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty; - } else { - // Don't emit empty batches. - continue; - } - } - - stdx::unique_lock<Latch> lk(_mutex); - // Block until the previous batch has been taken. - _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); }); - _ops = std::move(ops); - _cv.notify_all(); - if (_ops.mustShutdown()) { - _isDead = true; - return; - } - } - } - - SyncTail* const _syncTail; - StorageInterface* const _storageInterface; - OplogBuffer* const _oplogBuffer; - OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; - - Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); // Guards _ops. - stdx::condition_variable _cv; - OpQueue _ops; - - // This only exists so the destructor invariants rather than deadlocking. - // TODO remove once we trust noexcept enough to mark oplogApplication() as noexcept. - bool _isDead = false; - - stdx::thread _thread; // Must be last so all other members are initialized before starting. -}; - -void SyncTail::runLoop(OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn, - ReplicationCoordinator* replCoord) { - // We don't start data replication for arbiters at all and it's not allowed to reconfig - // arbiterOnly field for any member. - invariant(!replCoord->getMemberState().arbiter()); - - OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn); - - std::unique_ptr<ApplyBatchFinalizer> finalizer{ - getGlobalServiceContext()->getStorageEngine()->isDurable() - ? new ApplyBatchFinalizerForJournal(replCoord) - : new ApplyBatchFinalizer(replCoord)}; - - while (true) { // Exits on message from OpQueueBatcher. - // Use a new operation context each iteration, as otherwise we may appear to use a single - // collection name to refer to collections with different UUIDs. - const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); - OperationContext& opCtx = *opCtxPtr; - - // This code path gets used during elections, so it should not be subject to Flow Control. - // It is safe to exclude this operation context from Flow Control here because this code - // path only gets used on secondaries or on a node transitioning to primary. - opCtx.setShouldParticipateInFlowControl(false); - - // For pausing replication in tests. - if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is " - "disabled."; - while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - // Tests should not trigger clean shutdown while that failpoint is active. If we - // think we need this, we need to think hard about what the behavior should be. - if (inShutdown()) { - severe() << "Turn off rsSyncApplyStop before attempting clean shutdown"; - fassertFailedNoTrace(40304); - } - sleepmillis(10); - } - } - - // Transition to SECONDARY state, if possible. - replCoord->finishRecoveryIfEligible(&opCtx); - - // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become - // ready in time, we'll loop again so we can do the above checks periodically. - OpQueue ops = batcher.getNextBatch(Seconds(1)); - if (ops.empty()) { - if (ops.mustShutdown()) { - // Shut down and exit oplog application loop. - return; - } - if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) { - continue; - } - if (ops.termWhenExhausted()) { - // Signal drain complete if we're in Draining state and the buffer is empty. - // Since we check the states of batcher and oplog buffer without synchronization, - // they can be stale. We make sure the applier is still draining in the given term - // before and after the check, so that if the oplog buffer was exhausted, then - // it still will be. - replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted()); - } - continue; // Try again. - } - - // Extract some info from ops that we'll need after releasing the batch below. - const auto firstOpTimeInBatch = ops.front().getOpTime(); - const auto lastOpInBatch = ops.back(); - const auto lastOpTimeInBatch = lastOpInBatch.getOpTime(); - const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime(); - const auto lastAppliedOpTimeAtStartOfBatch = replCoord->getMyLastAppliedOpTime(); - - // Make sure the oplog doesn't go back in time or repeat an entry. - if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) { - fassert(34361, - Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Attempted to apply an oplog entry (" - << firstOpTimeInBatch.toString() - << ") which is not greater than our last applied OpTime (" - << lastAppliedOpTimeAtStartOfBatch.toString() << ").")); - } - - // Don't allow the fsync+lock thread to see intermediate states of batch application. - stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); - - // Apply the operations in this batch. 'multiApply' returns the optime of the last op that - // was applied, which should be the last optime in the batch. - auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch())); - invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); - - // Update various things that care about our last applied optime. Tests rely on 1 happening - // before 2 even though it isn't strictly necessary. - - // 1. Persist our "applied through" optime to disk. - _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch); - - // 2. Ensure that the last applied op time hasn't changed since the start of this batch. - const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime(); - invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch, - str::stream() << "the last known applied OpTime has changed from " - << lastAppliedOpTimeAtStartOfBatch.toString() << " to " - << lastAppliedOpTimeAtEndOfBatch.toString() - << " in the middle of batch application"); - - // 3. Update oplog visibility by notifying the storage engine of the new oplog entries. - const bool orderedCommit = true; - _storageInterface->oplogDiskLocRegister( - &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit); - - // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the - // current 'minValid' optime. In case we crash while applying a batch, multiApply advances - // minValid to the last opTime in the batch, so check minValid *after* calling multiApply. - const auto minValid = _consistencyMarkers->getMinValid(&opCtx); - auto consistency = (lastOpTimeInBatch >= minValid) - ? ReplicationCoordinator::DataConsistency::Consistent - : ReplicationCoordinator::DataConsistency::Inconsistent; - - // The finalizer advances the global timestamp to lastOpTimeInBatch. - finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency); - } -} - void SyncTail::shutdown() { stdx::lock_guard<Latch> lock(_mutex); _inShutdown = true; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 364df797cad..8fd2d9aef31 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -95,15 +95,6 @@ public: const OplogApplier::Options& getOptions() const; /** - * Runs oplog application in a loop until shutdown() is called. - * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using - * multiApply(). - */ - void runLoop(OplogBuffer* oplogBuffer, - OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn, - ReplicationCoordinator* replCoord); - - /** * Shuts down oplogApplication() processing. */ void shutdown(); @@ -114,85 +105,6 @@ public: bool inShutdown() const; - class OpQueue { - public: - explicit OpQueue(std::size_t batchLimitOps) : _bytes(0) { - _batch.reserve(batchLimitOps); - } - - size_t getBytes() const { - return _bytes; - } - size_t getCount() const { - return _batch.size(); - } - bool empty() const { - return _batch.empty(); - } - const OplogEntry& front() const { - invariant(!_batch.empty()); - return _batch.front(); - } - const OplogEntry& back() const { - invariant(!_batch.empty()); - return _batch.back(); - } - const std::vector<OplogEntry>& getBatch() const { - return _batch; - } - - void emplace_back(OplogEntry oplog) { - invariant(!_mustShutdown); - _bytes += oplog.getRawObjSizeBytes(); - _batch.emplace_back(std::move(oplog)); - } - void pop_back() { - _bytes -= back().getRawObjSizeBytes(); - _batch.pop_back(); - } - - /** - * A batch with this set indicates that the upstream stages of the pipeline are shutdown and - * no more batches will be coming. - * - * This can only happen with empty batches. - * - * TODO replace the empty object used to signal draining with this. - */ - bool mustShutdown() const { - return _mustShutdown; - } - void setMustShutdownFlag() { - invariant(empty()); - _mustShutdown = true; - } - - /** - * If the oplog buffer is exhausted, return the term before we learned that the buffer was - * empty. - */ - boost::optional<long long> termWhenExhausted() const { - return _termWhenExhausted; - } - void setTermWhenExhausted(long long term) { - invariant(empty()); - _termWhenExhausted = term; - } - - /** - * Leaves this object in an unspecified state. Only assignment and destruction are valid. - */ - std::vector<OplogEntry> releaseBatch() { - return std::move(_batch); - } - - private: - std::vector<OplogEntry> _batch; - size_t _bytes; - bool _mustShutdown = false; - boost::optional<long long> _termWhenExhausted; - }; - using BatchLimits = OplogApplier::BatchLimits; /** @@ -217,8 +129,6 @@ public: std::vector<MultiApplier::Operations>* derivedOps) noexcept; private: - class OpQueueBatcher; - OplogApplier::Observer* const _observer; ReplicationConsistencyMarkers* const _consistencyMarkers; StorageInterface* const _storageInterface; |