summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-05-31 14:21:02 -0400
committerBenety Goh <benety@mongodb.com>2016-06-01 10:58:01 -0400
commit9ea1e5b3845719b1f97863fb440f30bef22b3f32 (patch)
treee53a075ca1d7fef424fc39f9145d2c0dcd2ba04a
parentf6e1e5a946acfd867fa15fb8cc7e2276f2d96b04 (diff)
downloadmongo-9ea1e5b3845719b1f97863fb440f30bef22b3f32.tar.gz
SERVER-24292 SyncTail accepts optional writer thread pool at construction
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp19
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h8
-rw-r--r--src/mongo/db/repl/sync_tail.cpp36
-rw-r--r--src/mongo/db/repl/sync_tail.h16
4 files changed, 46 insertions, 33 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 8c35effae54..1f50f0cb2f3 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/server_parameters.h"
@@ -141,11 +142,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
}
getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this);
- // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
- // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
- // with invalid BackgroundSync and MultiSyncApplyFunc arguments because we will not be accessing
- // any SyncTail functionality that require these constructor parameters.
- _syncTail = stdx::make_unique<SyncTail>(nullptr, SyncTail::MultiSyncApplyFunc());
+ _writerPool = SyncTail::makeWriterPool();
_startedThreads = true;
}
@@ -489,7 +486,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply(
OperationContext* txn,
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation) {
- return repl::multiApply(txn, _syncTail->getWriterPool(), ops, applyOperation);
+ return repl::multiApply(txn, _writerPool.get(), ops, applyOperation);
}
void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier::Operations& ops) {
@@ -499,8 +496,14 @@ void ReplicationCoordinatorExternalStateImpl::multiSyncApply(const MultiApplier:
void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
const MultiApplier::Operations& ops, const HostAndPort& source) {
- _syncTail->setHostname(source.toString());
- repl::multiInitialSyncApply(ops, _syncTail.get());
+
+ // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
+ // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
+ // with invalid BackgroundSync, MultiSyncApplyFunc and writerPool arguments because we will not
+ // be accessing any SyncTail functionality that require these constructor parameters.
+ SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
+ syncTail.setHostname(source.toString());
+ repl::multiInitialSyncApply(ops, &syncTail);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 4e524e4c67a..f73fc647a03 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -35,11 +35,11 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/sync_source_feedback.h"
-#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
namespace mongo {
namespace repl {
@@ -140,10 +140,8 @@ private:
StartSteadyReplicationFn _startSteadReplicationFn;
std::unique_ptr<stdx::thread> _initialSyncThread;
- // Used by multiApply(), multiSyncApply() and multiInitialSyncApply() to apply operations read
- // from sync source. The thread pool owned by "_syncTail" is used by repl::multiApply() to apply
- // the sync source's operations in parallel.
- std::unique_ptr<SyncTail> _syncTail;
+ // Used by repl::multiApply() to apply the sync source's operations in parallel.
+ std::unique_ptr<OldThreadPool> _writerPool;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e7988fa7de5..30d8dcd6bcd 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -66,6 +66,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -78,24 +79,28 @@ namespace mongo {
using std::endl;
namespace repl {
+
+/**
+ * This variable determines the number of writer threads SyncTail will have. It has a default
+ * value, which varies based on architecture and can be overridden using the
+ * "replWriterThreadCount" server parameter.
+ */
+namespace {
#if defined(MONGO_PLATFORM_64)
-int SyncTail::replWriterThreadCount = 16;
-const int replPrefetcherThreadCount = 16;
+int replWriterThreadCount = 16;
#elif defined(MONGO_PLATFORM_32)
-int SyncTail::replWriterThreadCount = 2;
-const int replPrefetcherThreadCount = 2;
+int replWriterThreadCount = 2;
#else
#error need to include something that defines MONGO_PLATFORM_XX
#endif
+} // namespace
class ExportedWriterThreadCountParameter
: public ExportedServerParameter<int, ServerParameterType::kStartupOnly> {
public:
ExportedWriterThreadCountParameter()
: ExportedServerParameter<int, ServerParameterType::kStartupOnly>(
- ServerParameterSet::getGlobal(),
- "replWriterThreadCount",
- &SyncTail::replWriterThreadCount) {}
+ ServerParameterSet::getGlobal(), "replWriterThreadCount", &replWriterThreadCount) {}
virtual Status validate(const int& potentialNewValue) {
if (potentialNewValue < 1 || potentialNewValue > 256) {
@@ -277,12 +282,19 @@ void ApplyBatchFinalizerForJournal::_run() {
} // anonymous namespace containing ApplyBatchFinalizer definitions.
SyncTail::SyncTail(BackgroundSync* q, MultiSyncApplyFunc func)
- : _networkQueue(q),
- _applyFunc(func),
- _writerPool(replWriterThreadCount, "repl writer worker ") {}
+ : SyncTail(q, func, makeWriterPool()) {}
+
+SyncTail::SyncTail(BackgroundSync* q,
+ MultiSyncApplyFunc func,
+ std::unique_ptr<OldThreadPool> writerPool)
+ : _networkQueue(q), _applyFunc(func), _writerPool(std::move(writerPool)) {}
SyncTail::~SyncTail() {}
+std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() {
+ return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker ");
+}
+
bool SyncTail::peek(BSONObj* op) {
return _networkQueue->peek(op);
}
@@ -534,7 +546,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); };
auto status =
repl::multiApply(txn,
- &_writerPool,
+ _writerPool.get(),
MultiApplier::Operations(convertToVector.begin(), convertToVector.end()),
applyOperation);
if (!status.isOK()) {
@@ -868,7 +880,7 @@ void SyncTail::setHostname(const std::string& hostname) {
}
OldThreadPool* SyncTail::getWriterPool() {
- return &_writerPool;
+ return _writerPool.get();
}
BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) {
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 9480542e966..745d5287a18 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -29,6 +29,7 @@
#pragma once
#include <deque>
+#include <memory>
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
@@ -84,9 +85,15 @@ public:
using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&)>;
SyncTail(BackgroundSync* q, MultiSyncApplyFunc func);
+ SyncTail(BackgroundSync* q, MultiSyncApplyFunc func, std::unique_ptr<OldThreadPool> writerPool);
virtual ~SyncTail();
/**
+ * Creates thread pool for writer tasks.
+ */
+ static std::unique_ptr<OldThreadPool> makeWriterPool();
+
+ /**
* Applies the operation that is in param o.
* Functions for applying operations/commands and increment server status counters may
* be overridden for testing.
@@ -151,13 +158,6 @@ public:
*/
OldThreadPool* getWriterPool();
- /**
- * This variable determines the number of writer threads SyncTail will have. It has a default
- * value, which varies based on architecture and can be overridden using the
- * "replWriterThreadCount" server parameter.
- */
- static int replWriterThreadCount;
-
protected:
// Cap the batches using the limit on journal commits.
// This works out to be 100 MB (64 bit) or 50 MB (32 bit)
@@ -180,7 +180,7 @@ private:
MultiSyncApplyFunc _applyFunc;
// persistent pool of worker threads for writing ops to the databases
- OldThreadPool _writerPool;
+ std::unique_ptr<OldThreadPool> _writerPool;
};
/**