summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorVishnu Kaushik <vishnu.kaushik@mongodb.com>2020-12-01 21:06:15 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-02 01:14:08 +0000
commit597c867f9adb8a74eaa18d3427a889c01f76aa22 (patch)
tree9f07817863c62fce3f0657fc04a1fccbafaac872 /src/mongo/db
parent020703354ed815471613c8ae70fbf71c83f3bd1e (diff)
downloadmongo-597c867f9adb8a74eaa18d3427a889c01f76aa22.tar.gz
SERVER-50024 make tunable server parameters for TenantOplogProcessing constants
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl47
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp32
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h5
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp17
5 files changed, 74 insertions, 28 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5844865da81..16d7cc0ab78 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1776,5 +1776,6 @@ env.Library(
'oplog',
'oplog_application',
'oplog_application_interface',
+ 'repl_server_parameters'
],
)
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 9d93f2efe83..d0873418a7d 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -243,6 +243,53 @@ server_parameters:
lte:
expr: 100 * 1024 * 1024
+ # From tenant_oplog_applier.cpp
+ tenantApplierBatchSizeBytes:
+ description: The maximum tenant oplog applier batch size in bytes.
+ set_at: [startup, runtime]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: tenantApplierBatchSizeBytes
+ default:
+ expr: 16 * 1024 * 1024
+ validator:
+ gte:
+ expr: 16 * 1024 * 1024
+ lte:
+ expr: 100 * 1024 * 1024
+
+ tenantApplierBatchSizeOps:
+ description: The maximum number of operations in a tenant oplog applier batch.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: tenantApplierBatchSizeOps
+ default: 500
+ validator:
+ gte: 1
+ lte:
+ expr: 100 * 1000
+
+ minOplogEntriesPerThread:
+ description: >-
+ The minimum number of operations allotted to a tenant oplog applier worker thread.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: minOplogEntriesPerThread
+ default: 16
+ validator:
+ gte: 1
+ lte: 32
+
+ tenantApplierThreadCount:
+ description: >-
+ The number of threads in the tenant migration writer pool used to apply operations.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: tenantApplierThreadCount
+ default: 5
+ validator:
+ gte: 1
+ lte: 256
+
# New parameters since this file was created, not taken from elsewhere.
initialSyncTransientErrorRetryPeriodSeconds:
description: >-
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 8ff049135c4..6f32fb7a4a6 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/cloner_utils.h"
#include "mongo/db/repl/insert_group.h"
#include "mongo/db/repl/oplog_applier_utils.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
@@ -54,18 +55,6 @@
namespace mongo {
namespace repl {
-// These batch sizes are pretty arbitrary.
-// TODO(SERVER-50024): come up with some reasonable numbers, and make them a settable parameter.
-constexpr size_t kTenantApplierBatchSizeBytes = 16 * 1024 * 1024;
-
-// TODO(SERVER-50024): kTenantApplierBatchSizeOps is currently chosen as the default value of
-// internalInsertMaxBatchSize. This is probably reasonable but should be a settable parameter.
-constexpr size_t kTenantApplierBatchSizeOps = 500;
-const size_t kMinOplogEntriesPerThread = 16;
-
-// TODO(SERVER-50024):: This is also arbitary and should be a settable parameter.
-constexpr size_t kTenantApplierThreadCount = 5;
-
TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
const std::string& tenantId,
OpTime applyFromOpTime,
@@ -78,8 +67,7 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
_beginApplyingAfterOpTime(applyFromOpTime),
_oplogBuffer(oplogBuffer),
_executor(std::move(executor)),
- _writerPool(writerPool),
- _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {}
+ _writerPool(writerPool) {}
TenantOplogApplier::~TenantOplogApplier() {
shutdown();
@@ -110,7 +98,10 @@ Status TenantOplogApplier::_doStartup_inlock() noexcept {
auto status = _oplogBatcher->startup();
if (!status.isOK())
return status;
- auto fut = _oplogBatcher->getNextBatch(_limits);
+
+ auto fut = _oplogBatcher->getNextBatch(
+ TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()),
+ std::size_t(tenantApplierBatchSizeOps.load())));
std::move(fut)
.thenRunOn(_executor)
.then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); })
@@ -128,7 +119,10 @@ void TenantOplogApplier::_doShutdown_inlock() noexcept {
void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) {
// Getting the future for the next batch here means the batcher can retrieve the next batch
// while the applier is processing the current one.
- auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits);
+
+ auto nextBatchFuture = _oplogBatcher->getNextBatch(
+ TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()),
+ std::size_t(tenantApplierBatchSizeOps.load())));
try {
_applyOplogBatch(&batch);
} catch (const DBException& e) {
@@ -329,8 +323,8 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
_setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
}
const size_t numOplogThreads = _writerPool->getStats().numThreads;
- const size_t numOpsPerThread =
- std::max(kMinOplogEntriesPerThread, (batch.ops.size() / numOplogThreads));
+ const size_t numOpsPerThread = std::max(std::size_t(minOplogEntriesPerThread.load()),
+ (batch.ops.size() / numOplogThreads));
slotIter = oplogSlots.begin();
auto opsIter = batch.ops.begin();
LOGV2_DEBUG(4886003,
@@ -540,7 +534,7 @@ Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntr
}
std::unique_ptr<ThreadPool> makeTenantMigrationWriterPool() {
- return makeTenantMigrationWriterPool(kTenantApplierThreadCount);
+ return makeTenantMigrationWriterPool(tenantApplierThreadCount);
}
std::unique_ptr<ThreadPool> makeTenantMigrationWriterPool(int threadCount) {
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 8b2f22ced49..a9fc4c7a830 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -88,10 +88,6 @@ public:
*/
SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime);
- void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) {
- _limits = limits;
- }
-
private:
Status _doStartup_inlock() noexcept final;
void _doShutdown_inlock() noexcept final;
@@ -148,7 +144,6 @@ private:
// Pool of worker threads for writing ops to the databases.
// Not owned by us.
ThreadPool* const _writerPool; // (S)
- TenantOplogBatcher::BatchLimits _limits; // (R)
std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M)
Status _finalStatus = Status::OK(); // (M)
stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X)
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index 535e8b3537c..cee53e4286e 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
#include "mongo/db/repl/oplog_batcher_test_fixture.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
@@ -117,6 +118,11 @@ public:
void setUp() override {
ServiceContextMongoDTest::setUp();
+ // These defaults are generated from the repl_server_paremeters IDL file. Set them here
+ // to start each test case from a clean state.
+ tenantApplierBatchSizeBytes.store(kTenantApplierBatchSizeBytesDefault);
+ tenantApplierBatchSizeOps.store(kTenantApplierBatchSizeOpsDefault);
+
// Set up an OpObserver to track the documents OplogApplierImpl inserts.
auto service = getServiceContext();
auto opObserver = std::make_unique<TenantOplogApplierTestOpObserver>();
@@ -247,8 +253,9 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) {
TenantOplogApplier applier(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 2 /*ops*/);
- applier.setBatchLimits_forTest(smallLimits);
+ tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
+ tenantApplierBatchSizeOps.store(2 /* ops */);
+
ASSERT_OK(applier.startup());
auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime());
@@ -783,8 +790,10 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su
TenantOplogApplier applier(
_migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
- TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 1 /*ops*/);
- applier.setBatchLimits_forTest(smallLimits);
+
+ tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */);
+ tenantApplierBatchSizeOps.store(1 /* ops */);
+
ASSERT_OK(applier.startup());
auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
auto futureRes = opAppliedFuture.getNoThrow();