summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2019-11-22 01:47:10 +0000
committerevergreen <evergreen@mongodb.com>2019-11-22 01:47:10 +0000
commit355937f48d5c505d606ac1c211fb6427179d8a5b (patch)
tree728f34e6b2978dae7e3da2c667d49794ce5d8ff5 /src/mongo/db/repl
parent48c3f738be846ddb7ad5309ef834f851eed0fd1a (diff)
downloadmongo-355937f48d5c505d606ac1c211fb6427179d8a5b.tar.gz
SERVER-43000 Rename OpQueueBatcher to OplogBatcher
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/initial_syncer.h2
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp4
-rw-r--r--src/mongo/db/repl/oplog_applier.h10
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp8
-rw-r--r--src/mongo/db/repl/oplog_batcher.cpp (renamed from src/mongo/db/repl/opqueue_batcher.cpp)22
-rw-r--r--src/mongo/db/repl/oplog_batcher.h (renamed from src/mongo/db/repl/opqueue_batcher.h)16
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h2
8 files changed, 33 insertions, 33 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d37cc8fe578..c1a85567db0 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -506,7 +506,7 @@ env.Library(
target='oplog_application_interface',
source=[
'oplog_applier.cpp',
- 'opqueue_batcher.cpp',
+ 'oplog_batcher.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 6d3b1293b8d..cd1b6ae5d37 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -95,7 +95,7 @@ struct InitialSyncerOptions {
// InitialSyncer waits this long before retrying getApplierBatchCallback() if there are
// currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active.
- // This default value is based on the duration in OpQueueBatcher::run().
+ // This default value is based on the duration in OplogBatcher::run().
Milliseconds getApplierBatchCallbackRetryWait{1000};
// Replication settings
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 8872664b0c1..f505933bbdc 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -51,7 +51,7 @@ OplogApplier::OplogApplier(executor::TaskExecutor* executor,
Observer* observer,
const Options& options)
: _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {
- _opQueueBatcher = std::make_unique<OpQueueBatcher>(this, oplogBuffer);
+ _oplogBatcher = std::make_unique<OplogBatcher>(this, oplogBuffer);
}
OplogBuffer* OplogApplier::getBuffer() const {
@@ -125,7 +125,7 @@ StatusWith<OpTime> OplogApplier::applyOplogBatch(OperationContext* opCtx,
StatusWith<std::vector<OplogEntry>> OplogApplier::getNextApplierBatch(
OperationContext* opCtx, const BatchLimits& batchLimits) {
- return _opQueueBatcher->getNextApplierBatch(opCtx, batchLimits);
+ return _oplogBatcher->getNextApplierBatch(opCtx, batchLimits);
}
const OplogApplier::Options& OplogApplier::getOptions() const {
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 301757d2f4b..42c85b5091c 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -36,9 +36,9 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_batcher.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/opqueue_batcher.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
@@ -85,11 +85,11 @@ public:
class Observer;
/**
- * OpQueueBatcher is an implementation detail that should be abstracted from all levels above
+ * OplogBatcher is an implementation detail that should be abstracted from all levels above
* the OplogApplier. Parts of the system that need to modify BatchLimits can do so through the
* OplogApplier.
*/
- using BatchLimits = OpQueueBatcher::BatchLimits;
+ using BatchLimits = OplogBatcher::BatchLimits;
/**
* Constructs this OplogApplier with specific options.
@@ -159,7 +159,7 @@ public:
StatusWith<OpTime> applyOplogBatch(OperationContext* opCtx, std::vector<OplogEntry> ops);
/**
- * Calls the OpQueueBatcher's getNextApplierBatch.
+ * Calls the OplogBatcher's getNextApplierBatch.
*/
StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx,
const BatchLimits& batchLimits);
@@ -202,7 +202,7 @@ private:
protected:
// Handles consuming oplog entries from the OplogBuffer for oplog application.
- std::unique_ptr<OpQueueBatcher> _opQueueBatcher;
+ std::unique_ptr<OplogBatcher> _oplogBatcher;
};
/**
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index f1c12d1e68a..f5716c2a49b 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -371,9 +371,9 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
// Start up a thread from the batcher to pull from the oplog buffer into the batcher's oplog
// batch.
- _opQueueBatcher->startup(_storageInterface);
+ _oplogBatcher->startup(_storageInterface);
- ON_BLOCK_EXIT([this] { _opQueueBatcher->shutdown(); });
+ ON_BLOCK_EXIT([this] { _oplogBatcher->shutdown(); });
// We don't start data replication for arbiters at all and it's not allowed to reconfig
// arbiterOnly field for any member.
@@ -384,7 +384,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
? new ApplyBatchFinalizerForJournal(_replCoord)
: new ApplyBatchFinalizer(_replCoord)};
- while (true) { // Exits on message from OpQueueBatcher.
+ while (true) { // Exits on message from OplogBatcher.
// Use a new operation context each iteration, as otherwise we may appear to use a single
// collection name to refer to collections with different UUIDs.
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
@@ -407,7 +407,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
- OplogBatch ops = _opQueueBatcher->getNextBatch(Seconds(1));
+ OplogBatch ops = _oplogBatcher->getNextBatch(Seconds(1));
if (ops.empty()) {
if (ops.mustShutdown()) {
// Shut down and exit oplog application loop.
diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp
index e2023ef3235..9bf942d4e32 100644
--- a/src/mongo/db/repl/opqueue_batcher.cpp
+++ b/src/mongo/db/repl/oplog_batcher.cpp
@@ -29,7 +29,7 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-#include "mongo/db/repl/opqueue_batcher.h"
+#include "mongo/db/repl/oplog_batcher.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/repl/oplog_applier.h"
@@ -39,13 +39,13 @@
namespace mongo {
namespace repl {
-OpQueueBatcher::OpQueueBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer)
+OplogBatcher::OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer)
: _oplogApplier(oplogApplier), _oplogBuffer(oplogBuffer), _ops(0) {}
-OpQueueBatcher::~OpQueueBatcher() {
+OplogBatcher::~OplogBatcher() {
invariant(!_thread);
}
-OplogBatch OpQueueBatcher::getNextBatch(Seconds maxWaitTime) {
+OplogBatch OplogBatcher::getNextBatch(Seconds maxWaitTime) {
stdx::unique_lock<Latch> lk(_mutex);
// _ops can indicate the following cases:
// 1. A new batch is ready to consume.
@@ -68,11 +68,11 @@ OplogBatch OpQueueBatcher::getNextBatch(Seconds maxWaitTime) {
return ops;
}
-void OpQueueBatcher::startup(StorageInterface* storageInterface) {
+void OplogBatcher::startup(StorageInterface* storageInterface) {
_thread = std::make_unique<stdx::thread>([this, storageInterface] { _run(storageInterface); });
}
-void OpQueueBatcher::shutdown() {
+void OplogBatcher::shutdown() {
if (_thread) {
_thread->join();
_thread.reset();
@@ -158,7 +158,7 @@ std::size_t getOpCount(const OplogEntry& entry) {
}
} // namespace
-StatusWith<std::vector<OplogEntry>> OpQueueBatcher::getNextApplierBatch(
+StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch(
OperationContext* opCtx, const BatchLimits& batchLimits) {
if (batchLimits.ops == 0) {
return Status(ErrorCodes::InvalidOptions, "Batch size must be greater than 0.");
@@ -232,7 +232,7 @@ StatusWith<std::vector<OplogEntry>> OpQueueBatcher::getNextApplierBatch(
* If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
* entries that can be be returned in a batch.
*/
-boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() {
+boost::optional<Date_t> OplogBatcher::_calculateSlaveDelayLatestTimestamp() {
auto service = cc().getServiceContext();
auto replCoord = ReplicationCoordinator::get(service);
auto slaveDelay = replCoord->getSlaveDelaySecs();
@@ -243,7 +243,7 @@ boost::optional<Date_t> OpQueueBatcher::_calculateSlaveDelayLatestTimestamp() {
return fastClockSource->now() - slaveDelay;
}
-void OpQueueBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
+void OplogBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
// This is just to get the op off the buffer; it's been peeked at and queued for application
// already.
// If we failed to get an op off the buffer, this means that shutdown() was called between the
@@ -254,7 +254,7 @@ void OpQueueBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer)
invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || _oplogApplier->inShutdown());
}
-void OpQueueBatcher::_run(StorageInterface* storageInterface) {
+void OplogBatcher::_run(StorageInterface* storageInterface) {
Client::initThread("ReplBatcher");
BatchLimits batchLimits;
@@ -300,7 +300,7 @@ void OpQueueBatcher::_run(StorageInterface* storageInterface) {
}
}
- // The applier may be in its 'Draining' state. Determines if the OpQueueBatcher has finished
+ // The applier may be in its 'Draining' state. Determines if the OplogBatcher has finished
// draining the OplogBuffer and should notify the OplogApplier to signal draining is
// complete.
if (ops.empty() && !ops.mustShutdown()) {
diff --git a/src/mongo/db/repl/opqueue_batcher.h b/src/mongo/db/repl/oplog_batcher.h
index 1c86754c44e..092655911e5 100644
--- a/src/mongo/db/repl/opqueue_batcher.h
+++ b/src/mongo/db/repl/oplog_batcher.h
@@ -117,9 +117,9 @@ private:
* Consumes batches of oplog entries from the OplogBuffer to give to the oplog applier, freeing
* up space for more operations to be fetched from a sync source and allocated onto the OplogBuffer.
*/
-class OpQueueBatcher {
- OpQueueBatcher(const OpQueueBatcher&) = delete;
- OpQueueBatcher& operator=(const OpQueueBatcher&) = delete;
+class OplogBatcher {
+ OplogBatcher(const OplogBatcher&) = delete;
+ OplogBatcher& operator=(const OplogBatcher&) = delete;
public:
/**
@@ -142,11 +142,11 @@ public:
};
/**
- * Constructs an OpQueueBatcher
+ * Constructs an OplogBatcher
*/
- OpQueueBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer);
+ OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer);
- virtual ~OpQueueBatcher();
+ virtual ~OplogBatcher();
/**
* Returns the batch of oplog entries and clears _ops so the batcher can store a new batch.
@@ -154,7 +154,7 @@ public:
OplogBatch getNextBatch(Seconds maxWaitTime);
/**
- * Starts up a thread to continuously pull from the OplogBuffer into the OpQueueBatcher's oplog
+ * Starts up a thread to continuously pull from the OplogBuffer into the OplogBatcher's oplog
* batch.
*/
void startup(StorageInterface* storageInterface);
@@ -192,7 +192,7 @@ private:
OplogApplier* _oplogApplier;
OplogBuffer* const _oplogBuffer;
- Mutex _mutex = MONGO_MAKE_LATCH("OpQueueBatcher::_mutex");
+ Mutex _mutex = MONGO_MAKE_LATCH("OplogBatcher::_mutex");
stdx::condition_variable _cv;
/**
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 9b6ad4e8510..4ebbeaa9eb6 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -170,7 +170,7 @@ private:
// The OplogBuffer is used to hold operations read from the sync source. During oplog
// application, Backgrounds Sync adds operations to the OplogBuffer while the applier's
- // OpQueueBatcher consumes these operations from the buffer in batches.
+ // OplogBatcher consumes these operations from the buffer in batches.
std::unique_ptr<OplogBuffer> _oplogBuffer;
// The BackgroundSync class is responsible for pulling ops off the network from the sync source