summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-05-09 17:29:44 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2016-08-04 18:10:46 -0400
commit953a241f6dd1541905a1b6e259140635b80238de (patch)
treec04703faefd3cfea5e81293fc2952d7e69991a29 /src/mongo/db/repl/bgsync.cpp
parent6e3021388b44e1c9fc3f6fd02b554fe0cc5c5c3e (diff)
downloadmongo-953a241f6dd1541905a1b6e259140635b80238de.tar.gz
SERVER-23663 New primary syncs from chosen node to catch up with timeout
SERVER-23662 Implement "catch-up timeout" configuration variable
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp46
1 files changed, 36 insertions, 10 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 19249c7f864..8bbda77b638 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -108,7 +108,7 @@ size_t getSize(const BSONObj& o) {
}
} // namespace
-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(pauseRsBgSyncProducer);
// The number and time spent reading batches off the network
static TimerStats getmoreReplStats;
@@ -220,7 +220,11 @@ void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) {
void BackgroundSync::_runProducer() {
const MemberState state = _replCoord->getMemberState();
// Stop when the state changes to primary.
- if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
+ //
+ // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below
+ // after fixing step-down during drain mode.
+ if (!_replCoord->isCatchingUp() &&
+ (_replCoord->isWaitingForApplierToDrain() || state.primary())) {
if (!isStopped()) {
stop();
}
@@ -255,6 +259,11 @@ void BackgroundSync::_runProducer() {
}
void BackgroundSync::_produce(OperationContext* txn) {
+
+ while (MONGO_FAIL_POINT(pauseRsBgSyncProducer)) {
+ sleepmillis(0);
+ }
+
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -267,17 +276,16 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary() ||
- _inShutdown_inlock()) {
+ if (!_replCoord->isCatchingUp() &&
+ (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) {
return;
}
- }
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
+ if (_inShutdown_inlock()) {
+ return;
+ }
}
-
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
@@ -286,15 +294,25 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched = _lastOpTimeFetched;
_syncSourceHost = HostAndPort();
}
+
SyncSourceResolverResponse syncSourceResp =
_syncSourceResolver.findSyncSource(txn, lastOpTimeFetched);
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
+ if (_replCoord->isCatchingUp()) {
+ warning() << "Too stale to catch up.";
+ log() << "Our newest OpTime : " << lastOpTimeFetched;
+ log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
+ sleepsecs(1);
+ return;
+ }
+
error() << "too stale to catch up -- entering maintenance mode";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
+
StorageInterface::get(txn)->setMinValid(
txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen});
auto status = _replCoord->setMaintenanceMode(true);
@@ -329,7 +347,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
lastOpTimeFetched = _lastOpTimeFetched;
lastHashFetched = _lastFetchedHash;
- _replCoord->signalUpstreamUpdater();
+ if (!_replCoord->isCatchingUp()) {
+ _replCoord->signalUpstreamUpdater();
+ }
}
// "lastFetched" not used. Already set in _enqueueDocuments.
@@ -393,6 +413,12 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ if (_replCoord->isCatchingUp()) {
+ warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
+ sleepsecs(1);
+ return;
+ }
+
// Rollback is a synchronous operation that uses the task executor and may not be
// executed inside the fetcher callback.
const int messagingPortTags = 0;
@@ -668,7 +694,7 @@ bool BackgroundSync::shouldStopFetching() const {
return true;
}
- if (_replCoord->getMemberState().primary()) {
+ if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) {
LOG(2) << "Interrupted by becoming primary while checking sync source.";
return true;
}