summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/oplog_applier.h5
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp5
-rw-r--r--src/mongo/db/repl/sync_tail.cpp18
-rw-r--r--src/mongo/db/repl/sync_tail.h8
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp7
5 files changed, 31 insertions, 12 deletions
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 9ee0f3d0b28..290043c8e80 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -40,6 +40,7 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
@@ -97,6 +98,10 @@ public:
using Operations = std::vector<OplogEntry>;
+ // Used by SyncTail to access batching logic.
+ using GetNextApplierBatchFn = stdx::function<StatusWith<OplogApplier::Operations>(
+ OperationContext* opCtx, const BatchLimits& batchLimits)>;
+
/**
* Lower bound of batch limit size (in bytes) returned by calculateBatchLimitBytes().
*/
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 2a865dff033..8c293a1bbba 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -51,7 +51,10 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
}
void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
- _syncTail.oplogApplication(oplogBuffer, _replCoord);
+ auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) {
+ return getNextApplierBatch(opCtx, batchLimits);
+ };
+ _syncTail.oplogApplication(oplogBuffer, getNextApplierBatchFn, _replCoord);
}
void OplogApplierImpl::_shutdown() {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 08e942a956e..ea74fe806cb 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -576,10 +576,14 @@ class SyncTail::OpQueueBatcher {
OpQueueBatcher& operator=(const OpQueueBatcher&) = delete;
public:
- OpQueueBatcher(SyncTail* syncTail, StorageInterface* storageInterface, OplogBuffer* oplogBuffer)
+ OpQueueBatcher(SyncTail* syncTail,
+ StorageInterface* storageInterface,
+ OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn)
: _syncTail(syncTail),
_storageInterface(storageInterface),
_oplogBuffer(oplogBuffer),
+ _getNextApplierBatchFn(getNextApplierBatchFn),
_ops(0),
_thread([this] { run(); }) {}
~OpQueueBatcher() {
@@ -668,6 +672,7 @@ private:
SyncTail* const _syncTail;
StorageInterface* const _storageInterface;
OplogBuffer* const _oplogBuffer;
+ OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn;
stdx::mutex _mutex; // Guards _ops.
stdx::condition_variable _cv;
@@ -680,18 +685,19 @@ private:
stdx::thread _thread; // Must be last so all other members are initialized before starting.
};
-void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord) {
+void SyncTail::oplogApplication(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
+ ReplicationCoordinator* replCoord) {
// We don't start data replication for arbiters at all and it's not allowed to reconfig
// arbiterOnly field for any member.
invariant(!replCoord->getMemberState().arbiter());
- OpQueueBatcher batcher(this, _storageInterface, oplogBuffer);
+ OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
- _oplogApplication(oplogBuffer, replCoord, &batcher);
+ _oplogApplication(replCoord, &batcher);
}
-void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
- ReplicationCoordinator* replCoord,
+void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
OpQueueBatcher* batcher) noexcept {
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getStorageEngine()->isDurable()
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 686cfd1f305..971702a2c6f 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -114,7 +114,9 @@ public:
* Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
* multiApply().
*/
- void oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord);
+ void oplogApplication(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
+ ReplicationCoordinator* replCoord);
/**
* Shuts down oplogApplication() processing.
@@ -249,9 +251,7 @@ private:
class OpQueueBatcher;
- void _oplogApplication(OplogBuffer* oplogBuffer,
- ReplicationCoordinator* replCoord,
- OpQueueBatcher* batcher) noexcept;
+ void _oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept;
void _fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index f7b11feda49..aae7b2932e1 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -1657,7 +1657,12 @@ DEATH_TEST_F(SyncTailTest,
// SyncTail::oplogApplication() creates its own OperationContext in the current thread context.
_opCtx = {};
- syncTail.oplogApplication(oplogBuffer.get(), &replCoord);
+ auto getNextApplierBatchFn =
+ [](OperationContext* opCtx,
+ const OplogApplier::BatchLimits& batchLimits) -> StatusWith<OplogApplier::Operations> {
+ return OplogApplier::Operations();
+ };
+ syncTail.oplogApplication(oplogBuffer.get(), getNextApplierBatchFn, &replCoord);
}
TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) {