diff options
author | Spencer T Brody <spencer@mongodb.com> | 2014-09-29 18:01:24 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2014-10-02 14:13:19 -0400 |
commit | 4485b41323100ade5e1f1ac2a8b2f4c8b6f4597b (patch) | |
tree | 433824590872dcf0d8b64d4976d2f67068f3858f /src/mongo | |
parent | cfddfb4b976a38a23f319abae6021c5864fa16d9 (diff) | |
download | mongo-4485b41323100ade5e1f1ac2a8b2f4c8b6f4597b.tar.gz |
SERVER-15429 Prepare applier threads to be started by the ReplicationCoordinatorImpl
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/member_state.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 77 |
5 files changed, 47 insertions, 48 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 302e6e08df5..d25f2690ddf 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -155,8 +155,7 @@ namespace { } void BackgroundSync::_producerThread() { - MemberState state = _replCoord->getCurrentMemberState(); - + const MemberState state = _replCoord->getCurrentMemberState(); // we want to pause when the state changes to primary if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { if (!_pause) { @@ -166,7 +165,9 @@ namespace { return; } + // TODO(spencer): Use a condition variable to await loading a config. if (state.startup()) { + // Wait for a config to be loaded sleepsecs(1); return; } diff --git a/src/mongo/db/repl/member_state.h b/src/mongo/db/repl/member_state.h index 207350857d0..c3e3ffd292b 100644 --- a/src/mongo/db/repl/member_state.h +++ b/src/mongo/db/repl/member_state.h @@ -72,6 +72,7 @@ namespace repl { bool rollback() const { return s == RS_ROLLBACK; } bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; } bool removed() const { return s == RS_REMOVED; } + bool arbiter() const { return s == RS_ARBITER; } std::string toString() const; diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 07a01c12206..f0aab4b3617 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -319,6 +319,11 @@ namespace { return _getCurrentMemberState_inlock(); } + MemberState ReplicationCoordinatorImpl::_getCurrentMemberState_inlock() const { + invariant(_settings.usingReplSets()); + return _currentState; + } + void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { CBHStatus cbh = _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish, @@ -338,11 +343,6 @@ namespace { _topCoord->clearSyncSourceBlacklist(); } - MemberState ReplicationCoordinatorImpl::_getCurrentMemberState_inlock() const { - invariant(_settings.usingReplSets()); - return _currentState; - } - void ReplicationCoordinatorImpl::_setCurrentMemberState_forTest(const MemberState& newState) { CBHStatus cbh = _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_setCurrentMemberState_forTestFinish, diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index 7d5da4c4af1..75fc8ca56e4 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -274,7 +274,6 @@ namespace repl { OplogReader* r, const Member* source); void _initialSync(); - void _syncThread(); void syncTail(); void syncFixUp(OperationContext* txn, FixUpInfo& h, OplogReader& r); @@ -295,7 +294,6 @@ namespace repl { } const ReplSetConfig::MemberCfg& myConfig() const { return _config; } - void syncThread(); const OpTime lastOtherOpTime() const; /** * The most up to date electable replica diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index acafddf8685..8923130b9c5 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -56,6 +56,7 @@ #include "mongo/db/stats/timer_stats.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" +#include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -149,54 +150,58 @@ namespace repl { return false; } - void ReplSetImpl::_syncThread() { - StateBox::SP sp = box.get(); - if (sp.state.primary()) { - sleepsecs(1); - return; - } - bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag(); - // Check criteria for doing an initial sync: - // 1. If the oplog is empty, do an initial sync - // 2. If minValid has _initialSyncFlag set, do an initial sync - // 3. If initialSyncRequested is true - if (getGlobalReplicationCoordinator()->getMyLastOptime().isNull() || - getInitialSyncFlag() || - initialSyncRequested) { - syncDoInitialSync(); - return; // _syncThread will be recalled, starts from top again in case sync failed. - } - getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_RECOVERING); - - /* we have some data. continue tailing. */ - SyncTail tail(BackgroundSync::get(), multiSyncApply); - tail.oplogApplication(); - } - void ReplSetImpl::clearVetoes() { lock lk(this); _veto.clear(); } - void ReplSetImpl::syncThread() { - while( 1 ) { + void startSyncThread() { + Client::initThread("rsSync"); + replLocalAuth(); + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + while (!inShutdown()) { // After a reconfig, we may not be in the replica set anymore, so // check that we are in the set (and not an arbiter) before // trying to sync with other replicas. - if( ! _self ) { - log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog; - sleepsecs(20); + // TODO(spencer): Use a condition variable to await loading a config + if (replCoord->getReplicationMode() != ReplicationCoordinator::modeReplSet) { + log() << "replSet warning did not receive a valid config yet, sleeping 5 seconds " + << rsLog; + sleepsecs(5); continue; } - if( myConfig().arbiterOnly ) { - return; + + const MemberState memberState = replCoord->getCurrentMemberState(); + if (replCoord->getCurrentMemberState().arbiter()) { + break; } try { - _syncThread(); + + if (memberState.primary()) { + sleepsecs(1); + continue; + } + + bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag(); + // Check criteria for doing an initial sync: + // 1. If the oplog is empty, do an initial sync + // 2. If minValid has _initialSyncFlag set, do an initial sync + // 3. If initialSyncRequested is true + if (getGlobalReplicationCoordinator()->getMyLastOptime().isNull() || + getInitialSyncFlag() || + initialSyncRequested) { + syncDoInitialSync(); + continue; // start from top again in case sync failed. + } + replCoord->setFollowerMode(MemberState::RS_RECOVERING); + + /* we have some data. continue tailing. */ + SyncTail tail(BackgroundSync::get(), multiSyncApply); + tail.oplogApplication(); } catch(const DBException& e) { - sethbmsg(str::stream() << "syncThread: " << e.toString()); + log() << "Received exception while syncing: " << e.toString(); sleepsecs(10); } catch(...) { @@ -205,12 +210,6 @@ namespace repl { sleepsecs(60); } } - } - - void startSyncThread() { - Client::initThread("rsSync"); - replLocalAuth(); - theReplSet->syncThread(); cc().shutdown(); } |