author     Lingzhi Deng <lingzhi.deng@mongodb.com>    2021-04-28 22:47:50 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-06-24 01:32:58 +0000
commit     b9b407313d1afd49e820e2cbb6b75336641dbad9 (patch)
tree       137f7e6a4b7fe4cf732839dd8951faebf04cdf8c
parent     4bcd3bcfc7b37739b8a414df2ccec05569bd307e (diff)
download   mongo-b9b407313d1afd49e820e2cbb6b75336641dbad9.tar.gz
SERVER-56054: Change minThreads value for replication writer thread pool to 0
(cherry picked from commit 136fa52193c342038b3fa35152fa1ed3dee4ee87)
(cherry picked from commit 4935fe56743814ffc39e9f7d9eebb3648dc2846f)
-rw-r--r--   src/mongo/db/repl/sync_tail.cpp | 47
1 file changed, 41 insertions(+), 6 deletions(-)
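For context, here is a minimal standalone sketch of the pool-sizing rule this patch applies when building the replication writer thread pool: minThreads comes from the new replWriterMinThreadCount parameter (default 0) but is capped at the thread count, maxThreads still comes from replWriterThreadCount, and startup fails if the configured minimum exceeds the maximum. PoolOptions and makeWriterPoolOptions below are simplified stand-ins for mongo::ThreadPool::Options and SyncTail::makeWriterPool, not the real types; only the parameter names and the clamping behavior are taken from the diff.

#include <algorithm>
#include <cstddef>
#include <iostream>

// Simplified stand-ins for the two server parameters involved in this patch.
int replWriterThreadCount = 16;   // existing "replWriterThreadCount" parameter
int replWriterMinThreadCount = 0; // new "replWriterMinThreadCount" parameter (default 0)

// Simplified stand-in for mongo::ThreadPool::Options.
struct PoolOptions {
    std::size_t minThreads = 0;
    std::size_t maxThreads = 1;
};

PoolOptions makeWriterPoolOptions(int threadCount) {
    PoolOptions options;
    // Mirror the patched makeWriterPool(int): the minimum never exceeds the maximum.
    options.minThreads =
        static_cast<std::size_t>(std::min(replWriterMinThreadCount, threadCount));
    options.maxThreads = static_cast<std::size_t>(threadCount);
    return options;
}

int main() {
    // The patch fasserts (error 5605400) when the minimum exceeds the maximum;
    // this sketch just reports the misconfiguration and exits.
    if (replWriterThreadCount < replWriterMinThreadCount) {
        std::cerr << "replWriterMinThreadCount must be <= replWriterThreadCount\n";
        return 1;
    }
    PoolOptions opts = makeWriterPoolOptions(replWriterThreadCount);
    std::cout << "minThreads=" << opts.minThreads
              << " maxThreads=" << opts.maxThreads << '\n'; // minThreads=0 maxThreads=16
    return 0;
}

Since the new parameter is registered as ServerParameterType::kStartupOnly, it would presumably be set at mongod startup (for example via --setParameter replWriterMinThreadCount=N); that usage is an assumption based on the registration and is not shown in this diff.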
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ba87dafd096..9f921bfb714 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -122,6 +122,33 @@ public:
} exportedWriterThreadCountParam;
+/**
+ * This variable determines the minimum number of writer threads SyncTail will have. It can be
+ * overridden
+ * using the "replWriterMinThreadCount" server parameter.
+ */
+int replWriterMinThreadCount = 0;
+
+class ExportedWriterMinThreadCountParameter
+ : public ExportedServerParameter<int, ServerParameterType::kStartupOnly> {
+public:
+ ExportedWriterMinThreadCountParameter()
+ : ExportedServerParameter<int, ServerParameterType::kStartupOnly>(
+ ServerParameterSet::getGlobal(),
+ "replWriterMinThreadCount",
+ &replWriterMinThreadCount) {}
+
+ virtual Status validate(const int& potentialNewValue) {
+ if (potentialNewValue < 0 || potentialNewValue > 256) {
+ return Status(ErrorCodes::BadValue,
+ "replWriterMinThreadCount must be between 0 and 256");
+ }
+
+ return Status::OK();
+ }
+
+} exportedWriterMinThreadCountParam;
+
class ExportedBatchLimitOperationsParameter
: public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> {
public:
@@ -349,6 +376,10 @@ std::size_t SyncTail::calculateBatchLimitBytes(OperationContext* opCtx,
}
std::unique_ptr<ThreadPool> SyncTail::makeWriterPool() {
+ if (replWriterThreadCount < replWriterMinThreadCount) {
+ severe() << "replWriterMinThreadCount must be less than or equal to replWriterThreadCount.";
+ fassertFailedNoTrace(5605400);
+ }
return makeWriterPool(replWriterThreadCount);
}
@@ -356,7 +387,9 @@ std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) {
ThreadPool::Options options;
options.threadNamePrefix = "repl writer worker ";
options.poolName = "repl writer worker Pool";
- options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
+ options.minThreads =
+ replWriterMinThreadCount < threadCount ? replWriterMinThreadCount : threadCount;
+ options.maxThreads = static_cast<size_t>(threadCount);
options.onCreateThread = [](const std::string&) {
// Only do this once per thread
if (!Client::getCurrent()) {
@@ -616,7 +649,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
// setup/teardown overhead across many writes.
const size_t kMinOplogEntriesPerThread = 16;
const bool enoughToMultiThread =
- ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads;
+ ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().options.maxThreads;
// Only doc-locking engines support parallel writes to the oplog because they are required to
// ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
@@ -630,7 +663,7 @@ void scheduleWritesToOplog(OperationContext* opCtx,
}
- const size_t numOplogThreads = threadPool->getStats().numThreads;
+ const size_t numOplogThreads = threadPool->getStats().options.maxThreads;
const size_t numOpsPerThread = ops.size() / numOplogThreads;
for (size_t thread = 0; thread < numOplogThreads; thread++) {
size_t begin = thread * numOpsPerThread;
@@ -1555,7 +1588,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
// Increment the batch size stat.
oplogApplicationBatchSize.increment(ops.size());
- std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
+ std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().options.maxThreads);
{
// Each node records cumulative batch application stats for itself using this timer.
TimerHolder timer(&applyBatchStats);
@@ -1605,7 +1638,8 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
// and create a pseudo oplog.
std::vector<MultiApplier::Operations> derivedOps;
- std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
+ std::vector<MultiApplier::OperationPtrs> writerVectors(
+ _writerPool->getStats().options.maxThreads);
fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
@@ -1622,7 +1656,8 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
}
{
- std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
+ std::vector<Status> statusVector(_writerPool->getStats().options.maxThreads,
+ Status::OK());
applyOps(writerVectors,
_writerPool,
_applyFunc,