diff options
author | Benety Goh <benety@mongodb.com> | 2018-03-23 06:56:18 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-03-23 06:56:29 -0400 |
commit | ecd44a0d4e7ef134ab9ec0236e1d5fa8ab4e59b0 (patch) | |
tree | 208dd5e87703fff79facb81c39108b53d664485c | |
parent | e0f8ca9a3f8201c27dffcdf4014f2b2d95bbd869 (diff) | |
download | mongo-ecd44a0d4e7ef134ab9ec0236e1d5fa8ab4e59b0.tar.gz |
SERVER-32332 implement OplogApplier for steady state replication
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 26 |
2 files changed, 50 insertions, 11 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index d4647aaa79b..5630be7baac 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -32,23 +32,46 @@ #include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/sync_tail.h" #include "mongo/util/log.h" namespace mongo { namespace repl { +using CallbackArgs = executor::TaskExecutor::CallbackArgs; + OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, - const OplogApplier::Options& options) - : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {} + ReplicationCoordinator* replCoord, + const OplogApplier::Options& options, + ThreadPool* writerPool) + : _executor(executor), + _oplogBuffer(oplogBuffer), + _observer(observer), + _replCoord(replCoord), + _options(options), + _syncTail(std::make_unique<SyncTail>(_observer, multiSyncApply, writerPool)) { + invariant(!options.allowNamespaceNotFoundErrorsOnCrudOps); + invariant(!options.relaxUniqueIndexConstraints); +} -Status OplogApplier::startup() { - return Status::OK(); +Future<void> OplogApplier::startup() { + auto future = _promise.getFuture(); + auto callback = + [ this, promise = _promise.share() ](const CallbackArgs& args) mutable noexcept { + invariantOK(args.status); + log() << "Starting oplog application"; + _syncTail->oplogApplication(_oplogBuffer, _replCoord); + log() << "Finished oplog application"; + promise.setWith([] {}); + }; + invariantOK(_executor->scheduleWork(callback).getStatus()); + return future; } -Future<void> shutdown() { - return {}; +void OplogApplier::shutdown() { + _syncTail->shutdown(); } /** diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 2fc88e449f8..a198b9ecb91 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -37,12 +37,16 @@ #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/executor/task_executor.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" namespace mongo { namespace repl { +class SyncTail; + /** * Applies oplog entries. * Reads from an OplogBuffer batches of operations that may be applied in parallel. @@ -54,7 +58,7 @@ public: **/ class Options { public: - bool failOnCrudOpsNamespaceNotFoundErrors = true; + bool allowNamespaceNotFoundErrorsOnCrudOps = false; bool relaxUniqueIndexConstraints = false; }; @@ -71,19 +75,21 @@ public: OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, - const Options& options); + ReplicationCoordinator* replCoord, + const Options& options, + ThreadPool* writerPool); /** * Starts this OplogApplier. + * Use the Future object to be notified when this OplogApplier has finished shutting down. */ - Status startup(); + Future<void> startup(); /** * Starts the shutdown process for this OplogApplier. - * Use the Future object to be notified when this OplogApplier has finished shutting down. * It is safe to call shutdown() multiplie times. */ - Future<void> shutdown(); + void shutdown(); /** * Pushes operations read into oplog buffer. @@ -92,6 +98,7 @@ public: private: + // Used to schedule task for oplog application loop. // Not owned by us. executor::TaskExecutor* const _executor; @@ -101,8 +108,17 @@ private: // Not owned by us. Observer* const _observer; + // Not owned by us. + ReplicationCoordinator* const _replCoord; + // Used to configure OplogApplier behavior. const Options _options; + + // Used to run oplog application loop. + std::unique_ptr<SyncTail> _syncTail; + + // Used to generate Future to allow callers to wait for oplog application shutdown. + Promise<void> _promise; }; /** |