diff options
author | Benety Goh <benety@mongodb.com> | 2016-05-25 14:18:16 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-05-26 10:48:36 -0400 |
commit | 327de7a490f182e37719a9205a597285e7dce7c2 (patch) | |
tree | 0dc234087558ea456d4b221b869bd44fdbe43ab3 | |
parent | cc6ee9cf116853289ee41784220b3ce8ed14c29c (diff) | |
download | mongo-327de7a490f182e37719a9205a597285e7dce7c2.tar.gz |
SERVER-24292 repl::multiApply uses external thread pool
-rw-r--r-- | src/mongo/db/repl/multiapplier.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 9 | ||||
-rw-r--r-- | src/mongo/util/concurrency/old_thread_pool.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/concurrency/old_thread_pool.h | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.cpp | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 4 |
9 files changed, 45 insertions, 24 deletions
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index f3d5ea3badf..e4f355ebd80 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -53,8 +53,6 @@ class MultiApplier { MONGO_DISALLOW_COPYING(MultiApplier); public: - static const size_t kReplWriterThreadCount = 16; - /** * Operations sorted by timestamp in ascending order. */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index e5042d8f24b..d7e413b590e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -57,7 +57,6 @@ #include "mongo/db/repl/rs_initialsync.h" #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" @@ -67,6 +66,7 @@ #include "mongo/s/grid.h" #include "mongo/s/client/shard_registry.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" @@ -141,6 +141,13 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); } getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); + + // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) + // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail + // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing + // any SyncTail functionality that require these constructor parameters. + _syncTail = stdx::make_unique<SyncTail>(nullptr, SyncTail::MultiSyncApplyFunc()); + _startedThreads = true; } @@ -506,7 +513,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply( OperationContext* txn, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { - return repl::multiApply(txn, ops, applyOperation); + return repl::multiApply(txn, _syncTail->getWriterPool(), ops, applyOperation); } void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) { @@ -516,13 +523,8 @@ void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier: void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( const MultiApplier::Operations& ops, const HostAndPort& source) { - // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) - // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail - // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing - // any SyncTail functionality that require these constructor parameters. - SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc()); - syncTail.setHostname(source.toString()); - repl::multiInitialSyncApply(ops, &syncTail); + _syncTail->setHostname(source.toString()); + repl::multiInitialSyncApply(ops, _syncTail.get()); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index e8b8aaa7207..a74976dbc5e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -34,6 +34,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/sync_source_feedback.h" +#include "mongo/db/repl/sync_tail.h" #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" #include "mongo/stdx/mutex.h" @@ -132,6 +133,11 @@ private: StartInitialSyncFn _startInitialSyncIfNeededFn; StartSteadyReplicationFn _startSteadReplicationFn; std::unique_ptr<stdx::thread> _initialSyncThread; + + // Used by multiApply(), multiSyncApply() and multiInitialSyncApply() to apply operations read + // from sync source. The thread pool owned by "_syncTail" is used by repl::multiApply() to apply + // the sync source's operations in parallel. + std::unique_ptr<SyncTail> _syncTail; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 3356afce60d..ebaff27d29c 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -278,8 +278,7 @@ void ApplyBatchFinalizerForJournal::_run() { SyncTail::SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func) : _networkQueue(q), _applyFunc(func), - _writerPool(replWriterThreadCount, "repl writer worker "), - _prefetcherPool(replPrefetcherThreadCount, "repl prefetch worker ") {} + _writerPool(replWriterThreadCount, "repl writer worker ") {} SyncTail::~SyncTail() {} @@ -534,6 +533,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); }; auto status = repl::multiApply(txn, + &_writerPool, MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), applyOperation); if (!status.isOK()) { @@ -865,6 +865,10 @@ void SyncTail::setHostname(const std::string& hostname) { _hostname = hostname; } +OldThreadPool* SyncTail::getWriterPool() { + return &_writerPool; +} + BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) { OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query? const char* ns = o.getStringField("ns"); @@ -1149,18 +1153,17 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { } StatusWith<OpTime> multiApply(OperationContext* txn, + OldThreadPool* workerPool, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { invariant(applyOperation); - OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker "); - if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { // Use a ThreadPool to prefetch all the operations in a batch. - prefetchOps(ops, &workerPool); + prefetchOps(ops, workerPool); } - std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount); + std::vector<std::vector<OplogEntry>> writerVectors(workerPool->getNumThreads()); fillWriterVectors(txn, ops, &writerVectors); LOG(2) << "replication batch size is " << ops.size(); @@ -1179,11 +1182,11 @@ StatusWith<OpTime> multiApply(OperationContext* txn, "attempting to replicate ops while primary"}; } - applyOps(writerVectors, &workerPool, applyOperation); + applyOps(writerVectors, workerPool, applyOperation); OpTime lastOpTime; { - ON_BLOCK_EXIT([&] { workerPool.join(); }); + ON_BLOCK_EXIT([&] { workerPool->join(); }); std::vector<BSONObj> raws; raws.reserve(ops.size()); for (auto&& op : ops) { diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 21459b723a6..41311d3242f 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -146,6 +146,12 @@ public: void setHostname(const std::string& hostname); /** + * Returns writer thread pool. + * Used by ReplicationCoordinatorExternalStateImpl only. + */ + OldThreadPool* getWriterPool(); + + /** * This variable determines the number of writer threads SyncTail will have. It has a default * value, which varies based on architecture and can be overridden using the * "replWriterThreadCount" server parameter. @@ -175,8 +181,6 @@ private: // persistent pool of worker threads for writing ops to the databases OldThreadPool _writerPool; - // persistent pool of worker threads for prefetching - OldThreadPool _prefetcherPool; }; /** @@ -190,6 +194,7 @@ private: * Shared between here and MultiApplier. */ StatusWith<OpTime> multiApply(OperationContext* txn, + OldThreadPool* workerPool, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation); diff --git a/src/mongo/util/concurrency/old_thread_pool.cpp b/src/mongo/util/concurrency/old_thread_pool.cpp index 63e7c34e070..c168ae49e6b 100644 --- a/src/mongo/util/concurrency/old_thread_pool.cpp +++ b/src/mongo/util/concurrency/old_thread_pool.cpp @@ -60,6 +60,10 @@ OldThreadPool::OldThreadPool(const DoNotStartThreadsTag&, const std::string& threadNamePrefix) : _pool(makeOptions(nThreads, threadNamePrefix)) {} +std::size_t OldThreadPool::getNumThreads() const { + return _pool.getStats().numThreads; +} + void OldThreadPool::startThreads() { _pool.startup(); } diff --git a/src/mongo/util/concurrency/old_thread_pool.h b/src/mongo/util/concurrency/old_thread_pool.h index df0d133367f..efa57c14670 100644 --- a/src/mongo/util/concurrency/old_thread_pool.h +++ b/src/mongo/util/concurrency/old_thread_pool.h @@ -27,6 +27,7 @@ #pragma once +#include <cstddef> #include <string> #include "mongo/base/disallow_copying.h" @@ -53,6 +54,8 @@ public: int nThreads = 8, const std::string& threadNamePrefix = ""); + std::size_t getNumThreads() const; + // Launches the worker threads; call exactly once, if and only if // you used the DoNotStartThreadsTag form of the constructor. void startThreads(); diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index 2be5983c535..4a5e456b972 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -208,7 +208,7 @@ void ThreadPool::waitForIdle() { } } -ThreadPool::Stats ThreadPool::getStats() { +ThreadPool::Stats ThreadPool::getStats() const { stdx::lock_guard<stdx::mutex> lk(_mutex); Stats result; result.options = _options; diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index f34872e5209..35d817febfa 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -130,7 +130,7 @@ public: /** * Returns statistics about the thread pool's utilization. */ - Stats getStats(); + Stats getStats() const; private: using TaskList = std::deque<Task>; @@ -197,7 +197,7 @@ private: const Options _options; // Mutex guarding all non-const member variables. - stdx::mutex _mutex; + mutable stdx::mutex _mutex; // This variable represents the lifecycle state of the pool. // |