summary refs log tree commitdiff
diff options
context:
space:
mode:
author    Lingzhi Deng <lingzhi.deng@mongodb.com>    2021-04-28 22:47:50 +0000
committer    Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-04-28 23:23:13 +0000
commit    136fa52193c342038b3fa35152fa1ed3dee4ee87 (patch)
tree    b2054bae1f5c5daa94666ce0306ad6b68b7df7b9
parent    00d3ec0d3a9d7c4077148f528bb1f7293fd1b238 (diff)
download    mongo-136fa52193c342038b3fa35152fa1ed3dee4ee87.tar.gz
SERVER-56054: Change minThreads value for replication writer thread pool to 0
-rw-r--r--    src/mongo/db/repl/oplog_applier.cpp    11
-rw-r--r--    src/mongo/db/repl/oplog_applier_impl.cpp    11
-rw-r--r--    src/mongo/db/repl/oplog_applier_impl_test.cpp    3
-rw-r--r--    src/mongo/db/repl/repl_server_parameters.idl    10
-rw-r--r--    src/mongo/db/repl/tenant_oplog_applier.cpp    5
5 files changed, 31 insertions, 9 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 35b888194d9..51d3134d565 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -139,6 +139,13 @@ const OplogApplier::Options& OplogApplier::getOptions() const {
std::unique_ptr<ThreadPool> makeReplWriterPool() {
// Reduce content pinned in cache by single oplog batch on small machines by reducing the number
// of threads of ReplWriter to reduce the number of concurrent open WT transactions.
+ if (replWriterThreadCount < replWriterMinThreadCount) {
+ LOGV2_FATAL_NOTRACE(
+ 5605400,
+ "replWriterMinThreadCount must be less than or equal to replWriterThreadCount",
+ "replWriterMinThreadCount"_attr = replWriterMinThreadCount,
+ "replWriterThreadCount"_attr = replWriterThreadCount);
+ }
auto numberOfThreads =
std::min(replWriterThreadCount, 2 * static_cast<int>(ProcessInfo::getNumAvailableCores()));
return makeReplWriterPool(numberOfThreads);
@@ -154,7 +161,9 @@ std::unique_ptr<ThreadPool> makeReplWriterPool(int threadCount,
ThreadPool::Options options;
options.threadNamePrefix = name + "-";
options.poolName = name + "ThreadPool";
- options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
+ options.minThreads =
+ replWriterMinThreadCount < threadCount ? replWriterMinThreadCount : threadCount;
+ options.maxThreads = static_cast<size_t>(threadCount);
options.onCreateThread = [isKillableByStepdown](const std::string&) {
Client::initThread(getThreadName());
auto client = Client::getCurrent();
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 93072539fba..7604af715dc 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -410,7 +410,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
// setup/teardown overhead across many writes.
const size_t kMinOplogEntriesPerThread = 16;
const bool enoughToMultiThread =
- ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().numThreads;
+ ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().options.maxThreads;
// Storage engines support parallel writes to the oplog because they are required to ensure that
// oplog entries are ordered correctly, even if inserted out-of-order.
@@ -420,7 +420,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
}
- const size_t numOplogThreads = writerPool->getStats().numThreads;
+ const size_t numOplogThreads = writerPool->getStats().options.maxThreads;
const size_t numOpsPerThread = ops.size() / numOplogThreads;
for (size_t thread = 0; thread < numOplogThreads; thread++) {
size_t begin = thread * numOpsPerThread;
@@ -453,7 +453,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// Increment the batch size stat.
oplogApplicationBatchSize.increment(ops.size());
- std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
+ std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().options.maxThreads);
{
// Each node records cumulative batch application stats for itself using this timer.
TimerHolder timer(&applyBatchStats);
@@ -479,7 +479,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
std::vector<std::vector<OplogEntry>> derivedOps;
std::vector<std::vector<const OplogEntry*>> writerVectors(
- _writerPool->getStats().numThreads);
+ _writerPool->getStats().options.maxThreads);
fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
@@ -501,7 +501,8 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
}
{
- std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
+ std::vector<Status> statusVector(_writerPool->getStats().options.maxThreads,
+ Status::OK());
// Doles out all the work to the writer pool threads. writerVectors is not modified,
// but applyOplogBatchPerWorker will modify the vectors that it contains.
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index ece2a1f0e87..b527f298119 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -512,7 +512,8 @@ TEST_F(OplogApplierImplTest,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
- std::vector<std::vector<const OplogEntry*>> writerVectors(writerPool->getStats().numThreads);
+ std::vector<std::vector<const OplogEntry*>> writerVectors(
+ writerPool->getStats().options.maxThreads);
std::vector<std::vector<OplogEntry>> derivedOps;
std::vector<OplogEntry> ops{firstRetryableOp, secondRetryableOp};
oplogApplier.fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps);
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 9344d03259f..6f99250f712 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -218,6 +218,16 @@ server_parameters:
gte: 1
lte: 256
+ replWriterMinThreadCount:
+ description: The minimum number of threads in the thread pool used to apply the oplog
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: replWriterMinThreadCount
+ default: 0
+ validator:
+ gte: 0
+ lte: 256
+
replBatchLimitOperations:
description: The maximum number of operations to apply in a single batch
set_at: [ startup, runtime ]
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index a510c91c975..1e37018ed81 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -435,7 +435,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
greatestOplogSlotUsed = *slotIter++;
}
- const size_t numOplogThreads = _writerPool->getStats().numThreads;
+ const size_t numOplogThreads = _writerPool->getStats().options.maxThreads;
const size_t numOpsPerThread = std::max(std::size_t(minOplogEntriesPerThread.load()),
(nonSessionOps.size() / numOplogThreads));
LOGV2_DEBUG(4886003,
@@ -846,7 +846,8 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVectors(
OperationContext* opCtx, TenantOplogBatch* batch) {
- std::vector<std::vector<const OplogEntry*>> writerVectors(_writerPool->getStats().numThreads);
+ std::vector<std::vector<const OplogEntry*>> writerVectors(
+ _writerPool->getStats().options.maxThreads);
CachedCollectionProperties collPropertiesCache;
for (auto&& op : batch->ops) {