summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_applier.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-05-31 16:56:59 -0400
committerBenety Goh <benety@mongodb.com>2018-05-31 16:56:59 -0400
commit392a831a351f4a8229fef5efc3d30b94abff379d (patch)
treeb53865e6dd0a994dd8ac41eb6ec42148faf855a1 /src/mongo/db/repl/oplog_applier.cpp
parent2f8c3ec5a43229e6ed6b585c9421ad6d0ee12a28 (diff)
downloadmongo-392a831a351f4a8229fef5efc3d30b94abff379d.tar.gz
SERVER-32335 move SyncTail::makeWriterPool to OplogApplier
Diffstat (limited to 'src/mongo/db/repl/oplog_applier.cpp')
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp41
1 files changed, 41 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 9c31f9f89a1..4e61ab8c79d 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -32,14 +32,55 @@
#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/repl/sync_tail.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
+namespace {
+
+/**
+ * This server parameter determines the number of writer threads OplogApplier will have.
+ */
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replWriterThreadCount, int, 16)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 1 || newVal > 256) {
+ return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256");
+ }
+
+ return Status::OK();
+ });
+
+} // namespace
+
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+// static
+std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool() {
+ return makeWriterPool(replWriterThreadCount);
+}
+
+// static
+std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool(int threadCount) {
+ ThreadPool::Options options;
+ options.threadNamePrefix = "repl writer worker ";
+ options.poolName = "repl writer worker Pool";
+ options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
+ options.onCreateThread = [](const std::string&) {
+ // Only do this once per thread
+ if (!Client::getCurrent()) {
+ Client::initThreadIfNotAlready();
+ AuthorizationSession::get(cc())->grantInternalAuthorization();
+ }
+ };
+ auto pool = stdx::make_unique<ThreadPool>(options);
+ pool->startup();
+ return pool;
+}
+
OplogApplier::OplogApplier(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
Observer* observer,