diff options
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 7 |
5 files changed, 31 insertions, 12 deletions
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 9ee0f3d0b28..290043c8e80 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -40,6 +40,7 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" #include "mongo/util/future.h" #include "mongo/util/net/hostandport.h" @@ -97,6 +98,10 @@ public: using Operations = std::vector<OplogEntry>; + // Used by SyncTail to access batching logic. + using GetNextApplierBatchFn = stdx::function<StatusWith<OplogApplier::Operations>( + OperationContext* opCtx, const BatchLimits& batchLimits)>; + /** * Lower bound of batch limit size (in bytes) returned by calculateBatchLimitBytes(). */ diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 2a865dff033..8c293a1bbba 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -51,7 +51,10 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, } void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { - _syncTail.oplogApplication(oplogBuffer, _replCoord); + auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) { + return getNextApplierBatch(opCtx, batchLimits); + }; + _syncTail.oplogApplication(oplogBuffer, getNextApplierBatchFn, _replCoord); } void OplogApplierImpl::_shutdown() { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 08e942a956e..ea74fe806cb 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -576,10 +576,14 @@ class SyncTail::OpQueueBatcher { OpQueueBatcher& operator=(const OpQueueBatcher&) = delete; public: - OpQueueBatcher(SyncTail* syncTail, StorageInterface* storageInterface, OplogBuffer* oplogBuffer) + OpQueueBatcher(SyncTail* syncTail, + StorageInterface* storageInterface, + OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn) : _syncTail(syncTail), _storageInterface(storageInterface), _oplogBuffer(oplogBuffer), + _getNextApplierBatchFn(getNextApplierBatchFn), _ops(0), _thread([this] { run(); }) {} ~OpQueueBatcher() { @@ -668,6 +672,7 @@ private: SyncTail* const _syncTail; StorageInterface* const _storageInterface; OplogBuffer* const _oplogBuffer; + OplogApplier::GetNextApplierBatchFn const _getNextApplierBatchFn; stdx::mutex _mutex; // Guards _ops. stdx::condition_variable _cv; @@ -680,18 +685,19 @@ private: stdx::thread _thread; // Must be last so all other members are initialized before starting. }; -void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord) { +void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn, + ReplicationCoordinator* replCoord) { // We don't start data replication for arbiters at all and it's not allowed to reconfig // arbiterOnly field for any member. invariant(!replCoord->getMemberState().arbiter()); - OpQueueBatcher batcher(this, _storageInterface, oplogBuffer); + OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn); - _oplogApplication(oplogBuffer, replCoord, &batcher); + _oplogApplication(replCoord, &batcher); } -void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, - ReplicationCoordinator* replCoord, +void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept { std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getStorageEngine()->isDurable() diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 686cfd1f305..971702a2c6f 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -114,7 +114,9 @@ public: * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using * multiApply(). */ - void oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord); + void oplogApplication(OplogBuffer* oplogBuffer, + OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn, + ReplicationCoordinator* replCoord); /** * Shuts down oplogApplication() processing. @@ -249,9 +251,7 @@ private: class OpQueueBatcher; - void _oplogApplication(OplogBuffer* oplogBuffer, - ReplicationCoordinator* replCoord, - OpQueueBatcher* batcher) noexcept; + void _oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept; void _fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index f7b11feda49..aae7b2932e1 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -1657,7 +1657,12 @@ DEATH_TEST_F(SyncTailTest, // SyncTail::oplogApplication() creates its own OperationContext in the current thread context. _opCtx = {}; - syncTail.oplogApplication(oplogBuffer.get(), &replCoord); + auto getNextApplierBatchFn = + [](OperationContext* opCtx, + const OplogApplier::BatchLimits& batchLimits) -> StatusWith<OplogApplier::Operations> { + return OplogApplier::Operations(); + }; + syncTail.oplogApplication(oplogBuffer.get(), getNextApplierBatchFn, &replCoord); } TEST_F(IdempotencyTest, Geo2dsphereIndexFailedOnUpdate) { |