diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-11-22 01:47:10 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-22 01:47:10 +0000 |
commit | 355937f48d5c505d606ac1c211fb6427179d8a5b (patch) | |
tree | 728f34e6b2978dae7e3da2c667d49794ce5d8ff5 /src/mongo/db/repl | |
parent | 48c3f738be846ddb7ad5309ef834f851eed0fd1a (diff) | |
download | mongo-355937f48d5c505d606ac1c211fb6427179d8a5b.tar.gz |
SERVER-43000 Rename OpQueueBatcher to OplogBatcher
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.cpp (renamed from src/mongo/db/repl/opqueue_batcher.cpp) | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.h (renamed from src/mongo/db/repl/opqueue_batcher.h) | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 2 |
8 files changed, 33 insertions, 33 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d37cc8fe578..c1a85567db0 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -506,7 +506,7 @@ env.Library( target='oplog_application_interface', source=[ 'oplog_applier.cpp', - 'opqueue_batcher.cpp', + 'oplog_batcher.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 6d3b1293b8d..cd1b6ae5d37 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -95,7 +95,7 @@ struct InitialSyncerOptions { // InitialSyncer waits this long before retrying getApplierBatchCallback() if there are // currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active. - // This default value is based on the duration in OpQueueBatcher::run(). + // This default value is based on the duration in OplogBatcher::run(). Milliseconds getApplierBatchCallbackRetryWait{1000}; // Replication settings diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 8872664b0c1..f505933bbdc 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -51,7 +51,7 @@ OplogApplier::OplogApplier(executor::TaskExecutor* executor, Observer* observer, const Options& options) : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) { - _opQueueBatcher = std::make_unique<OpQueueBatcher>(this, oplogBuffer); + _oplogBatcher = std::make_unique<OplogBatcher>(this, oplogBuffer); } OplogBuffer* OplogApplier::getBuffer() const { @@ -125,7 +125,7 @@ StatusWith<OpTime> OplogApplier::applyOplogBatch(OperationContext* opCtx, StatusWith<std::vector<OplogEntry>> OplogApplier::getNextApplierBatch( OperationContext* opCtx, const BatchLimits& batchLimits) { - return _opQueueBatcher->getNextApplierBatch(opCtx, batchLimits); + return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits); } const OplogApplier::Options& OplogApplier::getOptions() const { diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 301757d2f4b..42c85b5091c 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -36,9 +36,9 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_batcher.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/opqueue_batcher.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" @@ -85,11 +85,11 @@ public: class Observer; /** - * OpQueueBatcher is an implementation detail that should be abstracted from all levels above + * OplogBatcher is an implementation detail that should be abstracted from all levels above * the OplogApplier. Parts of the system that need to modify BatchLimits can do so through the * OplogApplier. */ - using BatchLimits = OpQueueBatcher::BatchLimits; + using BatchLimits = OplogBatcher::BatchLimits; /** * Constructs this OplogApplier with specific options. @@ -159,7 +159,7 @@ public: StatusWith<OpTime> applyOplogBatch(OperationContext* opCtx, std::vector<OplogEntry> ops); /** - * Calls the OpQueueBatcher's getNextApplierBatch. + * Calls the OplogBatcher's getNextApplierBatch. */ StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx, const BatchLimits& batchLimits); @@ -202,7 +202,7 @@ private: protected: // Handles consuming oplog entries from the OplogBuffer for oplog application. - std::unique_ptr<OpQueueBatcher> _opQueueBatcher; + std::unique_ptr<OplogBatcher> _oplogBatcher; }; /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index f1c12d1e68a..f5716c2a49b 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -371,9 +371,9 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { // Start up a thread from the batcher to pull from the oplog buffer into the batcher's oplog // batch. - _opQueueBatcher->startup(_storageInterface); + _oplogBatcher->startup(_storageInterface); - ON_BLOCK_EXIT([this] { _opQueueBatcher->shutdown(); }); + ON_BLOCK_EXIT([this] { _oplogBatcher->shutdown(); }); // We don't start data replication for arbiters at all and it's not allowed to reconfig // arbiterOnly field for any member. @@ -384,7 +384,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { ? new ApplyBatchFinalizerForJournal(_replCoord) : new ApplyBatchFinalizer(_replCoord)}; - while (true) { // Exits on message from OpQueueBatcher. + while (true) { // Exits on message from OplogBatcher. // Use a new operation context each iteration, as otherwise we may appear to use a single // collection name to refer to collections with different UUIDs. const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); @@ -407,7 +407,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become // ready in time, we'll loop again so we can do the above checks periodically. - OplogBatch ops = _opQueueBatcher->getNextBatch(Seconds(1)); + OplogBatch ops = _oplogBatcher->getNextBatch(Seconds(1)); if (ops.empty()) { if (ops.mustShutdown()) { // Shut down and exit oplog application loop. diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index e2023ef3235..9bf942d4e32 100644 --- a/src/mongo/db/repl/opqueue_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -29,7 +29,7 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication -#include "mongo/db/repl/opqueue_batcher.h" +#include "mongo/db/repl/oplog_batcher.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/repl/oplog_applier.h" @@ -39,13 +39,13 @@ namespace mongo { namespace repl { -OpQueueBatcher::OpQueueBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer) +OplogBatcher::OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer) : _oplogApplier(oplogApplier), _oplogBuffer(oplogBuffer), _ops(0) {} -OpQueueBatcher::~OpQueueBatcher() { +OplogBatcher::~OplogBatcher() { invariant(!_thread); } -OplogBatch OpQueueBatcher::getNextBatch(Seconds maxWaitTime) { +OplogBatch OplogBatcher::getNextBatch(Seconds maxWaitTime) { stdx::unique_lock<Latch> lk(_mutex); // _ops can indicate the following cases: // 1. A new batch is ready to consume. @@ -68,11 +68,11 @@ OplogBatch OpQueueBatcher::getNextBatch(Seconds maxWaitTime) { return ops; } -void OpQueueBatcher::startup(StorageInterface* storageInterface) { +void OplogBatcher::startup(StorageInterface* storageInterface) { _thread = std::make_unique<stdx::thread>([this, storageInterface] { _run(storageInterface); }); } -void OpQueueBatcher::shutdown() { +void OplogBatcher::shutdown() { if (_thread) { _thread->join(); _thread.reset(); @@ -158,7 +158,7 @@ std::size_t getOpCount(const OplogEntry& entry) { } } // namespace -StatusWith<std::vector<OplogEntry>> OpQueueBatcher::getNextApplierBatch( +StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch( OperationContext* opCtx, const BatchLimits& batchLimits) { if (batchLimits.ops == 0) { return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0."); @@ -232,7 +232,7 @@ StatusWith<std::vector<OplogEntry>> OpQueueBatcher::getNextApplierBatch( * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog * entries that can be be returned in a batch. */ -boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() { +boost::optional<Date_t> OplogBatcher::_calculateSlaveDelayLatestTimestamp() { auto service = cc().getServiceContext(); auto replCoord = ReplicationCoordinator::get(service); auto slaveDelay = replCoord->getSlaveDelaySecs(); @@ -243,7 +243,7 @@ boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() { return fastClockSource->now() - slaveDelay; } -void OpQueueBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { +void OplogBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { // This is just to get the op off the buffer; it's been peeked at and queued for application // already. // If we failed to get an op off the buffer, this means that shutdown() was called between the @@ -254,7 +254,7 @@ void OpQueueBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || _oplogApplier->inShutdown()); } -void OpQueueBatcher::_run(StorageInterface* storageInterface) { +void OplogBatcher::_run(StorageInterface* storageInterface) { Client::initThread("ReplBatcher"); BatchLimits batchLimits; @@ -300,7 +300,7 @@ void OpQueueBatcher::_run(StorageInterface* storageInterface) { } } - // The applier may be in its 'Draining' state. Determines if the OpQueueBatcher has finished + // The applier may be in its 'Draining' state. Determines if the OplogBatcher has finished // draining the OplogBuffer and should notify the OplogApplier to signal draining is // complete. if (ops.empty() && !ops.mustShutdown()) { diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/oplog_batcher.h index 1c86754c44e..092655911e5 100644 --- a/src/mongo/db/repl/opqueue_batcher.h +++ b/src/mongo/db/repl/oplog_batcher.h @@ -117,9 +117,9 @@ private: * Consumes batches of oplog entries from the OplogBuffer to give to the oplog applier, freeing * up space for more operations to be fetched from a sync source and allocated onto the OplogBuffer. */ -class OpQueueBatcher { - OpQueueBatcher(const OpQueueBatcher&) = delete; - OpQueueBatcher& operator=(const OpQueueBatcher&) = delete; +class OplogBatcher { + OplogBatcher(const OplogBatcher&) = delete; + OplogBatcher& operator=(const OplogBatcher&) = delete; public: /** @@ -142,11 +142,11 @@ public: }; /** - * Constructs an OpQueueBatcher + * Constructs an OplogBatcher */ - OpQueueBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer); + OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer); - virtual ~OpQueueBatcher(); + virtual ~OplogBatcher(); /** * Returns the batch of oplog entries and clears _ops so the batcher can store a new batch. @@ -154,7 +154,7 @@ public: OplogBatch getNextBatch(Seconds maxWaitTime); /** - * Starts up a thread to continuously pull from the OplogBuffer into the OpQueueBatcher's oplog + * Starts up a thread to continuously pull from the OplogBuffer into the OplogBatcher's oplog * batch. */ void startup(StorageInterface* storageInterface); @@ -192,7 +192,7 @@ private: OplogApplier* _oplogApplier; OplogBuffer* const _oplogBuffer; - Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex"); + Mutex _mutex = MONGO_MAKE_LATCH("OplogBatcher::_mutex"); stdx::condition_variable _cv; /** diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 9b6ad4e8510..4ebbeaa9eb6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -170,7 +170,7 @@ private: // The OplogBuffer is used to hold operations read from the sync source. During oplog // application, Backgrounds Sync adds operations to the OplogBuffer while the applier's - // OpQueueBatcher consumes these operations from the buffer in batches. + // OplogBatcher consumes these operations from the buffer in batches. std::unique_ptr<OplogBuffer> _oplogBuffer; // The BackgroundSync class is responsible for pulling ops off the network from the sync source |