diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-03-31 09:18:20 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-04-05 13:10:19 -0400 |
commit | bfe6a0cefe770f0c21495be5e5bdf4328fc771c4 (patch) | |
tree | 3fe010eb40083f099ccf1c534527b56b1d36c94e /src/mongo | |
parent | 45be6ce3c54aca9d1453380b74a6adc464a3c148 (diff) | |
download | mongo-bfe6a0cefe770f0c21495be5e5bdf4328fc771c4.tar.gz |
SERVER-19200: move initial sync behavior out of rs_sync
Diffstat (limited to 'src/mongo')
12 files changed, 166 insertions, 45 deletions
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 5edb2079928..0f9619f7280 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -86,6 +86,7 @@ public: PREFETCH_ALL = 3 }; + // TODO: remove, once initialSyncRequestedFlag and indexPrefetchConfig go somewhere else. static BackgroundSync* get(); // stop syncing (when this node becomes a primary, e.g.) diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index b11352312d6..5bbef20bd25 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -32,7 +32,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" +#include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" namespace mongo { @@ -51,6 +53,9 @@ namespace repl { class LastVote; class ReplSettings; +using OnInitialSyncFinishedFn = stdx::function<void()>; +using StartInitialSyncFn = stdx::function<void(OnInitialSyncFinishedFn callback)>; +using StartSteadyReplicationFn = stdx::function<void()>; /** * This class represents the interface the ReplicationCoordinator uses to interact with the * rest of the system. All functionality of the ReplicationCoordinatorImpl that would introduce @@ -65,13 +70,28 @@ public: virtual ~ReplicationCoordinatorExternalState(); /** - * Starts the background sync, producer, and sync source feedback threads + * Starts the journal listener, and snapshot threads * * NOTE: Only starts threads if they are not already started, */ virtual void startThreads(const ReplSettings& settings) = 0; /** + * Starts an initial sync, and calls "finished" when done, + * for replica set member -- legacy impl not in DataReplicator. + * + * NOTE: Use either this (and below function) or the Master/Slave version, but not both. + */ + virtual void startInitialSync(OnInitialSyncFinishedFn finished) = 0; + + /** + * Starts steady state sync for replica set member -- legacy impl not in DataReplicator. + * + * NOTE: Use either this or the Master/Slave version, but not both. + */ + virtual void startSteadyStateReplication() = 0; + + /** * Starts the Master/Slave threads and sets up logOp */ virtual void startMasterSlave(OperationContext* txn) = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 161bdcd2b78..8814de381f8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -50,11 +50,13 @@ #include "mongo/db/repl/isself.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/master_slave.h" +#include "mongo/db/repl/member_state.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/rs_initialsync.h" #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -65,6 +67,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" +#include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" @@ -93,22 +96,50 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl : _startedThreads(false), _nextThreadId(0) {} ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} -void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); - if (_startedThreads) { - return; +void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) { + _initialSyncThread.reset(new stdx::thread{[finished, this]() { + Client::initThreadIfNotAlready("initial sync"); + log() << "Starting replication fetcher thread"; + + // Start bgsync. + BackgroundSync* bgsync = BackgroundSync::get(); + invariant(!(bgsync == nullptr && !inShutdownStrict())); // bgsync can be null @shutdown. + invariant(!_producerThread); // The producer thread should not be init'd before this. + _producerThread.reset( + new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); + // Do initial sync. + syncDoInitialSync(); + finished(); + }}); +} + +void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() { + if (!_producerThread) { + log() << "Starting replication fetcher thread"; + BackgroundSync* bgsync = BackgroundSync::get(); + _producerThread.reset( + new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); } log() << "Starting replication applier threads"; + invariant(!_applierThread); _applierThread.reset(new stdx::thread(runSyncThread)); - BackgroundSync* bgsync = BackgroundSync::get(); - _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); + log() << "Starting replication reporter thread"; + invariant(!_syncSourceFeedbackThread); _syncSourceFeedbackThread.reset( new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); +} + +void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) { + stdx::lock_guard<stdx::mutex> lk(_threadMutex); + if (_startedThreads) { + return; + } + log() << "Starting replication storage threads"; if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) { _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); } - _startedThreads = true; getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); + _startedThreads = true; } void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { @@ -119,13 +150,21 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() { stdx::lock_guard<stdx::mutex> lk(_threadMutex); if (_startedThreads) { log() << "Stopping replication applier threads"; - _syncSourceFeedback.shutdown(); - _syncSourceFeedbackThread->join(); - _applierThread->join(); - BackgroundSync* bgsync = BackgroundSync::get(); - bgsync->shutdown(); - _producerThread->join(); + if (_syncSourceFeedbackThread) { + _syncSourceFeedback.shutdown(); + _syncSourceFeedbackThread->join(); + } + if (_applierThread) { + _applierThread->join(); + } + if (_producerThread) { + BackgroundSync* bgsync = BackgroundSync::get(); + if (bgsync) { + bgsync->shutdown(); + } + _producerThread->join(); + } if (_snapshotThread) _snapshotThread->shutdown(); } @@ -369,11 +408,17 @@ void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationCont } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { - BackgroundSync::get()->clearSyncTarget(); + auto bgsync = BackgroundSync::get(); + if (bgsync) { + bgsync->clearSyncTarget(); + } } void ReplicationCoordinatorExternalStateImpl::signalApplierToCancelFetcher() { - BackgroundSync::get()->cancelFetcher(); + auto bgsync = BackgroundSync::get(); + if (bgsync) { + bgsync->cancelFetcher(); + } } void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 577c0b2aa4f..78969cbb292 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -52,6 +52,9 @@ public: ReplicationCoordinatorExternalStateImpl(); virtual ~ReplicationCoordinatorExternalStateImpl(); virtual void startThreads(const ReplSettings& settings) override; + virtual void startInitialSync(OnInitialSyncFinishedFn finished) override; + virtual void startSteadyStateReplication() override; + virtual void startMasterSlave(OperationContext* txn); virtual void shutdown(); virtual Status initializeReplSetStorage(OperationContext* txn, @@ -117,6 +120,11 @@ private: long long _nextThreadId; std::unique_ptr<SnapshotThread> _snapshotThread; + + // Initial sync stuff + StartInitialSyncFn _startInitialSyncIfNeededFn; + StartSteadyReplicationFn _startSteadReplicationFn; + std::unique_ptr<stdx::thread> _initialSyncThread; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index a95cb6c47c2..3ed6023fd71 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -60,6 +60,9 @@ void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& s _threadsStarted = true; } +void ReplicationCoordinatorExternalStateMock::startInitialSync(OnInitialSyncFinishedFn finished) {} +void ReplicationCoordinatorExternalStateMock::startSteadyStateReplication() {} + void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {} Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(OperationContext* txn, const BSONObj& config, diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 89fa3399206..57970b239d5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -53,6 +53,8 @@ public: ReplicationCoordinatorExternalStateMock(); virtual ~ReplicationCoordinatorExternalStateMock(); virtual void startThreads(const ReplSettings& settings) override; + virtual void startInitialSync(OnInitialSyncFinishedFn finished) override; + virtual void startSteadyStateReplication() override; virtual void startMasterSlave(OperationContext*); virtual void shutdown(); virtual Status initializeReplSetStorage(OperationContext* txn, diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 05d45fdf42a..23fd498cb52 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -468,6 +468,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( _setMyLastAppliedOpTime_inlock(lastOpTime, false); _setMyLastDurableOpTime_inlock(lastOpTime, false); _reportUpstream_inlock(std::move(lock)); + // Unlocked below. + _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); // Step down is impossible, so we don't need to wait for the returned event. _updateTerm_incallback(term); @@ -475,6 +477,30 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( _performPostMemberStateUpdateAction(action); if (!isArbiter) { _externalState->startThreads(_settings); + _startDataReplication(); + } +} + +void ReplicationCoordinatorImpl::_stopDataReplication() {} +void ReplicationCoordinatorImpl::_startDataReplication() { + // When initial sync is done, callback. + OnInitialSyncFinishedFn callback{[this]() { + log() << "Initial sync done, starting steady state replication."; + _externalState->startSteadyStateReplication(); + }}; + + const auto lastApplied = getMyLastAppliedOpTime(); + if (!lastApplied.isNull()) { + callback(); + return; + } + + // Do initial sync. + if (false) { + // TODO: make this async with callback. + _dr.initialSync(); + } else { + _externalState->startInitialSync(callback); } } @@ -534,7 +560,7 @@ void ReplicationCoordinatorImpl::shutdown() { _inShutdown = true; if (_rsConfigState == kConfigPreStart) { warning() << "ReplicationCoordinatorImpl::shutdown() called before " - "start() finished. Shutting down without cleaning up the " + "startup() finished. Shutting down without cleaning up the " "replication system"; return; } @@ -2485,6 +2511,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, // will fail validation with a "replSet initiate got ... while validating" reason. invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter()); _externalState->startThreads(_settings); + _startDataReplication(); } return Status::OK(); @@ -2637,11 +2664,11 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } _topCoord->processWinElection(_electionId, getNextGlobalTimestamp()); _isWaitingForDrainToComplete = true; - _externalState->signalApplierToCancelFetcher(); const PostMemberStateUpdateAction nextAction = _updateMemberStateFromTopologyCoordinator_inlock(); invariant(nextAction != kActionWinElection); lk.unlock(); + _externalState->signalApplierToCancelFetcher(); _performPostMemberStateUpdateAction(nextAction); // Notify all secondaries of the election win. _scheduleElectionWinNotification(); @@ -3089,6 +3116,8 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn _setMyLastAppliedOpTime_inlock(lastOpTime, true); _setMyLastDurableOpTime_inlock(lastOpTime, true); _reportUpstream_inlock(std::move(lock)); + // Unlocked below. + _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 1d5cd3b0737..461d3ababad 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -882,6 +882,16 @@ private: const StatusWith<LastVote>& lastVoteStatus); /** + * Start replicating data, and does an initial sync if needed first. + */ + void _startDataReplication(); + + /** + * Stops replicating data by stopping the applier, fetcher and such. + */ + void _stopDataReplication(); + + /** * Callback that finishes the work of processReplSetInitiate() inside the replication * executor context, in the event of a successful quorum check. */ diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 4c23bab4f5b..df03ebe37b7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -441,13 +441,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } return; } - + auto isFirstConfig = !_rsConfig.isInitialized(); lk.unlock(); bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && newConfig.getMemberAt(myIndex.getValue()).isArbiter(); - if (!isArbiter) { + if (!isArbiter && isFirstConfig) { _externalState->startThreads(_settings); + _startDataReplication(); } } diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 6b10ba21154..de8a175d01c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2442,8 +2442,7 @@ TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) { startCapturingLogMessages(); getReplCoord()->shutdown(); stopCapturingLogMessages(); - ASSERT_EQUALS(1, - countLogLinesContaining("shutdown() called before startReplication() finished")); + ASSERT_EQUALS(1, countLogLinesContaining("shutdown() called before startup() finished")); } TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 3d62914b081..eb75c4390c8 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -277,6 +277,10 @@ bool _initialSyncApplyOplog(OperationContext* ctx, repl::InitialSync* syncer, Op return true; } + +// Number of connection retries allowed during initial sync. +const auto kConnectRetryLimit = 10; + /** * Do the initial sync for this member. There are several steps to this process: * @@ -316,15 +320,20 @@ Status _initialSync() { OplogReader r; + auto currentRetry = 0; while (r.getHost().empty()) { // We must prime the sync source selector so that it considers all candidates regardless // of oplog position, by passing in null OpTime as the last op fetched time. r.connectToSyncSource(&txn, OpTime(), replCoord); + if (r.getHost().empty()) { std::string msg = - "no valid sync sources found in current replset to do an initial sync"; - log() << msg; - return Status(ErrorCodes::InitialSyncOplogSourceMissing, msg); + "No valid sync source found in current replica set to do an initial sync."; + if (++currentRetry >= kConnectRetryLimit) { + return Status(ErrorCodes::InitialSyncOplogSourceMissing, msg); + } + LOG(1) << msg << ", retry " << currentRetry << " of " << kConnectRetryLimit; + sleepsecs(1); } if (inShutdown()) { @@ -338,7 +347,6 @@ Status _initialSync() { BSONObj lastOp = r.getLastOp(rsOplogName); if (lastOp.isEmpty()) { std::string msg = "initial sync couldn't read remote oplog"; - log() << msg; sleepsecs(15); return Status(ErrorCodes::InitialSyncFailure, msg); } @@ -445,10 +453,17 @@ Status _initialSync() { log() << "initial sync done"; return Status::OK(); } + +stdx::mutex _initialSyncMutex; +const auto kMaxFailedAttempts = 10; +const auto kInitialSyncRetrySleepDuration = Seconds{5}; } // namespace void syncDoInitialSync() { - static const int maxFailedAttempts = 10; + stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock); + if (!lk.try_lock()) { + uasserted(34474, "Initial Sync Already Active."); + } { OperationContextImpl txn; @@ -456,16 +471,14 @@ void syncDoInitialSync() { } int failedAttempts = 0; - while (failedAttempts < maxFailedAttempts) { + while (failedAttempts < kMaxFailedAttempts) { try { // leave loop when successful Status status = _initialSync(); if (status.isOK()) { break; - } - if (status == ErrorCodes::InitialSyncOplogSourceMissing) { - sleepsecs(1); - return; + } else { + error() << status; } } catch (const DBException& e) { error() << e; @@ -479,13 +492,13 @@ void syncDoInitialSync() { return; } - error() << "initial sync attempt failed, " << (maxFailedAttempts - ++failedAttempts) + error() << "initial sync attempt failed, " << (kMaxFailedAttempts - ++failedAttempts) << " attempts remaining"; - sleepsecs(5); + sleepmillis(durationCount<Milliseconds>(kInitialSyncRetrySleepDuration)); } // No need to print a stack - if (failedAttempts >= maxFailedAttempts) { + if (failedAttempts >= kMaxFailedAttempts) { severe() << "The maximum number of retries have been exhausted for initial sync."; fassertFailedNoTrace(16233); } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 9e5c0e7e344..a8de0eadc56 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -102,16 +102,6 @@ void runSyncThread() { continue; } - bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag(); - // Check criteria for doing an initial sync: - // 1. If the oplog is empty, do an initial sync - // 2. If minValid has _initialSyncFlag set, do an initial sync - // 3. If initialSyncRequested is true - if (getGlobalReplicationCoordinator()->getMyLastAppliedOpTime().isNull() || - getInitialSyncFlag() || initialSyncRequested) { - syncDoInitialSync(); - continue; // start from top again in case sync failed. - } if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { continue; } |