summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-05-25 14:18:16 -0400
committerBenety Goh <benety@mongodb.com>2016-05-26 10:48:36 -0400
commit327de7a490f182e37719a9205a597285e7dce7c2 (patch)
tree0dc234087558ea456d4b221b869bd44fdbe43ab3
parentcc6ee9cf116853289ee41784220b3ce8ed14c29c (diff)
downloadmongo-327de7a490f182e37719a9205a597285e7dce7c2.tar.gz
SERVER-24292 repl::multiApply uses external thread pool
-rw-r--r--src/mongo/db/repl/multiapplier.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp20
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h6
-rw-r--r--src/mongo/db/repl/sync_tail.cpp19
-rw-r--r--src/mongo/db/repl/sync_tail.h9
-rw-r--r--src/mongo/util/concurrency/old_thread_pool.cpp4
-rw-r--r--src/mongo/util/concurrency/old_thread_pool.h3
-rw-r--r--src/mongo/util/concurrency/thread_pool.cpp2
-rw-r--r--src/mongo/util/concurrency/thread_pool.h4
9 files changed, 45 insertions, 24 deletions
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index f3d5ea3badf..e4f355ebd80 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -53,8 +53,6 @@ class MultiApplier {
MONGO_DISALLOW_COPYING(MultiApplier);
public:
- static const size_t kReplWriterThreadCount = 16;
-
/**
* Operations sorted by timestamp in ascending order.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index e5042d8f24b..d7e413b590e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -57,7 +57,6 @@
#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/storage_interface.h"
-#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/s/sharding_state.h"
@@ -67,6 +66,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"
@@ -141,6 +141,13 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
_snapshotThread = SnapshotThread::start(getGlobalServiceContext());
}
getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this);
+
+ // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
+ // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
+ // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing
+ // any SyncTail functionality that require these constructor parameters.
+ _syncTail = stdx::make_unique<SyncTail>(nullptr, SyncTail::MultiSyncApplyFunc());
+
_startedThreads = true;
}
@@ -506,7 +513,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply(
OperationContext* txn,
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation) {
- return repl::multiApply(txn, ops, applyOperation);
+ return repl::multiApply(txn, _syncTail->getWriterPool(), ops, applyOperation);
}
void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) {
@@ -516,13 +523,8 @@ void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier:
void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
const MultiApplier::Operations& ops, const HostAndPort& source) {
- // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
- // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
- // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing
- // any SyncTail functionality that require these constructor parameters.
- SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc());
- syncTail.setHostname(source.toString());
- repl::multiInitialSyncApply(ops, &syncTail);
+ _syncTail->setHostname(source.toString());
+ repl::multiInitialSyncApply(ops, _syncTail.get());
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index e8b8aaa7207..a74976dbc5e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -34,6 +34,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/sync_source_feedback.h"
+#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/stdx/mutex.h"
@@ -132,6 +133,11 @@ private:
StartInitialSyncFn _startInitialSyncIfNeededFn;
StartSteadyReplicationFn _startSteadReplicationFn;
std::unique_ptr<stdx::thread> _initialSyncThread;
+
+ // Used by multiApply(), multiSyncApply() and multiInitialSyncApply() to apply operations read
+ // from sync source. The thread pool owned by "_syncTail" is used by repl::multiApply() to apply
+ // the sync source's operations in parallel.
+ std::unique_ptr<SyncTail> _syncTail;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 3356afce60d..ebaff27d29c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -278,8 +278,7 @@ void ApplyBatchFinalizerForJournal::_run() {
SyncTail::SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func)
: _networkQueue(q),
_applyFunc(func),
- _writerPool(replWriterThreadCount, "repl writer worker "),
- _prefetcherPool(replPrefetcherThreadCount, "repl prefetch worker ") {}
+ _writerPool(replWriterThreadCount, "repl writer worker ") {}
SyncTail::~SyncTail() {}
@@ -534,6 +533,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); };
auto status =
repl::multiApply(txn,
+ &_writerPool,
MultiApplier::Operations(convertToVector.begin(), convertToVector.end()),
applyOperation);
if (!status.isOK()) {
@@ -865,6 +865,10 @@ void SyncTail::setHostname(const std::string& hostname) {
_hostname = hostname;
}
+OldThreadPool* SyncTail::getWriterPool() {
+ return &_writerPool;
+}
+
BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) {
OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query?
const char* ns = o.getStringField("ns");
@@ -1149,18 +1153,17 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
}
StatusWith<OpTime> multiApply(OperationContext* txn,
+ OldThreadPool* workerPool,
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation) {
invariant(applyOperation);
- OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker ");
-
if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
// Use a ThreadPool to prefetch all the operations in a batch.
- prefetchOps(ops, &workerPool);
+ prefetchOps(ops, workerPool);
}
- std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount);
+ std::vector<std::vector<OplogEntry>> writerVectors(workerPool->getNumThreads());
fillWriterVectors(txn, ops, &writerVectors);
LOG(2) << "replication batch size is " << ops.size();
@@ -1179,11 +1182,11 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
"attempting to replicate ops while primary"};
}
- applyOps(writerVectors, &workerPool, applyOperation);
+ applyOps(writerVectors, workerPool, applyOperation);
OpTime lastOpTime;
{
- ON_BLOCK_EXIT([&] { workerPool.join(); });
+ ON_BLOCK_EXIT([&] { workerPool->join(); });
std::vector<BSONObj> raws;
raws.reserve(ops.size());
for (auto&& op : ops) {
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 21459b723a6..41311d3242f 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -146,6 +146,12 @@ public:
void setHostname(const std::string& hostname);
/**
+ * Returns writer thread pool.
+ * Used by ReplicationCoordinatorExternalStateImpl only.
+ */
+ OldThreadPool* getWriterPool();
+
+ /**
* This variable determines the number of writer threads SyncTail will have. It has a default
* value, which varies based on architecture and can be overridden using the
* "replWriterThreadCount" server parameter.
@@ -175,8 +181,6 @@ private:
// persistent pool of worker threads for writing ops to the databases
OldThreadPool _writerPool;
- // persistent pool of worker threads for prefetching
- OldThreadPool _prefetcherPool;
};
/**
@@ -190,6 +194,7 @@ private:
* Shared between here and MultiApplier.
*/
StatusWith<OpTime> multiApply(OperationContext* txn,
+ OldThreadPool* workerPool,
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation);
diff --git a/src/mongo/util/concurrency/old_thread_pool.cpp b/src/mongo/util/concurrency/old_thread_pool.cpp
index 63e7c34e070..c168ae49e6b 100644
--- a/src/mongo/util/concurrency/old_thread_pool.cpp
+++ b/src/mongo/util/concurrency/old_thread_pool.cpp
@@ -60,6 +60,10 @@ OldThreadPool::OldThreadPool(const DoNotStartThreadsTag&,
const std::string& threadNamePrefix)
: _pool(makeOptions(nThreads, threadNamePrefix)) {}
+std::size_t OldThreadPool::getNumThreads() const {
+ return _pool.getStats().numThreads;
+}
+
void OldThreadPool::startThreads() {
_pool.startup();
}
diff --git a/src/mongo/util/concurrency/old_thread_pool.h b/src/mongo/util/concurrency/old_thread_pool.h
index df0d133367f..efa57c14670 100644
--- a/src/mongo/util/concurrency/old_thread_pool.h
+++ b/src/mongo/util/concurrency/old_thread_pool.h
@@ -27,6 +27,7 @@
#pragma once
+#include <cstddef>
#include <string>
#include "mongo/base/disallow_copying.h"
@@ -53,6 +54,8 @@ public:
int nThreads = 8,
const std::string& threadNamePrefix = "");
+ std::size_t getNumThreads() const;
+
// Launches the worker threads; call exactly once, if and only if
// you used the DoNotStartThreadsTag form of the constructor.
void startThreads();
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index 2be5983c535..4a5e456b972 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -208,7 +208,7 @@ void ThreadPool::waitForIdle() {
}
}
-ThreadPool::Stats ThreadPool::getStats() {
+ThreadPool::Stats ThreadPool::getStats() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
Stats result;
result.options = _options;
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index f34872e5209..35d817febfa 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -130,7 +130,7 @@ public:
/**
* Returns statistics about the thread pool's utilization.
*/
- Stats getStats();
+ Stats getStats() const;
private:
using TaskList = std::deque<Task>;
@@ -197,7 +197,7 @@ private:
const Options _options;
// Mutex guarding all non-const member variables.
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
// This variable represents the lifecycle state of the pool.
//