diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 5 |
5 files changed, 31 insertions, 9 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 35b888194d9..51d3134d565 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -139,6 +139,13 @@ const OplogApplier::Options& OplogApplier::getOptions() const { std::unique_ptr<ThreadPool> makeReplWriterPool() { // Reduce content pinned in cache by single oplog batch on small machines by reducing the number // of threads of ReplWriter to reduce the number of concurrent open WT transactions. + if (replWriterThreadCount < replWriterMinThreadCount) { + LOGV2_FATAL_NOTRACE( + 5605400, + "replWriterMinThreadCount must be less than or equal to replWriterThreadCount", + "replWriterMinThreadCount"_attr = replWriterMinThreadCount, + "replWriterThreadCount"_attr = replWriterThreadCount); + } auto numberOfThreads = std::min(replWriterThreadCount, 2 * static_cast<int>(ProcessInfo::getNumAvailableCores())); return makeReplWriterPool(numberOfThreads); @@ -154,7 +161,9 @@ std::unique_ptr<ThreadPool> makeReplWriterPool(int threadCount, ThreadPool::Options options; options.threadNamePrefix = name + "-"; options.poolName = name + "ThreadPool"; - options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); + options.minThreads = + replWriterMinThreadCount < threadCount ? replWriterMinThreadCount : threadCount; + options.maxThreads = static_cast<size_t>(threadCount); options.onCreateThread = [isKillableByStepdown](const std::string&) { Client::initThread(getThreadName()); auto client = Client::getCurrent(); diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 93072539fba..7604af715dc 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -410,7 +410,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, // setup/teardown overhead across many writes. const size_t kMinOplogEntriesPerThread = 16; const bool enoughToMultiThread = - ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().numThreads; + ops.size() >= kMinOplogEntriesPerThread * writerPool->getStats().options.maxThreads; // Storage engines support parallel writes to the oplog because they are required to ensure that // oplog entries are ordered correctly, even if inserted out-of-order. @@ -420,7 +420,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, } - const size_t numOplogThreads = writerPool->getStats().numThreads; + const size_t numOplogThreads = writerPool->getStats().options.maxThreads; const size_t numOpsPerThread = ops.size() / numOplogThreads; for (size_t thread = 0; thread < numOplogThreads; thread++) { size_t begin = thread * numOpsPerThread; @@ -453,7 +453,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, // Increment the batch size stat. oplogApplicationBatchSize.increment(ops.size()); - std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads); + std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().options.maxThreads); { // Each node records cumulative batch application stats for itself using this timer. TimerHolder timer(&applyBatchStats); @@ -479,7 +479,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, std::vector<std::vector<OplogEntry>> derivedOps; std::vector<std::vector<const OplogEntry*>> writerVectors( - _writerPool->getStats().numThreads); + _writerPool->getStats().options.maxThreads); fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); // Wait for writes to finish before applying ops. @@ -501,7 +501,8 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, } { - std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK()); + std::vector<Status> statusVector(_writerPool->getStats().options.maxThreads, + Status::OK()); // Doles out all the work to the writer pool threads. writerVectors is not modified, // but applyOplogBatchPerWorker will modify the vectors that it contains. diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index ece2a1f0e87..b527f298119 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -512,7 +512,8 @@ TEST_F(OplogApplierImplTest, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); - std::vector<std::vector<const OplogEntry*>> writerVectors(writerPool->getStats().numThreads); + std::vector<std::vector<const OplogEntry*>> writerVectors( + writerPool->getStats().options.maxThreads); std::vector<std::vector<OplogEntry>> derivedOps; std::vector<OplogEntry> ops{firstRetryableOp, secondRetryableOp}; oplogApplier.fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps); diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 9344d03259f..6f99250f712 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -218,6 +218,16 @@ server_parameters: gte: 1 lte: 256 + replWriterMinThreadCount: + description: The minimum number of threads in the thread pool used to apply the oplog + set_at: startup + cpp_vartype: int + cpp_varname: replWriterMinThreadCount + default: 0 + validator: + gte: 0 + lte: 256 + replBatchLimitOperations: description: The maximum number of operations to apply in a single batch set_at: [ startup, runtime ] diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index a510c91c975..1e37018ed81 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -435,7 +435,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( greatestOplogSlotUsed = *slotIter++; } - const size_t numOplogThreads = _writerPool->getStats().numThreads; + const size_t numOplogThreads = _writerPool->getStats().options.maxThreads; const size_t numOpsPerThread = std::max(std::size_t(minOplogEntriesPerThread.load()), (nonSessionOps.size() / numOplogThreads)); LOGV2_DEBUG(4886003, @@ -846,7 +846,8 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVectors( OperationContext* opCtx, TenantOplogBatch* batch) { - std::vector<std::vector<const OplogEntry*>> writerVectors(_writerPool->getStats().numThreads); + std::vector<std::vector<const OplogEntry*>> writerVectors( + _writerPool->getStats().options.maxThreads); CachedCollectionProperties collPropertiesCache; for (auto&& op : batch->ops) { |