diff options
author | Benety Goh <benety@mongodb.com> | 2018-05-31 16:56:59 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-05-31 16:56:59 -0400 |
commit | 392a831a351f4a8229fef5efc3d30b94abff379d (patch) | |
tree | b53865e6dd0a994dd8ac41eb6ec42148faf855a1 /src/mongo/db/repl/oplog_applier.cpp | |
parent | 2f8c3ec5a43229e6ed6b585c9421ad6d0ee12a28 (diff) | |
download | mongo-392a831a351f4a8229fef5efc3d30b94abff379d.tar.gz |
SERVER-32335 move SyncTail::makeWriterPool to OplogApplier
Diffstat (limited to 'src/mongo/db/repl/oplog_applier.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 9c31f9f89a1..4e61ab8c79d 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -32,14 +32,55 @@ #include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/repl/sync_tail.h" +#include "mongo/db/server_parameters.h" #include "mongo/util/log.h" namespace mongo { namespace repl { +namespace { + +/** + * This server parameter determines the number of writer threads OplogApplier will have. + */ +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replWriterThreadCount, int, 16) + ->withValidator([](const int& newVal) { + if (newVal < 1 || newVal > 256) { + return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256"); + } + + return Status::OK(); + }); + +} // namespace + using CallbackArgs = executor::TaskExecutor::CallbackArgs; +// static +std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool() { + return makeWriterPool(replWriterThreadCount); +} + +// static +std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool(int threadCount) { + ThreadPool::Options options; + options.threadNamePrefix = "repl writer worker "; + options.poolName = "repl writer worker Pool"; + options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); + options.onCreateThread = [](const std::string&) { + // Only do this once per thread + if (!Client::getCurrent()) { + Client::initThreadIfNotAlready(); + AuthorizationSession::get(cc())->grantInternalAuthorization(); + } + }; + auto pool = stdx::make_unique<ThreadPool>(options); + pool->startup(); + return pool; +} + OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, |