summary refs log tree commit diff
path: root/src/mongo
diff options
context:
space:
mode:
author Scott Hernandez <scotthernandez@gmail.com> 2016-03-31 09:18:20 -0400
committer Scott Hernandez <scotthernandez@gmail.com> 2016-04-05 13:10:19 -0400
commit bfe6a0cefe770f0c21495be5e5bdf4328fc771c4 (patch)
tree 3fe010eb40083f099ccf1c534527b56b1d36c94e /src/mongo
parent 45be6ce3c54aca9d1453380b74a6adc464a3c148 (diff)
download mongo-bfe6a0cefe770f0c21495be5e5bdf4328fc771c4.tar.gz
SERVER-19200: move initial sync behavior out of rs_sync
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/bgsync.h | 1
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state.h | 22
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 75
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.h | 8
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 3
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.h | 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 33
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 10
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_test.cpp | 3
-rw-r--r-- src/mongo/db/repl/rs_initialsync.cpp | 39
-rw-r--r-- src/mongo/db/repl/rs_sync.cpp | 10
12 files changed, 166 insertions, 45 deletions
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 5edb2079928..0f9619f7280 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -86,6 +86,7 @@ public:
PREFETCH_ALL = 3
};
+ // TODO: remove, once initialSyncRequestedFlag and indexPrefetchConfig go somewhere else.
static BackgroundSync* get();
// stop syncing (when this node becomes a primary, e.g.)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index b11352312d6..5bbef20bd25 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -32,7 +32,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -51,6 +53,9 @@ namespace repl {
class LastVote;
class ReplSettings;
+using OnInitialSyncFinishedFn = stdx::function<void()>;  // Callback run when initial sync is done.
+using StartInitialSyncFn = stdx::function<void(OnInitialSyncFinishedFn callback)>;  // Takes the completion callback.
+using StartSteadyReplicationFn = stdx::function<void()>;  // NOTE(review): only used for a member declaration in this diff.
/**
* This class represents the interface the ReplicationCoordinator uses to interact with the
* rest of the system. All functionality of the ReplicationCoordinatorImpl that would introduce
@@ -65,13 +70,28 @@ public:
virtual ~ReplicationCoordinatorExternalState();
/**
- * Starts the background sync, producer, and sync source feedback threads
+ * Starts the journal listener and snapshot threads.
*
* NOTE: Only starts threads if they are not already started,
*/
virtual void startThreads(const ReplSettings& settings) = 0;
/**
+ * Starts an initial sync for a replica set member and calls "finished" when it is done.
+ * This is the legacy implementation, not the one in DataReplicator.
+ *
+ * NOTE: Use either this (and the function below) or the Master/Slave version, but not both.
+ */
+ virtual void startInitialSync(OnInitialSyncFinishedFn finished) = 0;
+
+ /**
+ * Starts steady state sync for a replica set member (legacy impl, not in DataReplicator).
+ *
+ * NOTE: Use either this or the Master/Slave version, but not both.
+ */
+ virtual void startSteadyStateReplication() = 0;
+
+ /**
* Starts the Master/Slave threads and sets up logOp
*/
virtual void startMasterSlave(OperationContext* txn) = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 161bdcd2b78..8814de381f8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -50,11 +50,13 @@
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/master_slave.h"
+#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -65,6 +67,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
@@ -93,22 +96,50 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl
: _startedThreads(false), _nextThreadId(0) {}
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
-void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) {
- stdx::lock_guard<stdx::mutex> lk(_threadMutex);
- if (_startedThreads) {
- return;
+void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFinishedFn finished) {
+ _initialSyncThread.reset(new stdx::thread{[finished, this]() {  // Work runs on a new thread that captures "this".
+ Client::initThreadIfNotAlready("initial sync");
+ log() << "Starting replication fetcher thread";
+ // NOTE(review): no hunk in this diff joins _initialSyncThread -- confirm it is joined before destruction.
+ // Start bgsync (the producer thread) before running initial sync.
+ BackgroundSync* bgsync = BackgroundSync::get();
+ invariant(!(bgsync == nullptr && !inShutdownStrict())); // bgsync may only be null during shutdown.
+ invariant(!_producerThread); // The producer thread should not be init'd before this.
+ _producerThread.reset(
+ new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));  // NOTE(review): bgsync may be null here at shutdown -- confirm this is safe.
+ // Run initial sync on this thread, then invoke the caller's callback. NOTE(review): finished() runs whenever syncDoInitialSync() returns, including its early-return paths -- confirm.
+ syncDoInitialSync();
+ finished();
+ }});
+}
+
+void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() {  // Starts the fetcher (if needed), applier and reporter threads.
+ if (!_producerThread) {  // startInitialSync() may already have started the producer thread.
+ log() << "Starting replication fetcher thread";
+ BackgroundSync* bgsync = BackgroundSync::get();  // NOTE(review): not null-checked here, unlike other callers in this file -- confirm.
+ _producerThread.reset(
+ new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
}
log() << "Starting replication applier threads";
+ invariant(!_applierThread);  // The applier thread must not have been created yet.
_applierThread.reset(new stdx::thread(runSyncThread));
- BackgroundSync* bgsync = BackgroundSync::get();
- _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
+ log() << "Starting replication reporter thread";
+ invariant(!_syncSourceFeedbackThread);  // Likewise for the reporter (SyncSourceFeedback) thread.
_syncSourceFeedbackThread.reset(
new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback)));
+}
+
+void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) {  // Starts the snapshot thread (when enabled) and registers the journal listener.
+ stdx::lock_guard<stdx::mutex> lk(_threadMutex);
+ if (_startedThreads) {  // Already started: nothing to do.
+ return;
+ }
+ log() << "Starting replication storage threads";
if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) {
_snapshotThread = SnapshotThread::start(getGlobalServiceContext());
}
- _startedThreads = true;
getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this);
+ _startedThreads = true;  // Set only after the journal listener has been registered.
}
void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
@@ -119,13 +150,21 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() {
stdx::lock_guard<stdx::mutex> lk(_threadMutex);
if (_startedThreads) {
log() << "Stopping replication applier threads";
- _syncSourceFeedback.shutdown();
- _syncSourceFeedbackThread->join();
- _applierThread->join();
- BackgroundSync* bgsync = BackgroundSync::get();
- bgsync->shutdown();
- _producerThread->join();
+ if (_syncSourceFeedbackThread) {
+ _syncSourceFeedback.shutdown();
+ _syncSourceFeedbackThread->join();
+ }
+ if (_applierThread) {
+ _applierThread->join();
+ }
+ if (_producerThread) {
+ BackgroundSync* bgsync = BackgroundSync::get();
+ if (bgsync) {
+ bgsync->shutdown();
+ }
+ _producerThread->join();
+ }
if (_snapshotThread)
_snapshotThread->shutdown();
}
@@ -369,11 +408,17 @@ void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationCont
}
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
- BackgroundSync::get()->clearSyncTarget();
+ auto bgsync = BackgroundSync::get();
+ if (bgsync) {  // BackgroundSync::get() can return null (at shutdown, per startInitialSync); then this is a no-op.
+ bgsync->clearSyncTarget();
+ }
}
void ReplicationCoordinatorExternalStateImpl::signalApplierToCancelFetcher() {
- BackgroundSync::get()->cancelFetcher();
+ auto bgsync = BackgroundSync::get();
+ if (bgsync) {  // BackgroundSync::get() can return null (at shutdown, per startInitialSync); then this is a no-op.
+ bgsync->cancelFetcher();
+ }
}
void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 577c0b2aa4f..78969cbb292 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -52,6 +52,9 @@ public:
ReplicationCoordinatorExternalStateImpl();
virtual ~ReplicationCoordinatorExternalStateImpl();
virtual void startThreads(const ReplSettings& settings) override;
+ virtual void startInitialSync(OnInitialSyncFinishedFn finished) override;
+ virtual void startSteadyStateReplication() override;
+
virtual void startMasterSlave(OperationContext* txn);
virtual void shutdown();
virtual Status initializeReplSetStorage(OperationContext* txn,
@@ -117,6 +120,11 @@ private:
long long _nextThreadId;
std::unique_ptr<SnapshotThread> _snapshotThread;
+
+ // Initial sync state. NOTE(review): the two function members below are not used in any hunk of this diff -- confirm they are needed.
+ StartInitialSyncFn _startInitialSyncIfNeededFn;
+ StartSteadyReplicationFn _startSteadReplicationFn;  // NOTE(review): "Stead" looks like a typo for "Steady".
+ std::unique_ptr<stdx::thread> _initialSyncThread;  // Thread created by startInitialSync().
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index a95cb6c47c2..3ed6023fd71 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -60,6 +60,9 @@ void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& s
_threadsStarted = true;
}
+void ReplicationCoordinatorExternalStateMock::startInitialSync(OnInitialSyncFinishedFn finished) {}  // No-op in the mock: "finished" is never invoked.
+void ReplicationCoordinatorExternalStateMock::startSteadyStateReplication() {}  // No-op in the mock.
+
void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {}
Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(OperationContext* txn,
const BSONObj& config,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 89fa3399206..57970b239d5 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -53,6 +53,8 @@ public:
ReplicationCoordinatorExternalStateMock();
virtual ~ReplicationCoordinatorExternalStateMock();
virtual void startThreads(const ReplSettings& settings) override;
+ virtual void startInitialSync(OnInitialSyncFinishedFn finished) override;
+ virtual void startSteadyStateReplication() override;
virtual void startMasterSlave(OperationContext*);
virtual void shutdown();
virtual Status initializeReplSetStorage(OperationContext* txn,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 05d45fdf42a..23fd498cb52 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -468,6 +468,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
_setMyLastAppliedOpTime_inlock(lastOpTime, false);
_setMyLastDurableOpTime_inlock(lastOpTime, false);
_reportUpstream_inlock(std::move(lock));
+ // Unlocked below.
+
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
// Step down is impossible, so we don't need to wait for the returned event.
_updateTerm_incallback(term);
@@ -475,6 +477,30 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
_performPostMemberStateUpdateAction(action);
if (!isArbiter) {
_externalState->startThreads(_settings);
+ _startDataReplication();
+ }
+}
+
+void ReplicationCoordinatorImpl::_stopDataReplication() {}  // Empty stub in this change.
+void ReplicationCoordinatorImpl::_startDataReplication() {  // Starts steady state replication, after an initial sync when one is needed.
+ // Invoked once initial sync is done (or immediately, if none is needed) to start steady state.
+ OnInitialSyncFinishedFn callback{[this]() {
+ log() << "Initial sync done, starting steady state replication.";
+ _externalState->startSteadyStateReplication();
+ }};
+ // A non-null last applied optime skips initial sync and goes straight to steady state.
+ const auto lastApplied = getMyLastAppliedOpTime();
+ if (!lastApplied.isNull()) {
+ callback();
+ return;
+ }
+ // NOTE(review): the check removed from rs_sync.cpp also tested getInitialSyncFlag() and the initialSyncRequested flag -- confirm dropping them is intended.
+ // Do initial sync via the legacy external-state path; the _dr branch is disabled by "if (false)".
+ if (false) {
+ // TODO: make this async with callback.
+ _dr.initialSync();
+ } else {
+ _externalState->startInitialSync(callback);
}
}
@@ -534,7 +560,7 @@ void ReplicationCoordinatorImpl::shutdown() {
_inShutdown = true;
if (_rsConfigState == kConfigPreStart) {
warning() << "ReplicationCoordinatorImpl::shutdown() called before "
- "start() finished. Shutting down without cleaning up the "
+ "startup() finished. Shutting down without cleaning up the "
"replication system";
return;
}
@@ -2485,6 +2511,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
// will fail validation with a "replSet initiate got ... while validating" reason.
invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter());
_externalState->startThreads(_settings);
+ _startDataReplication();
}
return Status::OK();
@@ -2637,11 +2664,11 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
_topCoord->processWinElection(_electionId, getNextGlobalTimestamp());
_isWaitingForDrainToComplete = true;
- _externalState->signalApplierToCancelFetcher();
const PostMemberStateUpdateAction nextAction =
_updateMemberStateFromTopologyCoordinator_inlock();
invariant(nextAction != kActionWinElection);
lk.unlock();
+ _externalState->signalApplierToCancelFetcher();
_performPostMemberStateUpdateAction(nextAction);
// Notify all secondaries of the election win.
_scheduleElectionWinNotification();
@@ -3089,6 +3116,8 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
_setMyLastAppliedOpTime_inlock(lastOpTime, true);
_setMyLastDurableOpTime_inlock(lastOpTime, true);
_reportUpstream_inlock(std::move(lock));
+ // Unlocked below.
+
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 1d5cd3b0737..461d3ababad 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -882,6 +882,16 @@ private:
const StatusWith<LastVote>& lastVoteStatus);
/**
+ * Starts replicating data, performing an initial sync first if needed.
+ */
+ void _startDataReplication();
+
+ /**
+ * Meant to stop the applier, fetcher and such; the definition in this change is an empty stub.
+ */
+ void _stopDataReplication();
+
+ /**
* Callback that finishes the work of processReplSetInitiate() inside the replication
* executor context, in the event of a successful quorum check.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 4c23bab4f5b..df03ebe37b7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -441,13 +441,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
return;
}
-
+ auto isFirstConfig = !_rsConfig.isInitialized();
lk.unlock();
bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 &&
newConfig.getMemberAt(myIndex.getValue()).isArbiter();
- if (!isArbiter) {
+ if (!isArbiter && isFirstConfig) {
_externalState->startThreads(_settings);
+ _startDataReplication();
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 6b10ba21154..de8a175d01c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2442,8 +2442,7 @@ TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) {
startCapturingLogMessages();
getReplCoord()->shutdown();
stopCapturingLogMessages();
- ASSERT_EQUALS(1,
- countLogLinesContaining("shutdown() called before startReplication() finished"));
+ ASSERT_EQUALS(1, countLogLinesContaining("shutdown() called before startup() finished"));
}
TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 3d62914b081..eb75c4390c8 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -277,6 +277,10 @@ bool _initialSyncApplyOplog(OperationContext* ctx, repl::InitialSync* syncer, Op
return true;
}
+
+// Number of failed sync source connection attempts after which _initialSync() gives up.
+const auto kConnectRetryLimit = 10;
+
/**
* Do the initial sync for this member. There are several steps to this process:
*
@@ -316,15 +320,20 @@ Status _initialSync() {
OplogReader r;
+ auto currentRetry = 0;
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
// of oplog position, by passing in null OpTime as the last op fetched time.
r.connectToSyncSource(&txn, OpTime(), replCoord);
+
if (r.getHost().empty()) {
std::string msg =
- "no valid sync sources found in current replset to do an initial sync";
- log() << msg;
- return Status(ErrorCodes::InitialSyncOplogSourceMissing, msg);
+ "No valid sync source found in current replica set to do an initial sync.";
+ if (++currentRetry >= kConnectRetryLimit) {
+ return Status(ErrorCodes::InitialSyncOplogSourceMissing, msg);
+ }
+ LOG(1) << msg << ", retry " << currentRetry << " of " << kConnectRetryLimit;
+ sleepsecs(1);
}
if (inShutdown()) {
@@ -338,7 +347,6 @@ Status _initialSync() {
BSONObj lastOp = r.getLastOp(rsOplogName);
if (lastOp.isEmpty()) {
std::string msg = "initial sync couldn't read remote oplog";
- log() << msg;
sleepsecs(15);
return Status(ErrorCodes::InitialSyncFailure, msg);
}
@@ -445,10 +453,17 @@ Status _initialSync() {
log() << "initial sync done";
return Status::OK();
}
+
+stdx::mutex _initialSyncMutex;  // try_lock'ed by syncDoInitialSync() to reject a concurrent run.
+const auto kMaxFailedAttempts = 10;  // Failed attempts in syncDoInitialSync() before fassert 16233.
+const auto kInitialSyncRetrySleepDuration = Seconds{5};  // Sleep between failed attempts.
} // namespace
void syncDoInitialSync() {
- static const int maxFailedAttempts = 10;
+ stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock);
+ if (!lk.try_lock()) {
+ uasserted(34474, "Initial Sync Already Active.");
+ }
{
OperationContextImpl txn;
@@ -456,16 +471,14 @@ void syncDoInitialSync() {
}
int failedAttempts = 0;
- while (failedAttempts < maxFailedAttempts) {
+ while (failedAttempts < kMaxFailedAttempts) {
try {
// leave loop when successful
Status status = _initialSync();
if (status.isOK()) {
break;
- }
- if (status == ErrorCodes::InitialSyncOplogSourceMissing) {
- sleepsecs(1);
- return;
+ } else {
+ error() << status;
}
} catch (const DBException& e) {
error() << e;
@@ -479,13 +492,13 @@ void syncDoInitialSync() {
return;
}
- error() << "initial sync attempt failed, " << (maxFailedAttempts - ++failedAttempts)
+ error() << "initial sync attempt failed, " << (kMaxFailedAttempts - ++failedAttempts)
<< " attempts remaining";
- sleepsecs(5);
+ sleepmillis(durationCount<Milliseconds>(kInitialSyncRetrySleepDuration));
}
// No need to print a stack
- if (failedAttempts >= maxFailedAttempts) {
+ if (failedAttempts >= kMaxFailedAttempts) {
severe() << "The maximum number of retries have been exhausted for initial sync.";
fassertFailedNoTrace(16233);
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 9e5c0e7e344..a8de0eadc56 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -102,16 +102,6 @@ void runSyncThread() {
continue;
}
- bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag();
- // Check criteria for doing an initial sync:
- // 1. If the oplog is empty, do an initial sync
- // 2. If minValid has _initialSyncFlag set, do an initial sync
- // 3. If initialSyncRequested is true
- if (getGlobalReplicationCoordinator()->getMyLastAppliedOpTime().isNull() ||
- getInitialSyncFlag() || initialSyncRequested) {
- syncDoInitialSync();
- continue; // start from top again in case sync failed.
- }
if (!replCoord->setFollowerMode(MemberState::RS_RECOVERING)) {
continue;
}