summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp41
-rw-r--r--src/mongo/db/repl/oplog_applier.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp45
-rw-r--r--src/mongo/db/repl/sync_tail.h6
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp16
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp6
8 files changed, 60 insertions, 64 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 9c31f9f89a1..4e61ab8c79d 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -32,14 +32,55 @@
#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/repl/sync_tail.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
+namespace {
+
+/**
+ * This server parameter determines the number of writer threads OplogApplier will have.
+ */
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replWriterThreadCount, int, 16)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 1 || newVal > 256) {
+ return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256");
+ }
+
+ return Status::OK();
+ });
+
+} // namespace
+
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+// static
+std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool() {
+ return makeWriterPool(replWriterThreadCount);
+}
+
+// static
+std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool(int threadCount) {
+ ThreadPool::Options options;
+ options.threadNamePrefix = "repl writer worker ";
+ options.poolName = "repl writer worker Pool";
+ options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
+ options.onCreateThread = [](const std::string&) {
+ // Only do this once per thread
+ if (!Client::getCurrent()) {
+ Client::initThreadIfNotAlready();
+ AuthorizationSession::get(cc())->grantInternalAuthorization();
+ }
+ };
+ auto pool = stdx::make_unique<ThreadPool>(options);
+ pool->startup();
+ return pool;
+}
+
OplogApplier::OplogApplier(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
Observer* observer,
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index a9e032ee776..fa7f437ac36 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -92,6 +92,12 @@ public:
using Operations = std::vector<OplogEntry>;
/**
+ * Creates thread pool for writer tasks.
+ */
+ static std::unique_ptr<ThreadPool> makeWriterPool();
+ static std::unique_ptr<ThreadPool> makeWriterPool(int threadCount);
+
+ /**
* Constructs this OplogApplier with specific options.
* Obtains batches of operations from the OplogBuffer to apply.
* Reports oplog application progress using the Observer.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index e7f96cb50d3..827d194fda4 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -318,7 +318,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
_taskExecutor = makeTaskExecutor(_service, "replication");
_taskExecutor->startup();
- _writerPool = SyncTail::makeWriterPool();
+ _writerPool = OplogApplier::makeWriterPool();
_startedThreads = true;
}
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 29205755ba3..153736e018d 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -353,7 +353,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
RecoveryOplogApplierStats stats;
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
OplogApplier::Options options;
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.skipWritesToOplog = true;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 73f82665d88..be688de1b9a 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -39,7 +39,6 @@
#include "mongo/base/counter.h"
#include "mongo/bson/bsonelement_comparator.h"
#include "mongo/bson/timestamp.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -93,29 +92,6 @@ namespace {
MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
-/**
- * This variable determines the number of writer threads SyncTail will have. It can be overridden
- * using the "replWriterThreadCount" server parameter.
- */
-int replWriterThreadCount = 16;
-
-class ExportedWriterThreadCountParameter
- : public ExportedServerParameter<int, ServerParameterType::kStartupOnly> {
-public:
- ExportedWriterThreadCountParameter()
- : ExportedServerParameter<int, ServerParameterType::kStartupOnly>(
- ServerParameterSet::getGlobal(), "replWriterThreadCount", &replWriterThreadCount) {}
-
- virtual Status validate(const int& potentialNewValue) {
- if (potentialNewValue < 1 || potentialNewValue > 256) {
- return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256");
- }
-
- return Status::OK();
- }
-
-} exportedWriterThreadCountParam;
-
class ExportedBatchLimitOperationsParameter
: public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> {
public:
@@ -283,27 +259,6 @@ std::size_t SyncTail::calculateBatchLimitBytes(OperationContext* opCtx,
return std::min(oplogMaxSize / 10, std::size_t(replBatchLimitBytes));
}
-std::unique_ptr<ThreadPool> SyncTail::makeWriterPool() {
- return makeWriterPool(replWriterThreadCount);
-}
-
-std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) {
- ThreadPool::Options options;
- options.threadNamePrefix = "repl writer worker ";
- options.poolName = "repl writer worker Pool";
- options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
- options.onCreateThread = [](const std::string&) {
- // Only do this once per thread
- if (!Client::getCurrent()) {
- Client::initThreadIfNotAlready();
- AuthorizationSession::get(cc())->grantInternalAuthorization();
- }
- };
- auto pool = stdx::make_unique<ThreadPool>(options);
- pool->startup();
- return pool;
-}
-
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index dff6fc32e32..81e4ef4bff7 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -89,12 +89,6 @@ public:
StorageInterface* storageInterface);
/**
- * Creates thread pool for writer tasks.
- */
- static std::unique_ptr<ThreadPool> makeWriterPool();
- static std::unique_ptr<ThreadPool> makeWriterPool(int threadCount);
-
- /**
* Applies the operation that is in param o.
* Functions for applying operations/commands and increment server status counters may
* be overridden for testing.
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 7edcc147eef..ca76337d7ec 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -388,7 +388,7 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
}
DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") {
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(nullptr,
getConsistencyMarkers(),
getStorageInterface(),
@@ -402,7 +402,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
StorageInterface* const storageInterface,
const NamespaceString& nss,
const CollectionOptions& options) {
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
MultiApplier::Operations operationsApplied;
auto applyOperationFn = [&operationsApplied](OperationContext* opCtx,
MultiApplier::OperationPtrs* operationsToApply,
@@ -455,7 +455,7 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
// the number of threads in the pool.
NamespaceString nss1("test.t0");
NamespaceString nss2("test.t1");
- auto writerPool = SyncTail::makeWriterPool(2);
+ auto writerPool = OplogApplier::makeWriterPool(2);
stdx::mutex mutex;
std::vector<MultiApplier::Operations> operationsApplied;
@@ -1613,7 +1613,7 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
sessionInfo,
date);
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
@@ -1644,7 +1644,7 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
{},
Date_t::now());
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
@@ -1689,7 +1689,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa
sessionInfo,
date);
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
@@ -1721,7 +1721,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa
{},
Date_t::now());
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}));
@@ -1768,7 +1768,7 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo(
{Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info);
- auto writerPool = SyncTail::makeWriterPool();
+ auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 8a842c8ff2c..3a723ef0cd3 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1340,7 +1340,7 @@ public:
DoNothingOplogApplierObserver observer;
auto storageInterface = repl::StorageInterface::get(_opCtx);
- auto writerPool = repl::SyncTail::makeWriterPool();
+ auto writerPool = repl::OplogApplier::makeWriterPool();
repl::OplogApplier oplogApplier(nullptr,
nullptr,
&observer,
@@ -1452,7 +1452,7 @@ public:
DoNothingOplogApplierObserver observer;
auto storageInterface = repl::StorageInterface::get(_opCtx);
- auto writerPool = repl::SyncTail::makeWriterPool();
+ auto writerPool = repl::OplogApplier::makeWriterPool();
repl::OplogApplier::Options options;
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
@@ -2204,7 +2204,7 @@ public:
// Apply the operation.
auto storageInterface = repl::StorageInterface::get(_opCtx);
- auto writerPool = repl::SyncTail::makeWriterPool(1);
+ auto writerPool = repl::OplogApplier::makeWriterPool(1);
repl::SyncTail syncTail(
nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));