diff options
author | Vishnu Kaushik <vishnu.kaushik@mongodb.com> | 2020-12-01 21:06:15 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-02 01:14:08 +0000 |
commit | 597c867f9adb8a74eaa18d3427a889c01f76aa22 (patch) | |
tree | 9f07817863c62fce3f0657fc04a1fccbafaac872 /src/mongo/db | |
parent | 020703354ed815471613c8ae70fbf71c83f3bd1e (diff) | |
download | mongo-597c867f9adb8a74eaa18d3427a889c01f76aa22.tar.gz |
SERVER-50024 make tunable server parameters for TenantOplogProcessing constants
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 47 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_test.cpp | 17 |
5 files changed, 74 insertions, 28 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5844865da81..16d7cc0ab78 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1776,5 +1776,6 @@ env.Library( 'oplog', 'oplog_application', 'oplog_application_interface', + 'repl_server_parameters' ], ) diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 9d93f2efe83..d0873418a7d 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -243,6 +243,53 @@ server_parameters: lte: expr: 100 * 1024 * 1024 + # From tenant_oplog_applier.cpp + tenantApplierBatchSizeBytes: + description: The maximum tenant oplog applier batch size in bytes. + set_at: [startup, runtime] + cpp_vartype: AtomicWord<int> + cpp_varname: tenantApplierBatchSizeBytes + default: + expr: 16 * 1024 * 1024 + validator: + gte: + expr: 16 * 1024 * 1024 + lte: + expr: 100 * 1024 * 1024 + + tenantApplierBatchSizeOps: + description: The maximum number of operations in a tenant oplog applier batch. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord<int> + cpp_varname: tenantApplierBatchSizeOps + default: 500 + validator: + gte: 1 + lte: + expr: 100 * 1000 + + minOplogEntriesPerThread: + description: >- + The minimum number of operations allotted to a tenant oplog applier worker thread. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord<int> + cpp_varname: minOplogEntriesPerThread + default: 16 + validator: + gte: 1 + lte: 32 + + tenantApplierThreadCount: + description: >- + The number of threads in the tenant migration writer pool used to apply operations. + set_at: startup + cpp_vartype: int + cpp_varname: tenantApplierThreadCount + default: 5 + validator: + gte: 1 + lte: 256 + # New parameters since this file was created, not taken from elsewhere. initialSyncTransientErrorRetryPeriodSeconds: description: >- diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 8ff049135c4..6f32fb7a4a6 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -45,6 +45,7 @@ #include "mongo/db/repl/cloner_utils.h" #include "mongo/db/repl/insert_group.h" #include "mongo/db/repl/oplog_applier_utils.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/tenant_oplog_batcher.h" @@ -54,18 +55,6 @@ namespace mongo { namespace repl { -// These batch sizes are pretty arbitrary. -// TODO(SERVER-50024): come up with some reasonable numbers, and make them a settable parameter. -constexpr size_t kTenantApplierBatchSizeBytes = 16 * 1024 * 1024; - -// TODO(SERVER-50024): kTenantApplierBatchSizeOps is currently chosen as the default value of -// internalInsertMaxBatchSize. This is probably reasonable but should be a settable parameter. -constexpr size_t kTenantApplierBatchSizeOps = 500; -const size_t kMinOplogEntriesPerThread = 16; - -// TODO(SERVER-50024):: This is also arbitary and should be a settable parameter. -constexpr size_t kTenantApplierThreadCount = 5; - TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, OpTime applyFromOpTime, @@ -78,8 +67,7 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, _beginApplyingAfterOpTime(applyFromOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), - _writerPool(writerPool), - _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {} + _writerPool(writerPool) {} TenantOplogApplier::~TenantOplogApplier() { shutdown(); @@ -110,7 +98,10 @@ Status TenantOplogApplier::_doStartup_inlock() noexcept { auto status = _oplogBatcher->startup(); if (!status.isOK()) return status; - auto fut = _oplogBatcher->getNextBatch(_limits); + + auto fut = _oplogBatcher->getNextBatch( + TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()), + std::size_t(tenantApplierBatchSizeOps.load()))); std::move(fut) .thenRunOn(_executor) .then([&](TenantOplogBatch batch) { _applyLoop(std::move(batch)); }) @@ -128,7 +119,10 @@ void TenantOplogApplier::_doShutdown_inlock() noexcept { void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) { // Getting the future for the next batch here means the batcher can retrieve the next batch // while the applier is processing the current one. - auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits); + + auto nextBatchFuture = _oplogBatcher->getNextBatch( + TenantOplogBatcher::BatchLimits(std::size_t(tenantApplierBatchSizeBytes.load()), + std::size_t(tenantApplierBatchSizeOps.load()))); try { _applyOplogBatch(&batch); } catch (const DBException& e) { @@ -329,8 +323,8 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( _setRecipientOpTime(op.entry.getOpTime(), *slotIter++); } const size_t numOplogThreads = _writerPool->getStats().numThreads; - const size_t numOpsPerThread = - std::max(kMinOplogEntriesPerThread, (batch.ops.size() / numOplogThreads)); + const size_t numOpsPerThread = std::max(std::size_t(minOplogEntriesPerThread.load()), + (batch.ops.size() / numOplogThreads)); slotIter = oplogSlots.begin(); auto opsIter = batch.ops.begin(); LOGV2_DEBUG(4886003, @@ -540,7 +534,7 @@ Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntr } std::unique_ptr<ThreadPool> makeTenantMigrationWriterPool() { - return makeTenantMigrationWriterPool(kTenantApplierThreadCount); + return makeTenantMigrationWriterPool(tenantApplierThreadCount); } std::unique_ptr<ThreadPool> makeTenantMigrationWriterPool(int threadCount) { diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 8b2f22ced49..a9fc4c7a830 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -88,10 +88,6 @@ public: */ SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime); - void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) { - _limits = limits; - } - private: Status _doStartup_inlock() noexcept final; void _doShutdown_inlock() noexcept final; @@ -148,7 +144,6 @@ private: // Pool of worker threads for writing ops to the databases. // Not owned by us. ThreadPool* const _writerPool; // (S) - TenantOplogBatcher::BatchLimits _limits; // (R) std::map<OpTime, SharedPromise<OpTimePair>> _opTimeNotificationList; // (M) Status _finalStatus = Status::OK(); // (M) stdx::unordered_set<UUID, UUID::Hash> _knownGoodUuids; // (X) diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 535e8b3537c..cee53e4286e 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_batcher_test_fixture.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/tenant_migration_decoration.h" @@ -117,6 +118,11 @@ public: void setUp() override { ServiceContextMongoDTest::setUp(); + // These defaults are generated from the repl_server_paremeters IDL file. Set them here + // to start each test case from a clean state. + tenantApplierBatchSizeBytes.store(kTenantApplierBatchSizeBytesDefault); + tenantApplierBatchSizeOps.store(kTenantApplierBatchSizeOpsDefault); + // Set up an OpObserver to track the documents OplogApplierImpl inserts. auto service = getServiceContext(); auto opObserver = std::make_unique<TenantOplogApplierTestOpObserver>(); @@ -247,8 +253,9 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { TenantOplogApplier applier( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 2 /*ops*/); - applier.setBatchLimits_forTest(smallLimits); + tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); + tenantApplierBatchSizeOps.store(2 /* ops */); + ASSERT_OK(applier.startup()); auto firstBatchFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime()); auto secondBatchFuture = applier.getNotificationForOpTime(srcOps[2].getOpTime()); @@ -783,8 +790,10 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su TenantOplogApplier applier( _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); - TenantOplogBatcher::BatchLimits smallLimits(100 * 1024 /* bytes */, 1 /*ops*/); - applier.setBatchLimits_forTest(smallLimits); + + tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); + tenantApplierBatchSizeOps.store(1 /* ops */); + ASSERT_OK(applier.startup()); auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); |