diff options
author | Benety Goh <benety@mongodb.com> | 2018-05-31 16:56:59 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-05-31 16:56:59 -0400 |
commit | 392a831a351f4a8229fef5efc3d30b94abff379d (patch) | |
tree | b53865e6dd0a994dd8ac41eb6ec42148faf855a1 /src/mongo | |
parent | 2f8c3ec5a43229e6ed6b585c9421ad6d0ee12a28 (diff) | |
download | mongo-392a831a351f4a8229fef5efc3d30b94abff379d.tar.gz |
SERVER-32335 move SyncTail::makeWriterPool to OplogApplier
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 6 |
8 files changed, 60 insertions, 64 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 9c31f9f89a1..4e61ab8c79d 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -32,14 +32,55 @@ #include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/repl/sync_tail.h" +#include "mongo/db/server_parameters.h" #include "mongo/util/log.h" namespace mongo { namespace repl { +namespace { + +/** + * This server parameter determines the number of writer threads OplogApplier will have. + */ +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replWriterThreadCount, int, 16) + ->withValidator([](const int& newVal) { + if (newVal < 1 || newVal > 256) { + return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256"); + } + + return Status::OK(); + }); + +} // namespace + using CallbackArgs = executor::TaskExecutor::CallbackArgs; +// static +std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool() { + return makeWriterPool(replWriterThreadCount); +} + +// static +std::unique_ptr<ThreadPool> OplogApplier::makeWriterPool(int threadCount) { + ThreadPool::Options options; + options.threadNamePrefix = "repl writer worker "; + options.poolName = "repl writer worker Pool"; + options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); + options.onCreateThread = [](const std::string&) { + // Only do this once per thread + if (!Client::getCurrent()) { + Client::initThreadIfNotAlready(); + AuthorizationSession::get(cc())->grantInternalAuthorization(); + } + }; + auto pool = stdx::make_unique<ThreadPool>(options); + pool->startup(); + return pool; +} + OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index a9e032ee776..fa7f437ac36 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -92,6 +92,12 @@ public: using Operations = std::vector<OplogEntry>; /** + * Creates thread pool for writer tasks. + */ + static std::unique_ptr<ThreadPool> makeWriterPool(); + static std::unique_ptr<ThreadPool> makeWriterPool(int threadCount); + + /** * Constructs this OplogApplier with specific options. * Obtains batches of operations from the OplogBuffer to apply. * Reports oplog application progress using the Observer. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index e7f96cb50d3..827d194fda4 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -318,7 +318,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s _taskExecutor = makeTaskExecutor(_service, "replication"); _taskExecutor->startup(); - _writerPool = SyncTail::makeWriterPool(); + _writerPool = OplogApplier::makeWriterPool(); _startedThreads = true; } diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 29205755ba3..153736e018d 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -353,7 +353,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, RecoveryOplogApplierStats stats; - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); OplogApplier::Options options; options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.skipWritesToOplog = true; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 73f82665d88..be688de1b9a 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -39,7 +39,6 @@ #include "mongo/base/counter.h" #include "mongo/bson/bsonelement_comparator.h" #include "mongo/bson/timestamp.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" @@ -93,29 +92,6 @@ namespace { MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion); -/** - * This variable determines the number of writer threads SyncTail will have. It can be overridden - * using the "replWriterThreadCount" server parameter. - */ -int replWriterThreadCount = 16; - -class ExportedWriterThreadCountParameter - : public ExportedServerParameter<int, ServerParameterType::kStartupOnly> { -public: - ExportedWriterThreadCountParameter() - : ExportedServerParameter<int, ServerParameterType::kStartupOnly>( - ServerParameterSet::getGlobal(), "replWriterThreadCount", &replWriterThreadCount) {} - - virtual Status validate(const int& potentialNewValue) { - if (potentialNewValue < 1 || potentialNewValue > 256) { - return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256"); - } - - return Status::OK(); - } - -} exportedWriterThreadCountParam; - class ExportedBatchLimitOperationsParameter : public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> { public: @@ -283,27 +259,6 @@ std::size_t SyncTail::calculateBatchLimitBytes(OperationContext* opCtx, return std::min(oplogMaxSize / 10, std::size_t(replBatchLimitBytes)); } -std::unique_ptr<ThreadPool> SyncTail::makeWriterPool() { - return makeWriterPool(replWriterThreadCount); -} - -std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) { - ThreadPool::Options options; - options.threadNamePrefix = "repl writer worker "; - options.poolName = "repl writer worker Pool"; - options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); - options.onCreateThread = [](const std::string&) { - // Only do this once per thread - if (!Client::getCurrent()) { - Client::initThreadIfNotAlready(); - AuthorizationSession::get(cc())->grantInternalAuthorization(); - } - }; - auto pool = stdx::make_unique<ThreadPool>(options); - pool->startup(); - return pool; -} - // static Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index dff6fc32e32..81e4ef4bff7 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -89,12 +89,6 @@ public: StorageInterface* storageInterface); /** - * Creates thread pool for writer tasks. - */ - static std::unique_ptr<ThreadPool> makeWriterPool(); - static std::unique_ptr<ThreadPool> makeWriterPool(int threadCount); - - /** * Applies the operation that is in param o. * Functions for applying operations/commands and increment server status counters may * be overridden for testing. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 7edcc147eef..ca76337d7ec 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -388,7 +388,7 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { } DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail(nullptr, getConsistencyMarkers(), getStorageInterface(), @@ -402,7 +402,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, StorageInterface* const storageInterface, const NamespaceString& nss, const CollectionOptions& options) { - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); MultiApplier::Operations operationsApplied; auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsToApply, @@ -455,7 +455,7 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH // the number of threads in the pool. NamespaceString nss1("test.t0"); NamespaceString nss2("test.t1"); - auto writerPool = SyncTail::makeWriterPool(2); + auto writerPool = OplogApplier::makeWriterPool(2); stdx::mutex mutex; std::vector<MultiApplier::Operations> operationsApplied; @@ -1613,7 +1613,7 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { sessionInfo, date); - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); @@ -1644,7 +1644,7 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { {}, Date_t::now()); - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); @@ -1689,7 +1689,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa sessionInfo, date); - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); @@ -1721,7 +1721,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa {}, Date_t::now()); - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp})); @@ -1768,7 +1768,7 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { auto opNoTxn = makeInsertDocumentOplogEntryWithSessionInfo( {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); - auto writerPool = SyncTail::makeWriterPool(); + auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply( diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 8a842c8ff2c..3a723ef0cd3 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1340,7 +1340,7 @@ public: DoNothingOplogApplierObserver observer; auto storageInterface = repl::StorageInterface::get(_opCtx); - auto writerPool = repl::SyncTail::makeWriterPool(); + auto writerPool = repl::OplogApplier::makeWriterPool(); repl::OplogApplier oplogApplier(nullptr, nullptr, &observer, @@ -1452,7 +1452,7 @@ public: DoNothingOplogApplierObserver observer; auto storageInterface = repl::StorageInterface::get(_opCtx); - auto writerPool = repl::SyncTail::makeWriterPool(); + auto writerPool = repl::OplogApplier::makeWriterPool(); repl::OplogApplier::Options options; options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); @@ -2204,7 +2204,7 @@ public: // Apply the operation. auto storageInterface = repl::StorageInterface::get(_opCtx); - auto writerPool = repl::SyncTail::makeWriterPool(1); + auto writerPool = repl::OplogApplier::makeWriterPool(1); repl::SyncTail syncTail( nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp})); |