summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-07-19 17:15:52 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-07-23 14:01:04 -0400
commit739f13872ceee7b5301b3699f962cd08bf7d6eb2 (patch)
tree989ca8c4ffcd053d50470551bd068d8fb5e60299 /src/mongo
parent2f6616860247e070d5f2db54f448904546e137d2 (diff)
downloadmongo-739f13872ceee7b5301b3699f962cd08bf7d6eb2.tar.gz
SERVER-23476: break out rs_sync runSyncThread into its own class
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/rs_sync.cpp64
-rw-r--r--src/mongo/db/repl/rs_sync.h44
-rw-r--r--src/mongo/db/repl/sync_tail.cpp19
-rw-r--r--src/mongo/db/repl/sync_tail.h2
6 files changed, 104 insertions, 33 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 640e81a39af..d6c49eeedf6 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -182,7 +182,8 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(Operat
log() << "Starting replication applier threads";
invariant(!_applierThread);
- _applierThread.reset(new stdx::thread(stdx::bind(&runSyncThread, _bgSync.get())));
+ _applierThread.reset(new RSDataSync{_bgSync.get(), ReplicationCoordinator::get(txn)});
+ _applierThread->startup();
log() << "Starting replication reporter thread";
invariant(!_syncSourceFeedbackThread);
_syncSourceFeedbackThread.reset(new stdx::thread(stdx::bind(
@@ -224,7 +225,9 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
_syncSourceFeedbackThread->join();
}
if (_applierThread) {
+ _applierThread->shutdown();
_applierThread->join();
+ _applierThread.reset();
}
if (_bgSync) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 60349f49a3c..cfff6225fc4 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -34,6 +34,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
+#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/sync_source_feedback.h"
#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/snapshot_manager.h"
@@ -136,7 +137,7 @@ private:
std::unique_ptr<stdx::thread> _syncSourceFeedbackThread;
// Thread running runSyncThread().
- std::unique_ptr<stdx::thread> _applierThread;
+ std::unique_ptr<RSDataSync> _applierThread;
// Mutex guarding the _nextThreadId value to prevent concurrent incrementing.
stdx::mutex _nextThreadIdMutex;
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 1fe21e6d659..066bdbc0498 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -55,32 +55,69 @@
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
-void runSyncThread(BackgroundSync* bgsync) {
+namespace {
+using LockGuard = stdx::lock_guard<stdx::mutex>;
+using UniqueLock = stdx::unique_lock<stdx::mutex>;
+
+} // namespace
+
+RSDataSync::RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord)
+ : _bgsync(bgsync), _replCoord(replCoord) {}
+
+void RSDataSync::startup() {
+ LockGuard lk(_mutex);
+ _runThread = stdx::make_unique<stdx::thread>(&RSDataSync::_run, this);
+ _stopped = false;
+ _inShutdown = _stopped;
+}
+
+void RSDataSync::shutdown() {
+ LockGuard lk(_mutex);
+ _inShutdown = true;
+}
+
+bool RSDataSync::_isInShutdown() const {
+ LockGuard lk(_mutex);
+ return _inShutdown;
+}
+
+void RSDataSync::join() {
+ UniqueLock lk(_mutex);
+ if (_stopped) {
+ return;
+ }
+ if (_runThread) {
+ lk.unlock();
+ _runThread->join();
+ }
+}
+
+void RSDataSync::_run() {
Client::initThread("rsSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
// Overwrite prefetch index mode in BackgroundSync if ReplSettings has a mode set.
- ReplSettings replSettings = replCoord->getSettings();
+ auto&& replSettings = _replCoord->getSettings();
if (replSettings.isPrefetchIndexModeSet())
- replCoord->setIndexPrefetchConfig(replSettings.getPrefetchIndexMode());
+ _replCoord->setIndexPrefetchConfig(replSettings.getPrefetchIndexMode());
- while (!inShutdown()) {
+ while (!_isInShutdown()) {
// After a reconfig, we may not be in the replica set anymore, so
// check that we are in the set (and not an arbiter) before
// trying to sync with other replicas.
// TODO(spencer): Use a condition variable to await loading a config
- if (replCoord->getMemberState().startup()) {
+ if (_replCoord->getMemberState().startup()) {
warning() << "did not receive a valid config yet";
sleepsecs(1);
continue;
}
- const MemberState memberState = replCoord->getMemberState();
+ const MemberState memberState = _replCoord->getMemberState();
// An arbiter can never transition to any other state, and doesn't replicate, ever
if (memberState.arbiter()) {
@@ -94,22 +131,25 @@ void runSyncThread(BackgroundSync* bgsync) {
}
try {
- if (memberState.primary() && !replCoord->isWaitingForApplierToDrain()) {
+ if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain()) {
sleepsecs(1);
continue;
}
- if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
+ if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
continue;
}
- /* we have some data. continue tailing. */
- SyncTail tail(bgsync, multiSyncApply);
- tail.oplogApplication();
+ SyncTail tail(_bgsync, multiSyncApply);
+ tail.oplogApplication(_replCoord, [this]() { return _isInShutdown(); });
} catch (...) {
std::terminate();
}
}
+
+ LockGuard lk(_mutex);
+ _inShutdown = false;
+ _stopped = true;
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index 047fdc3d9a6..55fa6182576 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -28,22 +28,46 @@
#pragma once
-#include <deque>
-#include <vector>
+#include <memory>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
-#include "mongo/db/client.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/initial_sync.h"
-#include "mongo/db/repl/sync_tail.h"
-#include "mongo/db/storage/mmap_v1/dur.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
namespace mongo {
namespace repl {
class BackgroundSync;
+class ReplicationCoordinator;
+
+/**
+ * This class is used to apply replicated oplog entries (steady-state replication)
+ * by running SyncTail::oplogApplication on its own thread.
+ **/
+class RSDataSync {
+public:
+ RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord);
+ void startup();
+ void shutdown();
+ void join();
+
+private:
+ // Runs in a loop applying oplog entries from the buffer until shutdown is requested, or an error occurs.
+ void _run();
+ bool _isInShutdown() const;
-// Body of the thread that will do the background sync.
-void runSyncThread(BackgroundSync* bgsync);
+ // _mutex protects all of the class variables declared below.
+ mutable stdx::mutex _mutex;
+ // Thread doing the work.
+ std::unique_ptr<stdx::thread> _runThread;
+ // Set to true if shutdown() has been called.
+ bool _inShutdown = false;
+ // True when the applier thread is not running (initially, and again after _run() exits).
+ + bool _stopped = true;
+ // BackgroundSync instance that is paired to this instance.
+ BackgroundSync* _bgsync;
+ // ReplicationCoordinator instance.
+ ReplicationCoordinator* _replCoord;
+};
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 52ebf077741..17dd237ee3c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -669,7 +669,10 @@ class SyncTail::OpQueueBatcher {
MONGO_DISALLOW_COPYING(OpQueueBatcher);
public:
- explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
+ OpQueueBatcher(SyncTail* syncTail, stdx::function<bool()> shouldShutdown)
+ : _syncTail(syncTail), _thread([this, shouldShutdown] {
+ run([this, shouldShutdown] { return _inShutdown.load() || shouldShutdown(); });
+ }) {}
~OpQueueBatcher() {
_inShutdown.store(true);
_cv.notify_all();
@@ -692,14 +695,14 @@ public:
}
private:
- void run() {
+ void run(stdx::function<bool()> shouldShutdown) {
Client::initThread("ReplBatcher");
const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
OperationContext& txn = *txnPtr;
const auto replCoord = ReplicationCoordinator::get(&txn);
const auto fastClockSource = txn.getServiceContext()->getFastClockSource();
- while (!_inShutdown.load()) {
+ while (!shouldShutdown()) {
const auto batchStartTime = fastClockSource->now();
const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
@@ -744,7 +747,7 @@ private:
stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!_ops.empty()) {
// Block until the previous batch has been taken.
- if (_inShutdown.load())
+ if (shouldShutdown())
return;
_cv.wait(lk);
}
@@ -764,12 +767,12 @@ private:
};
/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- OpQueueBatcher batcher(this);
+void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
+ stdx::function<bool()> shouldShutdown) {
+ OpQueueBatcher batcher(this, shouldShutdown);
const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
OperationContext& txn = *txnPtr;
- auto replCoord = ReplicationCoordinator::get(&txn);
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
@@ -778,7 +781,7 @@ void SyncTail::oplogApplication() {
auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
OpTime originalEndOpTime(minValidBoundaries.end);
OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
- while (!inShutdown()) {
+ while (!shouldShutdown()) {
if (replCoord->getInitialSyncRequestedFlag()) {
// got a resync command
return;
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index c6cd128be95..e9fe7067029 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -106,7 +106,7 @@ public:
static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert);
- void oplogApplication();
+ void oplogApplication(ReplicationCoordinator* replCoord, stdx::function<bool()> shouldShutdown);
bool peek(OperationContext* txn, BSONObj* obj);
class OpQueue {