diff options
author | Benety Goh <benety@mongodb.com> | 2016-05-31 14:21:02 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-01 10:58:01 -0400 |
commit | 9ea1e5b3845719b1f97863fb440f30bef22b3f32 (patch) | |
tree | e53a075ca1d7fef424fc39f9145d2c0dcd2ba04a | |
parent | f6e1e5a946acfd867fa15fb8cc7e2276f2d96b04 (diff) | |
download | mongo-9ea1e5b3845719b1f97863fb440f30bef22b3f32.tar.gz |
SERVER-24292 SyncTail accepts optional writer thread pool at construction
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 16 |
4 files changed, 46 insertions, 33 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 8c35effae54..1f50f0cb2f3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/sync_tail.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/server_parameters.h" @@ -141,11 +142,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s } getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); - // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) - // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail - // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing - // any SyncTail functionality that require these constructor parameters. - _syncTail = stdx::make_unique<SyncTail>(nullptr, SyncTail::MultiSyncApplyFunc()); + _writerPool = SyncTail::makeWriterPool(); _startedThreads = true; } @@ -489,7 +486,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply( OperationContext* txn, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { - return repl::multiApply(txn, _syncTail->getWriterPool(), ops, applyOperation); + return repl::multiApply(txn, _writerPool.get(), ops, applyOperation); } void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) { @@ -499,8 +496,14 @@ void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier: void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( const MultiApplier::Operations& ops, const HostAndPort& source) { - _syncTail->setHostname(source.toString()); - repl::multiInitialSyncApply(ops, _syncTail.get()); + + // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) + // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail + // with invalid BackgroundSync, MultiSyncApplyFunc and writerPool arguments because we will not + // be accessing any SyncTail functionality that require these constructor parameters. + SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); + syncTail.setHostname(source.toString()); + repl::multiInitialSyncApply(ops, &syncTail); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 4e524e4c67a..f73fc647a03 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -35,11 +35,11 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/sync_source_feedback.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { namespace repl { @@ -140,10 +140,8 @@ private: StartSteadyReplicationFn _startSteadReplicationFn; std::unique_ptr<stdx::thread> _initialSyncThread; - // Used by multiApply(), multiSyncApply() and multiInitialSyncApply() to apply operations read - // from sync source. The thread pool owned by "_syncTail" is used by repl::multiApply() to apply - // the sync source's operations in parallel. - std::unique_ptr<SyncTail> _syncTail; + // Used by repl::multiApply() to apply the sync source's operations in parallel. + std::unique_ptr<OldThreadPool> _writerPool; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index e7988fa7de5..30d8dcd6bcd 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -66,6 +66,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/timer_stats.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -78,24 +79,28 @@ namespace mongo { using std::endl; namespace repl { + +/** + * This variable determines the number of writer threads SyncTail will have. It has a default + * value, which varies based on architecture and can be overridden using the + * "replWriterThreadCount" server parameter. + */ +namespace { #if defined(MONGO_PLATFORM_64) -int SyncTail::replWriterThreadCount = 16; -const int replPrefetcherThreadCount = 16; +int replWriterThreadCount = 16; #elif defined(MONGO_PLATFORM_32) -int SyncTail::replWriterThreadCount = 2; -const int replPrefetcherThreadCount = 2; +int replWriterThreadCount = 2; #else #error need to include something that defines MONGO_PLATFORM_XX #endif +} // namespace class ExportedWriterThreadCountParameter : public ExportedServerParameter<int, ServerParameterType::kStartupOnly> { public: ExportedWriterThreadCountParameter() : ExportedServerParameter<int, ServerParameterType::kStartupOnly>( - ServerParameterSet::getGlobal(), - "replWriterThreadCount", - &SyncTail::replWriterThreadCount) {} + ServerParameterSet::getGlobal(), "replWriterThreadCount", &replWriterThreadCount) {} virtual Status validate(const int& potentialNewValue) { if (potentialNewValue < 1 || potentialNewValue > 256) { @@ -277,12 +282,19 @@ void ApplyBatchFinalizerForJournal::_run() { } // anonymous namespace containing ApplyBatchFinalizer definitions. SyncTail::SyncTail(BackgroundSync* q, MultiSyncApplyFunc func) - : _networkQueue(q), - _applyFunc(func), - _writerPool(replWriterThreadCount, "repl writer worker ") {} + : SyncTail(q, func, makeWriterPool()) {} + +SyncTail::SyncTail(BackgroundSync* q, + MultiSyncApplyFunc func, + std::unique_ptr<OldThreadPool> writerPool) + : _networkQueue(q), _applyFunc(func), _writerPool(std::move(writerPool)) {} SyncTail::~SyncTail() {} +std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() { + return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker "); +} + bool SyncTail::peek(BSONObj* op) { return _networkQueue->peek(op); } @@ -534,7 +546,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); }; auto status = repl::multiApply(txn, - &_writerPool, + _writerPool.get(), MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), applyOperation); if (!status.isOK()) { @@ -868,7 +880,7 @@ void SyncTail::setHostname(const std::string& hostname) { } OldThreadPool* SyncTail::getWriterPool() { - return &_writerPool; + return _writerPool.get(); } BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) { diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 9480542e966..745d5287a18 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -29,6 +29,7 @@ #pragma once #include <deque> +#include <memory> #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" @@ -84,9 +85,15 @@ public: using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&)>; SyncTail(BackgroundSync* q, MultiSyncApplyFunc func); + SyncTail(BackgroundSync* q, MultiSyncApplyFunc func, std::unique_ptr<OldThreadPool> writerPool); virtual ~SyncTail(); /** + * Creates thread pool for writer tasks. + */ + static std::unique_ptr<OldThreadPool> makeWriterPool(); + + /** * Applies the operation that is in param o. * Functions for applying operations/commands and increment server status counters may * be overridden for testing. @@ -151,13 +158,6 @@ public: */ OldThreadPool* getWriterPool(); - /** - * This variable determines the number of writer threads SyncTail will have. It has a default - * value, which varies based on architecture and can be overridden using the - * "replWriterThreadCount" server parameter. - */ - static int replWriterThreadCount; - protected: // Cap the batches using the limit on journal commits. // This works out to be 100 MB (64 bit) or 50 MB (32 bit) @@ -180,7 +180,7 @@ private: MultiSyncApplyFunc _applyFunc; // persistent pool of worker threads for writing ops to the databases - OldThreadPool _writerPool; + std::unique_ptr<OldThreadPool> _writerPool; }; /** |