author     Mihai Andrei <mihai.andrei@mongodb.com>  2019-09-23 18:51:11 +0000
committer  evergreen <evergreen@mongodb.com>        2019-09-23 18:51:11 +0000
commit     a9b6247ed5d02bd546255b3c8974f7eefd1a0a17 (patch)
tree       8c9d418827db090f53714609822e385607cdb43a
parent     ebb647b0b494b65df4cec1cd7b41f87a4b8f16f7 (diff)
download   mongo-a9b6247ed5d02bd546255b3c8974f7eefd1a0a17.tar.gz
SERVER-43327 Move runLoop from SyncTail to OplogApplierImpl and move OpQueueBatcher to its own file
-rw-r--r--  src/mongo/db/repl/SConscript               |   3
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp   | 237
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.h     |  17
-rw-r--r--  src/mongo/db/repl/opqueue_batcher.cpp      | 181
-rw-r--r--  src/mongo/db/repl/opqueue_batcher.h        | 163
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp            | 389
-rw-r--r--  src/mongo/db/repl/sync_tail.h              |  90
7 files changed, 599 insertions, 481 deletions
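
The OpQueueBatcher that this commit moves into its own file implements a single-slot hand-off between a producer thread (the batcher) and a consumer (the oplog applier): the batcher prepares the next batch while the applier works on the previous one, and getNextBatch() waits only up to a deadline so the applier can run its periodic checks even when nothing arrives. The sketch below is a minimal standalone illustration of that pattern, not MongoDB code; all names and types are invented for the example.

// Illustrative sketch only: a single-slot producer/consumer batcher.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class SingleSlotBatcher {
public:
    SingleSlotBatcher() : _producer([this] { _run(); }) {}

    ~SingleSlotBatcher() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cv.notify_all();
        _producer.join();
    }

    // Returns the pending batch, or an empty batch if none became ready in time.
    std::vector<int> getNextBatch(std::chrono::seconds maxWait) {
        std::unique_lock<std::mutex> lk(_mutex);
        if (_slot.empty()) {
            // Timeout or notification: either way, return whatever is in the slot.
            _cv.wait_for(lk, maxWait);
        }
        std::vector<int> batch = std::move(_slot);
        _slot.clear();
        _cv.notify_all();  // Let the producer publish the next batch.
        return batch;
    }

private:
    void _run() {
        int next = 0;
        while (true) {
            std::vector<int> batch;  // Stand-in for fetching oplog entries.
            for (int i = 0; i < 3; ++i)
                batch.push_back(next++);

            std::unique_lock<std::mutex> lk(_mutex);
            // Block until the previous batch has been taken or shutdown is requested.
            _cv.wait(lk, [&] { return _slot.empty() || _shutdown; });
            if (_shutdown)
                return;
            _slot = std::move(batch);
            _cv.notify_all();
        }
    }

    std::mutex _mutex;       // Guards _slot and _shutdown.
    std::condition_variable _cv;
    std::vector<int> _slot;  // The single pending batch.
    bool _shutdown = false;
    std::thread _producer;   // Last member so everything else is initialized first.
};

int main() {
    SingleSlotBatcher batcher;
    for (int i = 0; i < 3; ++i) {
        auto batch = batcher.getNextBatch(std::chrono::seconds(1));
        std::cout << "got a batch of " << batch.size() << " ops\n";
    }
}
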
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e683788d1f9..cd06cba18bc 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -523,8 +523,9 @@ env.Library(
source=[
'insert_group.cpp',
'oplog_applier_impl.cpp',
+ 'opqueue_batcher.cpp',
'session_update_tracker.cpp',
- 'sync_tail.cpp',
+ 'sync_tail.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 860505e3e7c..1c094f6d077 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -27,13 +27,124 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
+#include "mongo/util/log.h"
#include "mongo/db/repl/oplog_applier_impl.h"
namespace mongo {
namespace repl {
+
+namespace {
+
+class ApplyBatchFinalizer {
+public:
+ ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
+ virtual ~ApplyBatchFinalizer(){};
+
+ virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _recordApplied(newOpTimeAndWallTime, consistency);
+ };
+
+protected:
+ void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with
+ // ReplicationExternalStateImpl::onTransitionToPrimary.
+ _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency);
+ }
+
+ void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) {
+ // We have to use setMyLastDurableOpTimeAndWallTimeForward since this thread races with
+ // ReplicationExternalStateImpl::onTransitionToPrimary.
+ _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime);
+ }
+
+private:
+ // Used to update the replication system's progress.
+ ReplicationCoordinator* _replCoord;
+};
+
+class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer {
+public:
+ ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord)
+ : ApplyBatchFinalizer(replCoord),
+ _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
+ ~ApplyBatchFinalizerForJournal();
+
+ void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
+ ReplicationCoordinator::DataConsistency consistency) override;
+
+private:
+ /**
+ * Loops continuously, waiting for writes to be flushed to disk and then calls
+ * ReplicationCoordinator::setMyLastOptime with _latestOpTime.
+ * Terminates once _shutdownSignaled is set true.
+ */
+ void _run();
+
+ // Protects _cond, _shutdownSignaled, and _latestOpTime.
+ Mutex _mutex = MONGO_MAKE_LATCH("OplogApplierImpl::_mutex");
+ // Used to alert our thread of a new OpTime.
+ stdx::condition_variable _cond;
+ // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing.
+ OpTimeAndWallTime _latestOpTimeAndWallTime;
+ // Once this is set to true the _run method will terminate.
+ bool _shutdownSignaled = false;
+ // Thread that will _run(). Must be initialized last as it depends on the other variables.
+ stdx::thread _waiterThread;
+};
+
+ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
+ stdx::unique_lock<Latch> lock(_mutex);
+ _shutdownSignaled = true;
+ _cond.notify_all();
+ lock.unlock();
+
+ _waiterThread.join();
+}
+
+void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime,
+ ReplicationCoordinator::DataConsistency consistency) {
+ _recordApplied(newOpTimeAndWallTime, consistency);
+
+ stdx::unique_lock<Latch> lock(_mutex);
+ _latestOpTimeAndWallTime = newOpTimeAndWallTime;
+ _cond.notify_all();
+}
+
+void ApplyBatchFinalizerForJournal::_run() {
+ Client::initThread("ApplyBatchFinalizerForJournal");
+
+ while (true) {
+ OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()};
+
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) {
+ _cond.wait(lock);
+ }
+
+ if (_shutdownSignaled) {
+ return;
+ }
+
+ latestOpTimeAndWallTime = _latestOpTimeAndWallTime;
+ _latestOpTimeAndWallTime = {OpTime(), Date_t()};
+ }
+
+ auto opCtx = cc().makeOperationContext();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
+ _recordDurable(latestOpTimeAndWallTime);
+ }
+}
+
+} // namespace
+
OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
Observer* observer,
@@ -44,6 +155,8 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
ThreadPool* writerPool)
: OplogApplier(executor, oplogBuffer, observer, options),
_replCoord(replCoord),
+ _storageInterface(storageInterface),
+ _consistencyMarkers(consistencyMarkers),
_syncTail(
observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options),
_beginApplyingOpTime(options.beginApplyingOpTime) {}
@@ -52,13 +165,135 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) {
return getNextApplierBatch(opCtx, batchLimits);
};
- _syncTail.runLoop(oplogBuffer, getNextApplierBatchFn, _replCoord);
+ _runLoop(oplogBuffer, getNextApplierBatchFn);
}
void OplogApplierImpl::_shutdown() {
_syncTail.shutdown();
}
+void OplogApplierImpl::_runLoop(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) {
+ // We don't start data replication for arbiters at all and it's not allowed to reconfig
+ // arbiterOnly field for any member.
+ invariant(!_replCoord->getMemberState().arbiter());
+
+ OpQueueBatcher batcher(&_syncTail, _storageInterface, oplogBuffer, getNextApplierBatchFn);
+
+ std::unique_ptr<ApplyBatchFinalizer> finalizer{
+ getGlobalServiceContext()->getStorageEngine()->isDurable()
+ ? new ApplyBatchFinalizerForJournal(_replCoord)
+ : new ApplyBatchFinalizer(_replCoord)};
+
+ while (true) { // Exits on message from OpQueueBatcher.
+ // Use a new operation context each iteration, as otherwise we may appear to use a single
+ // collection name to refer to collections with different UUIDs.
+ const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
+ OperationContext& opCtx = *opCtxPtr;
+
+ // This code path gets used during elections, so it should not be subject to Flow Control.
+ // It is safe to exclude this operation context from Flow Control here because this code
+ // path only gets used on secondaries or on a node transitioning to primary.
+ opCtx.setShouldParticipateInFlowControl(false);
+
+ // For pausing replication in tests.
+ if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
+ log() << "Oplog Applier - rsSyncApplyStop fail point enabled. Blocking until fail "
+ "point is disabled.";
+ while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
+ // Tests should not trigger clean shutdown while that failpoint is active. If we
+ // think we need this, we need to think hard about what the behavior should be.
+ if (_syncTail.inShutdown()) {
+ severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
+ fassertFailedNoTrace(40304);
+ }
+ sleepmillis(10);
+ }
+ }
+
+ // Transition to SECONDARY state, if possible.
+ _replCoord->finishRecoveryIfEligible(&opCtx);
+
+ // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
+ // ready in time, we'll loop again so we can do the above checks periodically.
+ OpQueue ops = batcher.getNextBatch(Seconds(1));
+ if (ops.empty()) {
+ if (ops.mustShutdown()) {
+ // Shut down and exit oplog application loop.
+ return;
+ }
+ if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
+ continue;
+ }
+ if (ops.termWhenExhausted()) {
+ // Signal drain complete if we're in Draining state and the buffer is empty.
+ // Since we check the states of batcher and oplog buffer without synchronization,
+ // they can be stale. We make sure the applier is still draining in the given term
+ // before and after the check, so that if the oplog buffer was exhausted, then
+ // it still will be.
+ _replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted());
+ }
+ continue; // Try again.
+ }
+
+ // Extract some info from ops that we'll need after releasing the batch below.
+ const auto firstOpTimeInBatch = ops.front().getOpTime();
+ const auto lastOpInBatch = ops.back();
+ const auto lastOpTimeInBatch = lastOpInBatch.getOpTime();
+ const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime();
+ const auto lastAppliedOpTimeAtStartOfBatch = _replCoord->getMyLastAppliedOpTime();
+
+ // Make sure the oplog doesn't go back in time or repeat an entry.
+ if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) {
+ fassert(34361,
+ Status(ErrorCodes::OplogOutOfOrder,
+ str::stream() << "Attempted to apply an oplog entry ("
+ << firstOpTimeInBatch.toString()
+ << ") which is not greater than our last applied OpTime ("
+ << lastAppliedOpTimeAtStartOfBatch.toString() << ")."));
+ }
+
+ // Don't allow the fsync+lock thread to see intermediate states of batch application.
+ stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
+
+ // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
+ // was applied, which should be the last optime in the batch.
+ auto lastOpTimeAppliedInBatch =
+ fassertNoTrace(34437, _syncTail.multiApply(&opCtx, ops.releaseBatch()));
+ invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
+
+ // Update various things that care about our last applied optime. Tests rely on 1 happening
+ // before 2 even though it isn't strictly necessary.
+
+ // 1. Persist our "applied through" optime to disk.
+ _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
+
+ // 2. Ensure that the last applied op time hasn't changed since the start of this batch.
+ const auto lastAppliedOpTimeAtEndOfBatch = _replCoord->getMyLastAppliedOpTime();
+ invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch,
+ str::stream() << "the last known applied OpTime has changed from "
+ << lastAppliedOpTimeAtStartOfBatch.toString() << " to "
+ << lastAppliedOpTimeAtEndOfBatch.toString()
+ << " in the middle of batch application");
+
+ // 3. Update oplog visibility by notifying the storage engine of the new oplog entries.
+ const bool orderedCommit = true;
+ _storageInterface->oplogDiskLocRegister(
+ &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
+
+ // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the
+ // current 'minValid' optime. Note that recording the lastOpTime in the finalizer includes
+ // advancing the global timestamp to at least its timestamp.
+ const auto minValid = _consistencyMarkers->getMinValid(&opCtx);
+ auto consistency = (lastOpTimeInBatch >= minValid)
+ ? ReplicationCoordinator::DataConsistency::Consistent
+ : ReplicationCoordinator::DataConsistency::Inconsistent;
+
+ // The finalizer advances the global timestamp to lastOpTimeInBatch.
+ finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency);
+ }
+}
+
StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) {
return _syncTail.multiApply(opCtx, std::move(ops));
}
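
The ApplyBatchFinalizerForJournal moved into this file above follows a common pattern: record() publishes the applied optime immediately, and a dedicated waiter thread advances the durable optime only after the journal has been flushed. The sketch below is a minimal standalone illustration of that pattern under simplified assumptions (the optime is a plain integer and the flush is simulated with a sleep); it is not MongoDB code.

// Illustrative sketch only: publish "applied" now, "durable" after a flush.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class JournalFinalizer {
public:
    JournalFinalizer() : _waiter([this] { _run(); }) {}

    ~JournalFinalizer() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cv.notify_all();
        _waiter.join();
    }

    // Called after every applied batch: publish "applied" immediately and hand the
    // optime to the waiter thread so "durable" advances only after a flush.
    void record(long long appliedOpTime) {
        std::cout << "applied -> " << appliedOpTime << "\n";
        std::lock_guard<std::mutex> lk(_mutex);
        _latest = appliedOpTime;
        _cv.notify_all();
    }

private:
    void _run() {
        while (true) {
            long long toFlush = 0;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cv.wait(lk, [&] { return _latest != 0 || _shutdown; });
                if (_shutdown)
                    return;
                toFlush = _latest;
                _latest = 0;
            }
            // Stand-in for waiting until the journal is durable on disk.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::cout << "durable -> " << toFlush << "\n";
        }
    }

    std::mutex _mutex;      // Guards _latest and _shutdown.
    std::condition_variable _cv;
    long long _latest = 0;  // Newest applied optime not yet marked durable.
    bool _shutdown = false;
    std::thread _waiter;    // Last member so everything else is initialized first.
};

int main() {
    JournalFinalizer finalizer;
    for (long long opTime = 1; opTime <= 3; ++opTime)
        finalizer.record(opTime);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
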
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index d6e0e523220..087e5870e56 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -30,7 +30,12 @@
#pragma once
+#include "mongo/db/commands/fsync.h"
+#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
+#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/opqueue_batcher.h"
#include "mongo/db/repl/replication_consistency_markers.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/storage_interface.h"
@@ -67,11 +72,23 @@ private:
void _shutdown() override;
+ /**
+ * Runs oplog application in a loop until shutdown() is called.
+ * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
+ * multiApply().
+ */
+ void _runLoop(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn);
+
StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override;
// Not owned by us.
ReplicationCoordinator* const _replCoord;
+ StorageInterface* _storageInterface;
+
+ ReplicationConsistencyMarkers* const _consistencyMarkers;
+
// Used to run oplog application loop.
SyncTail _syncTail;
diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp
new file mode 100644
index 00000000000..8c3802c1e3f
--- /dev/null
+++ b/src/mongo/db/repl/opqueue_batcher.cpp
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/db/repl/opqueue_batcher.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+
+OpQueueBatcher::OpQueueBatcher(SyncTail* syncTail,
+ StorageInterface* storageInterface,
+ OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn)
+ : _syncTail(syncTail),
+ _storageInterface(storageInterface),
+ _oplogBuffer(oplogBuffer),
+ _getNextApplierBatchFn(getNextApplierBatchFn),
+ _ops(0),
+ _thread([this] { run(); }) {}
+OpQueueBatcher::~OpQueueBatcher() {
+ invariant(_isDead);
+ _thread.join();
+}
+
+
+OpQueue OpQueueBatcher::getNextBatch(Seconds maxWaitTime) {
+ stdx::unique_lock<Latch> lk(_mutex);
+ // _ops can indicate the following cases:
+ // 1. A new batch is ready to consume.
+ // 2. Shutdown.
+ // 3. The batch has (or had) exhausted the buffer in draining mode.
+ // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode,
+ // so there could be new oplog entries coming.
+ // 5. Empty batch since the batcher is still running.
+ //
+ // In case (4) and (5), we wait for up to "maxWaitTime".
+ if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) {
+ // We intentionally don't care about whether this returns due to signaling or timeout
+ // since we do the same thing either way: return whatever is in _ops.
+ (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
+ }
+
+ OpQueue ops = std::move(_ops);
+ _ops = OpQueue(0);
+ _cv.notify_all();
+ return ops;
+}
+
+/**
+ * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
+ * entries that can be returned in a batch.
+ */
+boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() {
+ auto service = cc().getServiceContext();
+ auto replCoord = ReplicationCoordinator::get(service);
+ auto slaveDelay = replCoord->getSlaveDelaySecs();
+ if (slaveDelay <= Seconds(0)) {
+ return {};
+ }
+ auto fastClockSource = service->getFastClockSource();
+ return fastClockSource->now() - slaveDelay;
+}
+
+void OpQueueBatcher::run() {
+ Client::initThread("ReplBatcher");
+
+ OplogApplier::BatchLimits batchLimits;
+
+ while (true) {
+ rsSyncApplyStop.pauseWhileSet();
+
+ batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp();
+
+ // Check the limits once per batch since users can change them at runtime.
+ batchLimits.ops = getBatchLimitOplogEntries();
+
+ OpQueue ops(batchLimits.ops);
+ {
+ auto opCtx = cc().makeOperationContext();
+
+ // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an
+ // UninterruptibleLockGuard in client operations because stepdown requires the
+ // ability to interrupt client operations. However, it is acceptable to use an
+ // UninterruptibleLockGuard in batch application because the only cause of
+ // interruption would be shutdown, and the ReplBatcher thread has its own shutdown
+ // handling.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
+ // Locks the oplog to check its max size; do this inside the UninterruptibleLockGuard.
+ batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface);
+
+ auto oplogEntries =
+ fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
+ for (const auto& oplogEntry : oplogEntries) {
+ ops.emplace_back(oplogEntry);
+ }
+
+ // If we don't have anything in the queue, wait a bit for something to appear.
+ if (oplogEntries.empty()) {
+ if (_syncTail->inShutdown()) {
+ ops.setMustShutdownFlag();
+ } else {
+ // Block up to 1 second.
+ _oplogBuffer->waitForData(Seconds(1));
+ }
+ }
+ }
+
+ if (ops.empty() && !ops.mustShutdown()) {
+ // Check whether we have drained the oplog buffer. The states checked here can be
+ // stale when it's used by the applier. signalDrainComplete() needs to check the
+ // applier is still draining in the same term to make sure these states have not
+ // changed.
+ auto replCoord = ReplicationCoordinator::get(cc().getServiceContext());
+ // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling
+ // drain complete.
+ //
+ // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer
+ // and blocks when it's about to notify the applier to signal drain complete, the
+ // node may step down and fetch new data into the buffer and then step up again.
+ // Now the batcher will resume and let the applier signal drain complete even if
+ // the buffer has new data. Checking the term before and after ensures nothing
+ // changed in between.
+ auto termWhenBufferIsEmpty = replCoord->getTerm();
+ // Draining state guarantees the producer has already been fully stopped and no more
+ // operations will be pushed into the oplog buffer until the applier state changes.
+ auto isDraining =
+ replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
+ // Check the oplog buffer after the applier state to ensure the producer is stopped.
+ if (isDraining && _oplogBuffer->isEmpty()) {
+ ops.setTermWhenExhausted(termWhenBufferIsEmpty);
+ log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty;
+ } else {
+ // Don't emit empty batches.
+ continue;
+ }
+ }
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ // Block until the previous batch has been taken.
+ _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); });
+ _ops = std::move(ops);
+ _cv.notify_all();
+ if (_ops.mustShutdown()) {
+ _isDead = true;
+ return;
+ }
+ }
+}
+
+} // namespace repl
+} // namespace mongo
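
The drain-detection logic in OpQueueBatcher::run() above reads the term before it inspects the applier state and the oplog buffer, and signalDrainComplete() is expected to verify that the node is still draining in that same term, so a stepdown/stepup between the two checks cannot produce a spurious drain-complete signal. A condensed sketch of that guard follows; currentTerm, draining, and bufferEmpty are illustrative stand-ins, not MongoDB APIs.

// Illustrative sketch only: term check around drain detection.
#include <atomic>
#include <iostream>
#include <optional>

std::atomic<long long> currentTerm{1};
std::atomic<bool> draining{true};
std::atomic<bool> bufferEmpty{true};

// Batcher side: returns the term in which the buffer was observed empty, if any.
std::optional<long long> checkExhausted() {
    long long termWhenBufferIsEmpty = currentTerm.load();  // Read the term first.
    bool isDraining = draining.load();                     // Then the applier state.
    bool isEmpty = bufferEmpty.load();                     // Then the buffer.
    if (isDraining && isEmpty)
        return termWhenBufferIsEmpty;
    return std::nullopt;
}

// Applier side: only act if we are still draining in the same term.
bool signalDrainComplete(long long termWhenExhausted) {
    return draining.load() && currentTerm.load() == termWhenExhausted;
}

int main() {
    if (auto term = checkExhausted())
        std::cout << "drain complete accepted: " << signalDrainComplete(*term) << "\n";
}
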
diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/opqueue_batcher.h
new file mode 100644
index 00000000000..0cdad053ddd
--- /dev/null
+++ b/src/mongo/db/repl/opqueue_batcher.h
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/initial_syncer.h"
+#include "mongo/db/repl/sync_tail.h"
+
+namespace mongo {
+namespace repl {
+
+class OpQueue {
+public:
+ explicit OpQueue(std::size_t batchLimitOps) : _bytes(0) {
+ _batch.reserve(batchLimitOps);
+ }
+
+ size_t getBytes() const {
+ return _bytes;
+ }
+ size_t getCount() const {
+ return _batch.size();
+ }
+ bool empty() const {
+ return _batch.empty();
+ }
+ const OplogEntry& front() const {
+ invariant(!_batch.empty());
+ return _batch.front();
+ }
+ const OplogEntry& back() const {
+ invariant(!_batch.empty());
+ return _batch.back();
+ }
+ const std::vector<OplogEntry>& getBatch() const {
+ return _batch;
+ }
+
+ void emplace_back(OplogEntry oplog) {
+ invariant(!_mustShutdown);
+ _bytes += oplog.getRawObjSizeBytes();
+ _batch.emplace_back(std::move(oplog));
+ }
+ void pop_back() {
+ _bytes -= back().getRawObjSizeBytes();
+ _batch.pop_back();
+ }
+
+ /**
+ * A batch with this set indicates that the upstream stages of the pipeline are shut down and
+ * no more batches will be coming.
+ *
+ * This can only happen with empty batches.
+ *
+ * TODO replace the empty object used to signal draining with this.
+ */
+ bool mustShutdown() const {
+ return _mustShutdown;
+ }
+ void setMustShutdownFlag() {
+ invariant(empty());
+ _mustShutdown = true;
+ }
+
+ /**
+ * If the oplog buffer is exhausted, return the term before we learned that the buffer was
+ * empty.
+ */
+ boost::optional<long long> termWhenExhausted() const {
+ return _termWhenExhausted;
+ }
+ void setTermWhenExhausted(long long term) {
+ invariant(empty());
+ _termWhenExhausted = term;
+ }
+
+ /**
+ * Leaves this object in an unspecified state. Only assignment and destruction are valid.
+ */
+ std::vector<OplogEntry> releaseBatch() {
+ return std::move(_batch);
+ }
+
+private:
+ std::vector<OplogEntry> _batch;
+ size_t _bytes;
+ bool _mustShutdown = false;
+ boost::optional<long long> _termWhenExhausted;
+};
+
+class OpQueueBatcher {
+ OpQueueBatcher(const OpQueueBatcher&) = delete;
+ OpQueueBatcher& operator=(const OpQueueBatcher&) = delete;
+
+public:
+ /**
+ * Constructs an OpQueueBatcher
+ */
+ OpQueueBatcher(SyncTail* syncTail,
+ StorageInterface* storageInterface,
+ OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn);
+
+ virtual ~OpQueueBatcher();
+
+ /**
+ * Retrieves the next batch of ops that are ready to apply.
+ */
+ OpQueue getNextBatch(Seconds maxWaitTime);
+
+private:
+ /**
+ * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
+ * entries that can be returned in a batch.
+ */
+ boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp();
+
+ void run();
+
+ SyncTail* const _syncTail;
+ StorageInterface* const _storageInterface;
+ OplogBuffer* const _oplogBuffer;
+ OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex");
+ stdx::condition_variable _cv;
+ OpQueue _ops;
+
+ // This only exists so the destructor invariants rather than deadlocking.
+ bool _isDead = false;
+
+ stdx::thread _thread; // Must be last so all other members are initialized before starting.
+};
+
+
+} // namespace repl
+} // namespace mongo
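
The OpQueue declared above is essentially a batch container that tracks its total byte size as entries are added or removed, plus two out-of-band flags (mustShutdown and termWhenExhausted) that may only be set on an empty batch. The sketch below is a minimal standalone analogue showing how such a container is used; it is not MongoDB code, and the entries are plain strings rather than oplog entries.

// Illustrative sketch only: a byte-tracking batch container with empty-only flags.
#include <cassert>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

class Batch {
public:
    void push(std::string entry) {
        assert(!_mustShutdown);
        _bytes += entry.size();
        _entries.push_back(std::move(entry));
    }
    size_t bytes() const { return _bytes; }
    bool empty() const { return _entries.empty(); }

    void setMustShutdown() { assert(empty()); _mustShutdown = true; }
    bool mustShutdown() const { return _mustShutdown; }

    void setTermWhenExhausted(long long term) { assert(empty()); _term = term; }
    std::optional<long long> termWhenExhausted() const { return _term; }

    // Hands the entries to the caller; afterwards the batch is only good for reassignment.
    std::vector<std::string> release() { return std::move(_entries); }

private:
    std::vector<std::string> _entries;
    size_t _bytes = 0;
    bool _mustShutdown = false;
    std::optional<long long> _term;
};

int main() {
    Batch batch;
    batch.push("insert {a: 1}");
    batch.push("update {a: 2}");
    std::cout << batch.bytes() << " bytes in batch\n";
    auto ops = batch.release();
    std::cout << "released " << ops.size() << " ops\n";
}
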
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index a8c66775369..18b3d7e0495 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
-#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -103,109 +102,6 @@ ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.
TimerStats applyBatchStats;
ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
-class ApplyBatchFinalizer {
-public:
- ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
- virtual ~ApplyBatchFinalizer(){};
-
- virtual void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- _recordApplied(newOpTimeAndWallTime, consistency);
- };
-
-protected:
- void _recordApplied(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- // We have to use setMyLastAppliedOpTimeAndWallTimeForward since this thread races with
- // ReplicationExternalStateImpl::onTransitionToPrimary.
- _replCoord->setMyLastAppliedOpTimeAndWallTimeForward(newOpTimeAndWallTime, consistency);
- }
-
- void _recordDurable(const OpTimeAndWallTime& newOpTimeAndWallTime) {
- // We have to use setMyLastDurableOpTimeForward since this thread races with
- // ReplicationExternalStateImpl::onTransitionToPrimary.
- _replCoord->setMyLastDurableOpTimeAndWallTimeForward(newOpTimeAndWallTime);
- }
-
-private:
- // Used to update the replication system's progress.
- ReplicationCoordinator* _replCoord;
-};
-
-class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer {
-public:
- ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord)
- : ApplyBatchFinalizer(replCoord),
- _waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
- ~ApplyBatchFinalizerForJournal();
-
- void record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) override;
-
-private:
- /**
- * Loops continuously, waiting for writes to be flushed to disk and then calls
- * ReplicationCoordinator::setMyLastOptime with _latestOpTime.
- * Terminates once _shutdownSignaled is set true.
- */
- void _run();
-
- // Protects _cond, _shutdownSignaled, and _latestOpTime.
- Mutex _mutex = MONGO_MAKE_LATCH("ApplyBatchFinalizerForJournal::_mutex");
- // Used to alert our thread of a new OpTime.
- stdx::condition_variable _cond;
- // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing.
- OpTimeAndWallTime _latestOpTimeAndWallTime;
- // Once this is set to true the _run method will terminate.
- bool _shutdownSignaled = false;
- // Thread that will _run(). Must be initialized last as it depends on the other variables.
- stdx::thread _waiterThread;
-};
-
-ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
- stdx::unique_lock<Latch> lock(_mutex);
- _shutdownSignaled = true;
- _cond.notify_all();
- lock.unlock();
-
- _waiterThread.join();
-}
-
-void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAndWallTime,
- ReplicationCoordinator::DataConsistency consistency) {
- _recordApplied(newOpTimeAndWallTime, consistency);
-
- stdx::unique_lock<Latch> lock(_mutex);
- _latestOpTimeAndWallTime = newOpTimeAndWallTime;
- _cond.notify_all();
-}
-
-void ApplyBatchFinalizerForJournal::_run() {
- Client::initThread("ApplyBatchFinalizerForJournal");
-
- while (true) {
- OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()};
-
- {
- stdx::unique_lock<Latch> lock(_mutex);
- while (_latestOpTimeAndWallTime.opTime.isNull() && !_shutdownSignaled) {
- _cond.wait(lock);
- }
-
- if (_shutdownSignaled) {
- return;
- }
-
- latestOpTimeAndWallTime = _latestOpTimeAndWallTime;
- _latestOpTimeAndWallTime = {OpTime(), Date_t()};
- }
-
- auto opCtx = cc().makeOperationContext();
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
- _recordDurable(latestOpTimeAndWallTime);
- }
-}
-
NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
if (!optionalUuid) {
@@ -470,291 +366,6 @@ void addDerivedOps(OperationContext* opCtx,
} // namespace
-class SyncTail::OpQueueBatcher {
- OpQueueBatcher(const OpQueueBatcher&) = delete;
- OpQueueBatcher& operator=(const OpQueueBatcher&) = delete;
-
-public:
- OpQueueBatcher(SyncTail* syncTail,
- StorageInterface* storageInterface,
- OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn)
- : _syncTail(syncTail),
- _storageInterface(storageInterface),
- _oplogBuffer(oplogBuffer),
- _getNextApplierBatchFn(getNextApplierBatchFn),
- _ops(0),
- _thread([this] { run(); }) {}
- ~OpQueueBatcher() {
- invariant(_isDead);
- _thread.join();
- }
-
- OpQueue getNextBatch(Seconds maxWaitTime) {
- stdx::unique_lock<Latch> lk(_mutex);
- // _ops can indicate the following cases:
- // 1. A new batch is ready to consume.
- // 2. Shutdown.
- // 3. The batch has (or had) exhausted the buffer in draining mode.
- // 4. Empty batch since the batch has/had exhausted the buffer but not in draining mode,
- // so there could be new oplog entries coming.
- // 5. Empty batch since the batcher is still running.
- //
- // In case (4) and (5), we wait for up to "maxWaitTime".
- if (_ops.empty() && !_ops.mustShutdown() && !_ops.termWhenExhausted()) {
- // We intentionally don't care about whether this returns due to signaling or timeout
- // since we do the same thing either way: return whatever is in _ops.
- (void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
- }
-
- OpQueue ops = std::move(_ops);
- _ops = OpQueue(0);
- _cv.notify_all();
- return ops;
- }
-
-private:
- /**
- * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
- * entries that can be be returned in a batch.
- */
- boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp() {
- auto service = cc().getServiceContext();
- auto replCoord = ReplicationCoordinator::get(service);
- auto slaveDelay = replCoord->getSlaveDelaySecs();
- if (slaveDelay <= Seconds(0)) {
- return {};
- }
- auto fastClockSource = service->getFastClockSource();
- return fastClockSource->now() - slaveDelay;
- }
-
- void run() {
- Client::initThread("ReplBatcher");
-
- BatchLimits batchLimits;
-
- while (true) {
- rsSyncApplyStop.pauseWhileSet();
-
- batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp();
-
- // Check the limits once per batch since users can change them at runtime.
- batchLimits.ops = getBatchLimitOplogEntries();
-
- OpQueue ops(batchLimits.ops);
- {
- auto opCtx = cc().makeOperationContext();
-
- // This use of UninterruptibleLockGuard is intentional. It is undesirable to use an
- // UninterruptibleLockGuard in client operations because stepdown requires the
- // ability to interrupt client operations. However, it is acceptable to use an
- // UninterruptibleLockGuard in batch application because the only cause of
- // interruption would be shutdown, and the ReplBatcher thread has its own shutdown
- // handling.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
- // Locks the oplog to check its max size, do this in the UninterruptibleLockGuard.
- batchLimits.bytes = getBatchLimitOplogBytes(opCtx.get(), _storageInterface);
-
- auto oplogEntries =
- fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
- for (const auto& oplogEntry : oplogEntries) {
- ops.emplace_back(oplogEntry);
- }
-
- // If we don't have anything in the queue, wait a bit for something to appear.
- if (oplogEntries.empty()) {
- if (_syncTail->inShutdown()) {
- ops.setMustShutdownFlag();
- } else {
- // Block up to 1 second.
- _oplogBuffer->waitForData(Seconds(1));
- }
- }
- }
-
- if (ops.empty() && !ops.mustShutdown()) {
- // Check whether we have drained the oplog buffer. The states checked here can be
- // stale when it's used by the applier. signalDrainComplete() needs to check the
- // applier is still draining in the same term to make sure these states have not
- // changed.
- auto replCoord = ReplicationCoordinator::get(cc().getServiceContext());
- // Check the term first to detect DRAINING -> RUNNING -> DRAINING when signaling
- // drain complete.
- //
- // Batcher can delay arbitrarily. After stepup, if the batcher drained the buffer
- // and blocks when it's about to notify the applier to signal drain complete, the
- // node may step down and fetch new data into the buffer and then step up again.
- // Now the batcher will resume and let the applier signal drain complete even if
- // the buffer has new data. Checking the term before and after ensures nothing
- // changed in between.
- auto termWhenBufferIsEmpty = replCoord->getTerm();
- // Draining state guarantees the producer has already been fully stopped and no more
- // operations will be pushed in to the oplog buffer until the applier state changes.
- auto isDraining =
- replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
- // Check the oplog buffer after the applier state to ensure the producer is stopped.
- if (isDraining && _oplogBuffer->isEmpty()) {
- ops.setTermWhenExhausted(termWhenBufferIsEmpty);
- log() << "Oplog buffer has been drained in term " << termWhenBufferIsEmpty;
- } else {
- // Don't emit empty batches.
- continue;
- }
- }
-
- stdx::unique_lock<Latch> lk(_mutex);
- // Block until the previous batch has been taken.
- _cv.wait(lk, [&] { return _ops.empty() && !_ops.termWhenExhausted(); });
- _ops = std::move(ops);
- _cv.notify_all();
- if (_ops.mustShutdown()) {
- _isDead = true;
- return;
- }
- }
- }
-
- SyncTail* const _syncTail;
- StorageInterface* const _storageInterface;
- OplogBuffer* const _oplogBuffer;
- OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn;
-
- Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); // Guards _ops.
- stdx::condition_variable _cv;
- OpQueue _ops;
-
- // This only exists so the destructor invariants rather than deadlocking.
- // TODO remove once we trust noexcept enough to mark oplogApplication() as noexcept.
- bool _isDead = false;
-
- stdx::thread _thread; // Must be last so all other members are initialized before starting.
-};
-
-void SyncTail::runLoop(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord) {
- // We don't start data replication for arbiters at all and it's not allowed to reconfig
- // arbiterOnly field for any member.
- invariant(!replCoord->getMemberState().arbiter());
-
- OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
-
- std::unique_ptr<ApplyBatchFinalizer> finalizer{
- getGlobalServiceContext()->getStorageEngine()->isDurable()
- ? new ApplyBatchFinalizerForJournal(replCoord)
- : new ApplyBatchFinalizer(replCoord)};
-
- while (true) { // Exits on message from OpQueueBatcher.
- // Use a new operation context each iteration, as otherwise we may appear to use a single
- // collection name to refer to collections with different UUIDs.
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr;
-
- // This code path gets used during elections, so it should not be subject to Flow Control.
- // It is safe to exclude this operation context from Flow Control here because this code
- // path only gets used on secondaries or on a node transitioning to primary.
- opCtx.setShouldParticipateInFlowControl(false);
-
- // For pausing replication in tests.
- if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is "
- "disabled.";
- while (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- // Tests should not trigger clean shutdown while that failpoint is active. If we
- // think we need this, we need to think hard about what the behavior should be.
- if (inShutdown()) {
- severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
- fassertFailedNoTrace(40304);
- }
- sleepmillis(10);
- }
- }
-
- // Transition to SECONDARY state, if possible.
- replCoord->finishRecoveryIfEligible(&opCtx);
-
- // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
- // ready in time, we'll loop again so we can do the above checks periodically.
- OpQueue ops = batcher.getNextBatch(Seconds(1));
- if (ops.empty()) {
- if (ops.mustShutdown()) {
- // Shut down and exit oplog application loop.
- return;
- }
- if (MONGO_unlikely(rsSyncApplyStop.shouldFail())) {
- continue;
- }
- if (ops.termWhenExhausted()) {
- // Signal drain complete if we're in Draining state and the buffer is empty.
- // Since we check the states of batcher and oplog buffer without synchronization,
- // they can be stale. We make sure the applier is still draining in the given term
- // before and after the check, so that if the oplog buffer was exhausted, then
- // it still will be.
- replCoord->signalDrainComplete(&opCtx, *ops.termWhenExhausted());
- }
- continue; // Try again.
- }
-
- // Extract some info from ops that we'll need after releasing the batch below.
- const auto firstOpTimeInBatch = ops.front().getOpTime();
- const auto lastOpInBatch = ops.back();
- const auto lastOpTimeInBatch = lastOpInBatch.getOpTime();
- const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime();
- const auto lastAppliedOpTimeAtStartOfBatch = replCoord->getMyLastAppliedOpTime();
-
- // Make sure the oplog doesn't go back in time or repeat an entry.
- if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) {
- fassert(34361,
- Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Attempted to apply an oplog entry ("
- << firstOpTimeInBatch.toString()
- << ") which is not greater than our last applied OpTime ("
- << lastAppliedOpTimeAtStartOfBatch.toString() << ")."));
- }
-
- // Don't allow the fsync+lock thread to see intermediate states of batch application.
- stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
-
- // Apply the operations in this batch. 'multiApply' returns the optime of the last op that
- // was applied, which should be the last optime in the batch.
- auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
- invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
-
- // Update various things that care about our last applied optime. Tests rely on 1 happening
- // before 2 even though it isn't strictly necessary.
-
- // 1. Persist our "applied through" optime to disk.
- _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
-
- // 2. Ensure that the last applied op time hasn't changed since the start of this batch.
- const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime();
- invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch,
- str::stream() << "the last known applied OpTime has changed from "
- << lastAppliedOpTimeAtStartOfBatch.toString() << " to "
- << lastAppliedOpTimeAtEndOfBatch.toString()
- << " in the middle of batch application");
-
- // 3. Update oplog visibility by notifying the storage engine of the new oplog entries.
- const bool orderedCommit = true;
- _storageInterface->oplogDiskLocRegister(
- &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
-
- // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the
- // current 'minValid' optime. In case we crash while applying a batch, multiApply advances
- // minValid to the last opTime in the batch, so check minValid *after* calling multiApply.
- const auto minValid = _consistencyMarkers->getMinValid(&opCtx);
- auto consistency = (lastOpTimeInBatch >= minValid)
- ? ReplicationCoordinator::DataConsistency::Consistent
- : ReplicationCoordinator::DataConsistency::Inconsistent;
-
- // The finalizer advances the global timestamp to lastOpTimeInBatch.
- finalizer->record({lastOpTimeInBatch, lastWallTimeInBatch}, consistency);
- }
-}
-
void SyncTail::shutdown() {
stdx::lock_guard<Latch> lock(_mutex);
_inShutdown = true;
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 364df797cad..8fd2d9aef31 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -95,15 +95,6 @@ public:
const OplogApplier::Options& getOptions() const;
/**
- * Runs oplog application in a loop until shutdown() is called.
- * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
- * multiApply().
- */
- void runLoop(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord);
-
- /**
* Shuts down oplogApplication() processing.
*/
void shutdown();
@@ -114,85 +105,6 @@ public:
bool inShutdown() const;
- class OpQueue {
- public:
- explicit OpQueue(std::size_t batchLimitOps) : _bytes(0) {
- _batch.reserve(batchLimitOps);
- }
-
- size_t getBytes() const {
- return _bytes;
- }
- size_t getCount() const {
- return _batch.size();
- }
- bool empty() const {
- return _batch.empty();
- }
- const OplogEntry& front() const {
- invariant(!_batch.empty());
- return _batch.front();
- }
- const OplogEntry& back() const {
- invariant(!_batch.empty());
- return _batch.back();
- }
- const std::vector<OplogEntry>& getBatch() const {
- return _batch;
- }
-
- void emplace_back(OplogEntry oplog) {
- invariant(!_mustShutdown);
- _bytes += oplog.getRawObjSizeBytes();
- _batch.emplace_back(std::move(oplog));
- }
- void pop_back() {
- _bytes -= back().getRawObjSizeBytes();
- _batch.pop_back();
- }
-
- /**
- * A batch with this set indicates that the upstream stages of the pipeline are shutdown and
- * no more batches will be coming.
- *
- * This can only happen with empty batches.
- *
- * TODO replace the empty object used to signal draining with this.
- */
- bool mustShutdown() const {
- return _mustShutdown;
- }
- void setMustShutdownFlag() {
- invariant(empty());
- _mustShutdown = true;
- }
-
- /**
- * If the oplog buffer is exhausted, return the term before we learned that the buffer was
- * empty.
- */
- boost::optional<long long> termWhenExhausted() const {
- return _termWhenExhausted;
- }
- void setTermWhenExhausted(long long term) {
- invariant(empty());
- _termWhenExhausted = term;
- }
-
- /**
- * Leaves this object in an unspecified state. Only assignment and destruction are valid.
- */
- std::vector<OplogEntry> releaseBatch() {
- return std::move(_batch);
- }
-
- private:
- std::vector<OplogEntry> _batch;
- size_t _bytes;
- bool _mustShutdown = false;
- boost::optional<long long> _termWhenExhausted;
- };
-
using BatchLimits = OplogApplier::BatchLimits;
/**
@@ -217,8 +129,6 @@ public:
std::vector<MultiApplier::Operations>* derivedOps) noexcept;
private:
- class OpQueueBatcher;
-
OplogApplier::Observer* const _observer;
ReplicationConsistencyMarkers* const _consistencyMarkers;
StorageInterface* const _storageInterface;