summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2014-09-29 18:01:24 -0400
committerSpencer T Brody <spencer@mongodb.com>2014-10-02 14:13:19 -0400
commit4485b41323100ade5e1f1ac2a8b2f4c8b6f4597b (patch)
tree433824590872dcf0d8b64d4976d2f67068f3858f /src/mongo
parentcfddfb4b976a38a23f319abae6021c5864fa16d9 (diff)
downloadmongo-4485b41323100ade5e1f1ac2a8b2f4c8b6f4597b.tar.gz
SERVER-15429 Prepare applier threads to be started by the ReplicationCoordinatorImpl
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/bgsync.cpp5
-rw-r--r--src/mongo/db/repl/member_state.h1
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp10
-rw-r--r--src/mongo/db/repl/repl_set_impl.h2
-rw-r--r--src/mongo/db/repl/rs_sync.cpp77
5 files changed, 47 insertions, 48 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 302e6e08df5..d25f2690ddf 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -155,8 +155,7 @@ namespace {
}
void BackgroundSync::_producerThread() {
- MemberState state = _replCoord->getCurrentMemberState();
-
+ const MemberState state = _replCoord->getCurrentMemberState();
// we want to pause when the state changes to primary
if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
if (!_pause) {
@@ -166,7 +165,9 @@ namespace {
return;
}
+ // TODO(spencer): Use a condition variable to await loading a config.
if (state.startup()) {
+ // Wait for a config to be loaded
sleepsecs(1);
return;
}
diff --git a/src/mongo/db/repl/member_state.h b/src/mongo/db/repl/member_state.h
index 207350857d0..c3e3ffd292b 100644
--- a/src/mongo/db/repl/member_state.h
+++ b/src/mongo/db/repl/member_state.h
@@ -72,6 +72,7 @@ namespace repl {
bool rollback() const { return s == RS_ROLLBACK; }
bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; }
bool removed() const { return s == RS_REMOVED; }
+ bool arbiter() const { return s == RS_ARBITER; }
std::string toString() const;
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 07a01c12206..f0aab4b3617 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -319,6 +319,11 @@ namespace {
return _getCurrentMemberState_inlock();
}
+ MemberState ReplicationCoordinatorImpl::_getCurrentMemberState_inlock() const {
+ invariant(_settings.usingReplSets());
+ return _currentState;
+ }
+
void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish,
@@ -338,11 +343,6 @@ namespace {
_topCoord->clearSyncSourceBlacklist();
}
- MemberState ReplicationCoordinatorImpl::_getCurrentMemberState_inlock() const {
- invariant(_settings.usingReplSets());
- return _currentState;
- }
-
void ReplicationCoordinatorImpl::_setCurrentMemberState_forTest(const MemberState& newState) {
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_setCurrentMemberState_forTestFinish,
diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h
index 7d5da4c4af1..75fc8ca56e4 100644
--- a/src/mongo/db/repl/repl_set_impl.h
+++ b/src/mongo/db/repl/repl_set_impl.h
@@ -274,7 +274,6 @@ namespace repl {
OplogReader* r,
const Member* source);
void _initialSync();
- void _syncThread();
void syncTail();
void syncFixUp(OperationContext* txn, FixUpInfo& h, OplogReader& r);
@@ -295,7 +294,6 @@ namespace repl {
}
const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
- void syncThread();
const OpTime lastOtherOpTime() const;
/**
* The most up to date electable replica
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index acafddf8685..8923130b9c5 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/storage_options.h"
+#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -149,54 +150,58 @@ namespace repl {
return false;
}
- void ReplSetImpl::_syncThread() {
- StateBox::SP sp = box.get();
- if (sp.state.primary()) {
- sleepsecs(1);
- return;
- }
- bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag();
- // Check criteria for doing an initial sync:
- // 1. If the oplog is empty, do an initial sync
- // 2. If minValid has _initialSyncFlag set, do an initial sync
- // 3. If initialSyncRequested is true
- if (getGlobalReplicationCoordinator()->getMyLastOptime().isNull() ||
- getInitialSyncFlag() ||
- initialSyncRequested) {
- syncDoInitialSync();
- return; // _syncThread will be recalled, starts from top again in case sync failed.
- }
- getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING);
-
- /* we have some data. continue tailing. */
- SyncTail tail(BackgroundSync::get(), multiSyncApply);
- tail.oplogApplication();
- }
-
void ReplSetImpl::clearVetoes() {
lock lk(this);
_veto.clear();
}
- void ReplSetImpl::syncThread() {
- while( 1 ) {
+ void startSyncThread() {
+ Client::initThread("rsSync");
+ replLocalAuth();
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ while (!inShutdown()) {
// After a reconfig, we may not be in the replica set anymore, so
// check that we are in the set (and not an arbiter) before
// trying to sync with other replicas.
- if( ! _self ) {
- log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog;
- sleepsecs(20);
+ // TODO(spencer): Use a condition variable to await loading a config
+ if (replCoord->getReplicationMode() != ReplicationCoordinator::modeReplSet) {
+ log() << "replSet warning did not receive a valid config yet, sleeping 5 seconds "
+ << rsLog;
+ sleepsecs(5);
continue;
}
- if( myConfig().arbiterOnly ) {
- return;
+
+ const MemberState memberState = replCoord->getCurrentMemberState();
+ if (replCoord->getCurrentMemberState().arbiter()) {
+ break;
}
try {
- _syncThread();
+
+ if (memberState.primary()) {
+ sleepsecs(1);
+ continue;
+ }
+
+ bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag();
+ // Check criteria for doing an initial sync:
+ // 1. If the oplog is empty, do an initial sync
+ // 2. If minValid has _initialSyncFlag set, do an initial sync
+ // 3. If initialSyncRequested is true
+ if (getGlobalReplicationCoordinator()->getMyLastOptime().isNull() ||
+ getInitialSyncFlag() ||
+ initialSyncRequested) {
+ syncDoInitialSync();
+ continue; // start from top again in case sync failed.
+ }
+ replCoord->setFollowerMode(MemberState::RS_RECOVERING);
+
+ /* we have some data. continue tailing. */
+ SyncTail tail(BackgroundSync::get(), multiSyncApply);
+ tail.oplogApplication();
}
catch(const DBException& e) {
- sethbmsg(str::stream() << "syncThread: " << e.toString());
+ log() << "Received exception while syncing: " << e.toString();
sleepsecs(10);
}
catch(...) {
@@ -205,12 +210,6 @@ namespace repl {
sleepsecs(60);
}
}
- }
-
- void startSyncThread() {
- Client::initThread("rsSync");
- replLocalAuth();
- theReplSet->syncThread();
cc().shutdown();
}