diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-09 17:29:44 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-08-04 18:10:46 -0400 |
commit | 953a241f6dd1541905a1b6e259140635b80238de (patch) | |
tree | c04703faefd3cfea5e81293fc2952d7e69991a29 /src/mongo/db/repl/bgsync.cpp | |
parent | 6e3021388b44e1c9fc3f6fd02b554fe0cc5c5c3e (diff) | |
download | mongo-953a241f6dd1541905a1b6e259140635b80238de.tar.gz |
SERVER-23663 New primary syncs from chosen node to catch up with timeout
SERVER-23662 Implement "catch-up timeout" configuration variable
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 19249c7f864..8bbda77b638 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -108,7 +108,7 @@ size_t getSize(const BSONObj& o) { } } // namespace -MONGO_FP_DECLARE(rsBgSyncProduce); +MONGO_FP_DECLARE(pauseRsBgSyncProducer); // The number and time spent reading batches off the network static TimerStats getmoreReplStats; @@ -220,7 +220,11 @@ void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) { void BackgroundSync::_runProducer() { const MemberState state = _replCoord->getMemberState(); // Stop when the state changes to primary. - if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { + // + // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below + // after fixing step-down during drain mode. + if (!_replCoord->isCatchingUp() && + (_replCoord->isWaitingForApplierToDrain() || state.primary())) { if (!isStopped()) { stop(); } @@ -255,6 +259,11 @@ void BackgroundSync::_runProducer() { } void BackgroundSync::_produce(OperationContext* txn) { + + while (MONGO_FAIL_POINT(pauseRsBgSyncProducer)) { + sleepmillis(0); + } + // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced { @@ -267,17 +276,16 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } - if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary() || - _inShutdown_inlock()) { + if (!_replCoord->isCatchingUp() && + (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) { return; } - } - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); + if (_inShutdown_inlock()) { + return; + } } - // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; HostAndPort source; @@ -286,15 +294,25 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched = _lastOpTimeFetched; _syncSourceHost = HostAndPort(); } + SyncSourceResolverResponse syncSourceResp = _syncSourceResolver.findSyncSource(txn, lastOpTimeFetched); if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) { // All (accessible) sync sources were too stale. + if (_replCoord->isCatchingUp()) { + warning() << "Too stale to catch up."; + log() << "Our newest OpTime : " << lastOpTimeFetched; + log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen; + sleepsecs(1); + return; + } + error() << "too stale to catch up -- entering maintenance mode"; log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; + StorageInterface::get(txn)->setMinValid( txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen}); auto status = _replCoord->setMaintenanceMode(true); @@ -329,7 +347,9 @@ void BackgroundSync::_produce(OperationContext* txn) { } lastOpTimeFetched = _lastOpTimeFetched; lastHashFetched = _lastFetchedHash; - _replCoord->signalUpstreamUpdater(); + if (!_replCoord->isCatchingUp()) { + _replCoord->signalUpstreamUpdater(); + } } // "lastFetched" not used. Already set in _enqueueDocuments. @@ -393,6 +413,12 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { + if (_replCoord->isCatchingUp()) { + warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; + sleepsecs(1); + return; + } + // Rollback is a synchronous operation that uses the task executor and may not be // executed inside the fetcher callback. const int messagingPortTags = 0; @@ -668,7 +694,7 @@ bool BackgroundSync::shouldStopFetching() const { return true; } - if (_replCoord->getMemberState().primary()) { + if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) { LOG(2) << "Interrupted by becoming primary while checking sync source."; return true; } |