summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-03-23 06:56:18 -0400
committerBenety Goh <benety@mongodb.com>2018-03-23 06:56:29 -0400
commitecd44a0d4e7ef134ab9ec0236e1d5fa8ab4e59b0 (patch)
tree208dd5e87703fff79facb81c39108b53d664485c
parente0f8ca9a3f8201c27dffcdf4014f2b2d95bbd869 (diff)
downloadmongo-ecd44a0d4e7ef134ab9ec0236e1d5fa8ab4e59b0.tar.gz
SERVER-32332 implement OplogApplier for steady state replication
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp35
-rw-r--r--src/mongo/db/repl/oplog_applier.h26
2 files changed, 50 insertions, 11 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index d4647aaa79b..5630be7baac 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -32,23 +32,46 @@
#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/sync_tail.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+
OplogApplier::OplogApplier(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
Observer* observer,
- const OplogApplier::Options& options)
- : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {}
+ ReplicationCoordinator* replCoord,
+ const OplogApplier::Options& options,
+ ThreadPool* writerPool)
+ : _executor(executor),
+ _oplogBuffer(oplogBuffer),
+ _observer(observer),
+ _replCoord(replCoord),
+ _options(options),
+ _syncTail(std::make_unique<SyncTail>(_observer, multiSyncApply, writerPool)) {
+ invariant(!options.allowNamespaceNotFoundErrorsOnCrudOps);
+ invariant(!options.relaxUniqueIndexConstraints);
+}
-Status OplogApplier::startup() {
- return Status::OK();
+Future<void> OplogApplier::startup() {
+ auto future = _promise.getFuture();
+ auto callback =
+ [ this, promise = _promise.share() ](const CallbackArgs& args) mutable noexcept {
+ invariantOK(args.status);
+ log() << "Starting oplog application";
+ _syncTail->oplogApplication(_oplogBuffer, _replCoord);
+ log() << "Finished oplog application";
+ promise.setWith([] {});
+ };
+ invariantOK(_executor->scheduleWork(callback).getStatus());
+ return future;
}
-Future<void> shutdown() {
- return {};
+void OplogApplier::shutdown() {
+ _syncTail->shutdown();
}
/**
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 2fc88e449f8..a198b9ecb91 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -37,12 +37,16 @@
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
namespace mongo {
namespace repl {
+class SyncTail;
+
/**
* Applies oplog entries.
* Reads from an OplogBuffer batches of operations that may be applied in parallel.
@@ -54,7 +58,7 @@ public:
**/
class Options {
public:
- bool failOnCrudOpsNamespaceNotFoundErrors = true;
+ bool allowNamespaceNotFoundErrorsOnCrudOps = false;
bool relaxUniqueIndexConstraints = false;
};
@@ -71,19 +75,21 @@ public:
OplogApplier(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
Observer* observer,
- const Options& options);
+ ReplicationCoordinator* replCoord,
+ const Options& options,
+ ThreadPool* writerPool);
/**
* Starts this OplogApplier.
+ * Use the Future object to be notified when this OplogApplier has finished shutting down.
*/
- Status startup();
+ Future<void> startup();
/**
* Starts the shutdown process for this OplogApplier.
- * Use the Future object to be notified when this OplogApplier has finished shutting down.
* It is safe to call shutdown() multiplie times.
*/
- Future<void> shutdown();
+ void shutdown();
/**
* Pushes operations read into oplog buffer.
@@ -92,6 +98,7 @@ public:
private:
+ // Used to schedule task for oplog application loop.
// Not owned by us.
executor::TaskExecutor* const _executor;
@@ -101,8 +108,17 @@ private:
// Not owned by us.
Observer* const _observer;
+ // Not owned by us.
+ ReplicationCoordinator* const _replCoord;
+
// Used to configure OplogApplier behavior.
const Options _options;
+
+ // Used to run oplog application loop.
+ std::unique_ptr<SyncTail> _syncTail;
+
+ // Used to generate Future to allow callers to wait for oplog application shutdown.
+ Promise<void> _promise;
};
/**