author     Siyuan Zhou <siyuan.zhou@mongodb.com>  2016-05-09 17:29:44 -0400
committer  Siyuan Zhou <siyuan.zhou@mongodb.com>  2016-08-04 18:10:46 -0400
commit     953a241f6dd1541905a1b6e259140635b80238de (patch)
tree       c04703faefd3cfea5e81293fc2952d7e69991a29 /src/mongo
parent     6e3021388b44e1c9fc3f6fd02b554fe0cc5c5c3e (diff)
download   mongo-953a241f6dd1541905a1b6e259140635b80238de.tar.gz
SERVER-23663 New primary syncs from chosen node to catch up with timeout
SERVER-23662 Implement "catch-up timeout" configuration variable
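For reference, a minimal sketch (not part of this patch) of a replica set config document that exercises the new settings.catchUpTimeoutMillis field, mirroring the BSON built in the tests below; the set name and host are placeholders:

    BSONObj configObj = BSON("_id" << "mySet"
                                   << "version" << 1
                                   << "protocolVersion" << 1
                                   << "members"
                                   << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                            << "node1:12345"))
                                   << "settings" << BSON("catchUpTimeoutMillis" << 5000));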
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                                        46
-rw-r--r--  src/mongo/db/repl/freshness_scanner.cpp                              4
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                                  8
-rw-r--r--  src/mongo/db/repl/replica_set_config.cpp                            22
-rw-r--r--  src/mongo/db/repl/replica_set_config.h                               9
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h                          6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                 314
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                    50
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp   241
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp             35
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp                   4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h                     2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp          32
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.h             5
-rw-r--r--  src/mongo/db/repl/rs_sync.cpp                                        3
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                                      3
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp                      5
17 files changed, 666 insertions(+), 123 deletions(-)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 19249c7f864..8bbda77b638 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -108,7 +108,7 @@ size_t getSize(const BSONObj& o) {
}
} // namespace
-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(pauseRsBgSyncProducer);
// The number and time spent reading batches off the network
static TimerStats getmoreReplStats;
@@ -220,7 +220,11 @@ void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) {
void BackgroundSync::_runProducer() {
const MemberState state = _replCoord->getMemberState();
// Stop when the state changes to primary.
- if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
+ //
+ // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below
+ // after fixing step-down during drain mode.
+ if (!_replCoord->isCatchingUp() &&
+ (_replCoord->isWaitingForApplierToDrain() || state.primary())) {
if (!isStopped()) {
stop();
}
@@ -255,6 +259,11 @@ void BackgroundSync::_runProducer() {
}
void BackgroundSync::_produce(OperationContext* txn) {
+
+ while (MONGO_FAIL_POINT(pauseRsBgSyncProducer)) {
+ sleepmillis(0);
+ }
+
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -267,17 +276,16 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary() ||
- _inShutdown_inlock()) {
+ if (!_replCoord->isCatchingUp() &&
+ (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) {
return;
}
- }
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
+ if (_inShutdown_inlock()) {
+ return;
+ }
}
-
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
@@ -286,15 +294,25 @@ void BackgroundSync::_produce(OperationContext* txn) {
lastOpTimeFetched = _lastOpTimeFetched;
_syncSourceHost = HostAndPort();
}
+
SyncSourceResolverResponse syncSourceResp =
_syncSourceResolver.findSyncSource(txn, lastOpTimeFetched);
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
+ if (_replCoord->isCatchingUp()) {
+ warning() << "Too stale to catch up.";
+ log() << "Our newest OpTime : " << lastOpTimeFetched;
+ log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
+ sleepsecs(1);
+ return;
+ }
+
error() << "too stale to catch up -- entering maintenance mode";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
+
StorageInterface::get(txn)->setMinValid(
txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen});
auto status = _replCoord->setMaintenanceMode(true);
@@ -329,7 +347,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
lastOpTimeFetched = _lastOpTimeFetched;
lastHashFetched = _lastFetchedHash;
- _replCoord->signalUpstreamUpdater();
+ if (!_replCoord->isCatchingUp()) {
+ _replCoord->signalUpstreamUpdater();
+ }
}
// "lastFetched" not used. Already set in _enqueueDocuments.
@@ -393,6 +413,12 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ if (_replCoord->isCatchingUp()) {
+ warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
+ sleepsecs(1);
+ return;
+ }
+
// Rollback is a synchronous operation that uses the task executor and may not be
// executed inside the fetcher callback.
const int messagingPortTags = 0;
@@ -668,7 +694,7 @@ bool BackgroundSync::shouldStopFetching() const {
return true;
}
- if (_replCoord->getMemberState().primary()) {
+ if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) {
LOG(2) << "Interrupted by becoming primary while checking sync source.";
return true;
}
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
index 2623fb32b88..e3880d6cc76 100644
--- a/src/mongo/db/repl/freshness_scanner.cpp
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -80,6 +80,8 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re
OpTime lastOpTime;
Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime);
if (!status.isOK()) {
+ LOG(2) << "FreshnessScanner: failed to parse opTime in " << opTimesObj << " from "
+ << request.target << causedBy(status);
return;
}
@@ -92,6 +94,8 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re
auto iter =
std::upper_bound(_freshnessInfos.begin(), _freshnessInfos.end(), freshnessInfo, cmp);
_freshnessInfos.insert(iter, freshnessInfo);
+ LOG(2) << "FreshnessScanner: processed response " << opTimesObj << " from "
+ << request.target;
}
}
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index d010fdcc585..d41c905ab9a 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -37,6 +37,7 @@
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
@@ -47,6 +48,8 @@ Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);
namespace {
+MONGO_FP_DECLARE(stopOplogFetcher);
+
/**
* Calculates await data timeout based on the current replica set configuration.
*/
@@ -302,6 +305,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
return;
}
+ // Stop fetching and return immediately on fail point.
+ if (MONGO_FAIL_POINT(stopOplogFetcher)) {
+ return;
+ }
+
const auto& queryResponse = result.getValue();
rpc::ReplSetMetadata metadata;
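For context, a hedged sketch (not part of the patch) of how the fail points added above, pauseRsBgSyncProducer and stopOplogFetcher, would typically be toggled from C++ test code, assuming the standard fail point registry accessors from mongo/util/fail_point_service.h:

    // Pause the bgsync producer while the test arranges state, then let it resume.
    FailPoint* pausePoint = getGlobalFailPointRegistry()->getFailPoint("pauseRsBgSyncProducer");
    pausePoint->setMode(FailPoint::alwaysOn);  // _produce() spins in its sleep loop
    // ... set up the scenario under test ...
    pausePoint->setMode(FailPoint::off);       // producer proceeds past the loop

    // Make the oplog fetcher callback return immediately without enqueuing documents.
    FailPoint* stopPoint = getGlobalFailPointRegistry()->getFailPoint("stopOplogFetcher");
    stopPoint->setMode(FailPoint::alwaysOn);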
diff --git a/src/mongo/db/repl/replica_set_config.cpp b/src/mongo/db/repl/replica_set_config.cpp
index b3d0d8f4fc8..4f160d9488c 100644
--- a/src/mongo/db/repl/replica_set_config.cpp
+++ b/src/mongo/db/repl/replica_set_config.cpp
@@ -50,6 +50,8 @@ const std::string ReplicaSetConfig::kMajorityWriteConcernModeName = "$majority";
const Milliseconds ReplicaSetConfig::kDefaultHeartbeatInterval(2000);
const Seconds ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
const Milliseconds ReplicaSetConfig::kDefaultElectionTimeoutPeriod(10000);
+// TODO(siyuan): Change the default catch-up timeout to 5000 milliseconds.
+const Milliseconds ReplicaSetConfig::kDefaultCatchUpTimeoutPeriod(0);
const bool ReplicaSetConfig::kDefaultChainingAllowed(true);
namespace {
@@ -76,6 +78,7 @@ const std::string kGetLastErrorDefaultsFieldName = "getLastErrorDefaults";
const std::string kGetLastErrorModesFieldName = "getLastErrorModes";
const std::string kHeartbeatIntervalFieldName = "heartbeatIntervalMillis";
const std::string kHeartbeatTimeoutFieldName = "heartbeatTimeoutSecs";
+const std::string kCatchUpTimeoutFieldName = "catchUpTimeoutMillis";
const std::string kReplicaSetIdFieldName = "replicaSetId";
} // namespace
@@ -265,6 +268,23 @@ Status ReplicaSetConfig::_parseSettingsSubdocument(const BSONObj& settings) {
_heartbeatTimeoutPeriod = Seconds(heartbeatTimeoutSecs);
//
+ // Parse catchUpTimeoutMillis
+ //
+ auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0);
+ long long catchUpTimeoutMillis;
+ Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf(
+ settings,
+ kCatchUpTimeoutFieldName,
+ durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod),
+ notLessThanZero,
+ "catch-up timeout must be greater than or equal to 0",
+ &catchUpTimeoutMillis);
+ if (!catchUpTimeoutStatus.isOK()) {
+ return catchUpTimeoutStatus;
+ }
+ _catchUpTimeoutPeriod = Milliseconds(catchUpTimeoutMillis);
+
+ //
// Parse chainingAllowed
//
Status status = bsonExtractBooleanFieldWithDefault(
@@ -763,6 +783,8 @@ BSONObj ReplicaSetConfig::toBSON() const {
durationCount<Seconds>(_heartbeatTimeoutPeriod));
settingsBuilder.appendIntOrLL(kElectionTimeoutFieldName,
durationCount<Milliseconds>(_electionTimeoutPeriod));
+ settingsBuilder.appendIntOrLL(kCatchUpTimeoutFieldName,
+ durationCount<Milliseconds>(_catchUpTimeoutPeriod));
BSONObjBuilder gleModes(settingsBuilder.subobjStart(kGetLastErrorModesFieldName));
diff --git a/src/mongo/db/repl/replica_set_config.h b/src/mongo/db/repl/replica_set_config.h
index 7bbedf85f89..0586342df92 100644
--- a/src/mongo/db/repl/replica_set_config.h
+++ b/src/mongo/db/repl/replica_set_config.h
@@ -63,6 +63,7 @@ public:
static const Milliseconds kDefaultElectionTimeoutPeriod;
static const Milliseconds kDefaultHeartbeatInterval;
static const Seconds kDefaultHeartbeatTimeoutPeriod;
+ static const Milliseconds kDefaultCatchUpTimeoutPeriod;
static const bool kDefaultChainingAllowed;
/**
@@ -218,6 +219,13 @@ public:
}
/**
+ * Gets the timeout to wait for a primary to catch up its oplog.
+ */
+ Milliseconds getCatchUpTimeoutPeriod() const {
+ return _catchUpTimeoutPeriod;
+ }
+
+ /**
* Gets the number of votes required to win an election.
*/
int getMajorityVoteCount() const {
@@ -374,6 +382,7 @@ private:
Milliseconds _electionTimeoutPeriod = kDefaultElectionTimeoutPeriod;
Milliseconds _heartbeatInterval = kDefaultHeartbeatInterval;
Seconds _heartbeatTimeoutPeriod = kDefaultHeartbeatTimeoutPeriod;
+ Milliseconds _catchUpTimeoutPeriod = kDefaultCatchUpTimeoutPeriod;
bool _chainingAllowed = kDefaultChainingAllowed;
bool _writeConcernMajorityJournalDefault = false;
int _majorityVoteCount = 0;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index afede4ea517..62ca5fe94ec 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -399,6 +399,12 @@ public:
virtual bool isWaitingForApplierToDrain() = 0;
/**
+ * A new primary tries to have its oplog catch up after winning an election.
+ * Returns true if the coordinator is waiting for catch-up to finish.
+ */
+ virtual bool isCatchingUp() = 0;
+
+ /**
* Signals that a previously requested pause and drain of the applier buffer
* has completed.
*
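To make the new accessor concrete, a hedged sketch (distilled from the tests later in this patch) of the state sequence an observer sees on a freshly elected primary; replCoord, latestOpTime and txn are placeholders:

    ASSERT(replCoord->isCatchingUp());                      // election won, oplog catch-up running
    replCoord->setMyLastAppliedOpTime(latestOpTime);        // applier reaches the scanned optime
    ASSERT(replCoord->isWaitingForApplierToDrain());        // catch-up done, drain mode begins
    replCoord->signalDrainComplete(txn);                    // applier buffer drained
    ASSERT(replCoord->canAcceptWritesForDatabase("test"));  // fully writable primary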
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 4c4aaf14e32..51db27b9dd8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
+#include "mongo/db/repl/freshness_scanner.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/last_vote.h"
@@ -146,37 +147,25 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
return toBSON().toString();
}
-
struct ReplicationCoordinatorImpl::WaiterInfo {
- /**
- * Constructor takes the list of waiters and enqueues itself on the list, removing itself
- * in the destructor.
- */
- WaiterInfo(std::vector<WaiterInfo*>* _list,
- unsigned int _opID,
- const OpTime* _opTime,
+
+ using FinishFunc = stdx::function<void()>;
+
+ WaiterInfo(unsigned int _opID,
+ const OpTime _opTime,
const WriteConcernOptions* _writeConcern,
stdx::condition_variable* _condVar)
- : list(_list),
- master(true),
- opID(_opID),
- opTime(_opTime),
- writeConcern(_writeConcern),
- condVar(_condVar) {
- list->push_back(this);
- }
+ : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}
- ~WaiterInfo() {
- list->erase(std::remove(list->begin(), list->end(), this), list->end());
- }
+ // When waiter is signaled, finishCallback will be called while holding replCoord _mutex
+ // since WaiterLists are protected by _mutex.
+ WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
+ : opTime(_opTime), finishCallback(_finishCallback) {}
BSONObj toBSON() const {
BSONObjBuilder bob;
bob.append("opId", opID);
- if (opTime) {
- bob.append("opTime", opTime->toBSON());
- }
- bob.append("master", master);
+ bob.append("opTime", opTime.toBSON());
if (writeConcern) {
bob.append("writeConcern", writeConcern->toBSON());
}
@@ -187,14 +176,90 @@ struct ReplicationCoordinatorImpl::WaiterInfo {
return toBSON().toString();
};
- std::vector<WaiterInfo*>* list;
- bool master; // Set to false to indicate that stepDown was called while waiting
- const unsigned int opID;
- const OpTime* opTime;
- const WriteConcernOptions* writeConcern;
- stdx::condition_variable* condVar;
+ // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
+ void notify() {
+ if (condVar) {
+ condVar->notify_all();
+ }
+ if (finishCallback) {
+ finishCallback();
+ }
+ }
+
+ const unsigned int opID = 0;
+ const OpTime opTime;
+ const WriteConcernOptions* writeConcern = nullptr;
+ stdx::condition_variable* condVar = nullptr;
+ // The callback that will be called when this waiter is notified.
+ FinishFunc finishCallback = nullptr;
+};
+
+struct ReplicationCoordinatorImpl::WaiterInfoGuard {
+ /**
+ * Constructor takes the list of waiters and enqueues itself on the list, removing itself
+ * in the destructor.
+ *
+ * Usually waiters will be signaled and removed when their criteria are satisfied, but
+ * wait_until() with timeout may signal waiters earlier and this guard will remove the waiter
+ * properly.
+ *
+ * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
+ * of these without holding _mutex.
+ */
+ WaiterInfoGuard(WaiterList* list,
+ unsigned int opID,
+ const OpTime opTime,
+ const WriteConcernOptions* writeConcern,
+ stdx::condition_variable* condVar)
+ : waiter(opID, opTime, writeConcern, condVar), _list(list) {
+ list->add_inlock(&waiter);
+ }
+
+ ~WaiterInfoGuard() {
+ _list->remove_inlock(&waiter);
+ }
+
+ WaiterInfo waiter;
+
+private:
+ WaiterList* _list;
};
+void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
+ _list.push_back(waiter);
+}
+
+void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
+ stdx::function<bool(WaiterType)> func) {
+ std::vector<WaiterType>::iterator it = _list.end();
+ while (true) {
+ it = std::find_if(_list.begin(), _list.end(), func);
+ if (it == _list.end()) {
+ break;
+ }
+ (*it)->notify();
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ }
+}
+
+void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
+ for (auto& waiter : _list) {
+ waiter->notify();
+ }
+ _list.clear();
+}
+
+bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
+ auto it = std::find(_list.begin(), _list.end(), waiter);
+ if (it != _list.end()) {
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ return true;
+ }
+ return false;
+}
+
namespace {
ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) {
if (settings.usingReplSets()) {
@@ -594,12 +659,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
return;
}
fassert(18823, _rsConfigState != kConfigStartingUp);
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* waiter = *it;
- waiter->condVar->notify_all();
- }
+ _replicationWaiterList.signalAndRemoveAll_inlock();
+ _opTimeWaiterList.signalAndRemoveAll_inlock();
}
// joining the replication executor is blocking so it must be run outside of the mutex
@@ -736,6 +797,12 @@ bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() {
return _isWaitingForDrainToComplete;
}
+bool ReplicationCoordinatorImpl::isCatchingUp() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _isCatchingUp;
+}
+
+
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// This logic is a little complicated in order to avoid acquiring the global exclusive lock
// unnecessarily. This is important because the applier may call signalDrainComplete()
@@ -806,7 +873,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
- auto pred = [this]() { return !_isWaitingForDrainToComplete; };
+ auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
@@ -1028,11 +1095,8 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
invariant(isRollbackAllowed || mySlaveInfo->lastAppliedOpTime <= opTime);
_updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- if (*(opTimeWaiter->opTime) <= opTime) {
- opTimeWaiter->condVar->notify_all();
- }
- }
+ _opTimeWaiterList.signalAndRemoveIf_inlock(
+ [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1138,9 +1202,10 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &targetOpTime, nullptr, &condVar);
+ WaiterInfoGuard waitInfo(
+ &_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar);
- LOG(3) << "Waiting for OpTime: " << waitInfo;
+ LOG(3) << "Waiting for OpTime: " << waitInfo.waiter;
if (txn->hasDeadline()) {
condVar.wait_until(lock, txn->getDeadline().toSystemTimePoint());
} else {
@@ -1334,22 +1399,9 @@ void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
// Wake ops waiting for a new committed snapshot.
_currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- if (info->opID == opId) {
- info->condVar->notify_all();
- return;
- }
- }
-
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- if (opTimeWaiter->opID == opId) {
- opTimeWaiter->condVar->notify_all();
- return;
- }
- }
+ auto hasSameOpID = [opId](WaiterInfo* waiter) { return waiter->opID == opId; };
+ _replicationWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
+ _opTimeWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
}
{
@@ -1364,16 +1416,8 @@ void ReplicationCoordinatorImpl::interruptAll() {
// Wake ops waiting for a new committed snapshot.
_currentCommittedSnapshotCond.notify_all();
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- info->condVar->notify_all();
- }
-
- for (auto& opTimeWaiter : _opTimeWaiterList) {
- opTimeWaiter->condVar->notify_all();
- }
+ _replicationWaiterList.signalAndRemoveAll_inlock();
+ _opTimeWaiterList.signalAndRemoveAll_inlock();
}
{
@@ -1543,7 +1587,8 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
- WaiterInfo waitInfo(&_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar);
+ WaiterInfoGuard waitInfo(
+ &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
const Milliseconds elapsed{timer->millis()};
@@ -1552,7 +1597,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
return StatusAndDuration(interruptedStatus, elapsed);
}
- if (!waitInfo.master) {
+ if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
return StatusAndDuration(Status(ErrorCodes::NotMaster,
"Not master anymore while waiting for replication"
" - this most likely means that a step down"
@@ -2001,7 +2046,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
response->setLastMajorityWrite(majorityOpTime, majorityOpTime.getTimestamp().getSecs());
}
- if (isWaitingForApplierToDrain()) {
+ if (isWaitingForApplierToDrain() || isCatchingUp()) {
// Report that we are secondary to ismaster callers until drain completes.
response->setIsMaster(false);
response->setIsSecondary(true);
@@ -2475,13 +2520,9 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
PostMemberStateUpdateAction result;
if (_memberState.primary() || newState.removed() || newState.rollback()) {
// Wake up any threads blocked in awaitReplication, close connections, etc.
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- info->master = false;
- info->condVar->notify_all();
- }
+ _replicationWaiterList.signalAndRemoveAll_inlock();
+ // Wake up the optime waiter that is waiting for primary catch-up to finish.
+ _opTimeWaiterList.signalAndRemoveAll_inlock();
_canAcceptNonLocalWrites = false;
result = kActionCloseAllConnections;
} else {
@@ -2580,15 +2621,20 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
_electionId = OID::gen();
}
_topCoord->processWinElection(_electionId, getNextGlobalTimestamp());
- _isWaitingForDrainToComplete = true;
+ _isCatchingUp = true;
const PostMemberStateUpdateAction nextAction =
_updateMemberStateFromTopologyCoordinator_inlock();
invariant(nextAction != kActionWinElection);
lk.unlock();
- _externalState->signalApplierToCancelFetcher();
_performPostMemberStateUpdateAction(nextAction);
// Notify all secondaries of the election win.
_scheduleElectionWinNotification();
+ lk.lock();
+ if (isV1ElectionProtocol()) {
+ _scanOpTimeForCatchUp_inlock();
+ } else {
+ _finishCatchUpOplog_inlock(true);
+ }
break;
}
case kActionStartSingleNodeElection:
@@ -2602,6 +2648,99 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
}
+void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
+ auto scanner = std::make_shared<FreshnessScanner>();
+ auto scanStartTime = _replExecutor.now();
+ auto evhStatus =
+ scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
+ if (evhStatus == ErrorCodes::ShutdownInProgress) {
+ _finishCatchUpOplog_inlock(true);
+ return;
+ }
+ fassertStatusOK(40254, evhStatus.getStatus());
+ long long term = _cachedTerm;
+ _replExecutor.onEvent(
+ evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
+ LockGuard lk(_mutex);
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ _finishCatchUpOplog_inlock(true);
+ return;
+ }
+ auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
+ auto catchUpTimeout = totalTimeout - (_replExecutor.now() - scanStartTime);
+ _catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term);
+ });
+}
+
+void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
+ Milliseconds timeout,
+ long long originalTerm) {
+ // On stepping down, the node doesn't update its term immediately due to SERVER-21425.
+ // Term is also checked in case the catchup timeout is so long that the node becomes primary
+ // again.
+ if (!_memberState.primary() || originalTerm != _cachedTerm) {
+ log() << "Stopped transition to primary of term " << originalTerm
+ << " because I've already stepped down.";
+ _finishCatchUpOplog_inlock(false);
+ return;
+ }
+
+ auto result = scanner.getResult();
+
+ // Cannot access any nodes within timeout.
+ if (result.size() == 0) {
+ log() << "Could not access any nodes within timeout when checking for "
+ << "additional ops to apply before finishing transition to primary. "
+ << "Will move forward with becoming primary anyway.";
+ _finishCatchUpOplog_inlock(true);
+ return;
+ }
+
+ // I'm most up-to-date as far as I know.
+ auto freshnessInfo = result.front();
+ if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
+ log() << "My optime is most up-to-date, skipping catch-up "
+ << "and completing transition to primary.";
+ _finishCatchUpOplog_inlock(true);
+ return;
+ }
+
+ // Wait for the replication level to reach the latest opTime within timeout.
+ auto latestOpTime = freshnessInfo.opTime;
+ auto finishCB = [this, latestOpTime]() {
+ if (latestOpTime > _getMyLastAppliedOpTime_inlock()) {
+ log() << "Cannot catch up oplog after becoming primary.";
+ } else {
+ log() << "Finished catch-up oplog after becoming primary.";
+ }
+
+ _finishCatchUpOplog_inlock(true);
+ };
+ auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
+
+ _opTimeWaiterList.add_inlock(waiterInfo.get());
+ auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
+ LockGuard lk(_mutex);
+ if (_opTimeWaiterList.remove_inlock(waiterInfo.get())) {
+ finishCB();
+ }
+ };
+ // Schedule the timeout callback. It may signal after we have already caught up.
+ _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB);
+}
+
+void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
+ _isCatchingUp = false;
+ // If the node steps down during the catch-up, we don't go into drain mode.
+ if (startToDrain) {
+ _isWaitingForDrainToComplete = true;
+ // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
+ // cancel fetcher.
+ _replExecutor.scheduleWork(
+ _wrapAsCallbackFn([this]() { _externalState->signalApplierToCancelFetcher(); }));
+ }
+}
+
Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
resultObj->append("rbid", _rbid);
@@ -2687,15 +2826,10 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& n
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
- for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
- it != _replicationWaiterList.end();
- ++it) {
- WaiterInfo* info = *it;
- if (_doneWaitingForReplication_inlock(
- *info->opTime, SnapshotName::min(), *info->writeConcern)) {
- info->condVar->notify_all();
- }
- }
+ _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
+ return _doneWaitingForReplication_inlock(
+ waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
+ });
}
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(
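As a reading aid (not part of the patch), a hedged sketch of the two waiter styles introduced by the refactor above; targetOpTime, txn, lock and the finish callback body are placeholders:

    // Blocking callers register through the RAII guard; its destructor removes the waiter
    // even if wait_until() returns before the waiter is signaled.
    stdx::condition_variable condVar;
    WaiterInfoGuard waitInfo(&_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar);
    condVar.wait(lock);  // woken by signalAndRemoveIf_inlock() once targetOpTime is reached

    // The catch-up path instead attaches a finish callback, which runs under _mutex when the
    // waiter is notified (or when the timeout callback removes it first).
    auto waiter = std::make_shared<WaiterInfo>(targetOpTime, [this]() { /* finish catch-up */ });
    _opTimeWaiterList.add_inlock(waiter.get());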
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 480a5287dc1..798762a1494 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -73,6 +73,7 @@ namespace repl {
class ElectCmdRunner;
class FreshnessChecker;
+class FreshnessScanner;
class HandshakeArgs;
class HeartbeatResponseAction;
class LastVote;
@@ -198,6 +199,8 @@ public:
virtual bool isWaitingForApplierToDrain() override;
+ virtual bool isCatchingUp() override;
+
virtual void signalDrainComplete(OperationContext* txn) override;
virtual Status waitForDrainFinish(Milliseconds timeout) override;
@@ -505,6 +508,25 @@ private:
// Struct that holds information about clients waiting for replication.
struct WaiterInfo;
+ struct WaiterInfoGuard;
+
+ class WaiterList {
+ public:
+ using WaiterType = WaiterInfo*;
+
+ // Adds waiter into the list. Usually, the waiter will be signaled only once and then
+ // removed.
+ void add_inlock(WaiterType waiter);
+ // Returns whether waiter is found and removed.
+ bool remove_inlock(WaiterType waiter);
+ // Signals and removes all waiters that satisfy the condition.
+ void signalAndRemoveIf_inlock(stdx::function<bool(WaiterType)> fun);
+ // Signals and removes all waiters from the list.
+ void signalAndRemoveAll_inlock();
+
+ private:
+ std::vector<WaiterType> _list;
+ };
// Struct that holds information about nodes in this replication group, mainly used for
// tracking replication progress for write concern satisfaction.
@@ -1113,6 +1135,27 @@ private:
*/
executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);
+ /**
+ * Scan all nodes to find out the latest optime in the replset, thus we know when there's no
+ * more to catch up before the timeout. It also schedules the actual catch-up once we get the
+ * response from the freshness scan.
+ */
+ void _scanOpTimeForCatchUp_inlock();
+ /**
+ * Wait for data replication until we reach the latest optime, or the timeout expires.
+ * "originalTerm" is the term when catch-up work is scheduled and used to detect
+ * the step-down (and potential following step-up) after catch-up gets scheduled.
+ */
+ void _catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
+ Milliseconds timeout,
+ long long originalTerm);
+ /**
+ * Finishes catch-up mode. If "startToDrain" is true, the node enters drain mode; otherwise,
+ * it goes back to secondary mode.
+ */
+ void _finishCatchUpOplog_inlock(bool startToDrain);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -1176,11 +1219,11 @@ private:
int _rbid; // (M)
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
- std::vector<WaiterInfo*> _replicationWaiterList; // (M)
+ WaiterList _replicationWaiterList; // (M)
// list of information about clients waiting for a particular opTime.
// Does *not* own the WaiterInfos.
- std::vector<WaiterInfo*> _opTimeWaiterList; // (M)
+ WaiterList _opTimeWaiterList; // (M)
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
@@ -1210,6 +1253,9 @@ private:
// True if we are waiting for the applier to finish draining.
bool _isWaitingForDrainToComplete; // (M)
+ // True if we are waiting for oplog catch-up to finish.
+ bool _isCatchingUp = false; // (M)
+
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index b2686b0cc47..8198dc43211 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -105,6 +105,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->isWaitingForApplierToDrain());
const auto txnPtr = makeOperationContext();
@@ -167,6 +168,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
+ // Wait for catchup check to finish.
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->isWaitingForApplierToDrain());
const auto txnPtr = makeOperationContext();
@@ -904,6 +907,244 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
+class PrimaryCatchUpTest : public ReplCoordTest {
+protected:
+ using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
+ using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
+
+ void simulateSuccessfulV1Voting() {
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+ NetworkInterfaceMock* net = getNet();
+
+ auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest();
+ ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
+ log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
+
+ ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest();
+ ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
+ bool hasReadyRequests = true;
+ // Process requests until we're primary and consume the heartbeats for the notification
+ // of election win. Exit immediately on catch up.
+ while (!replCoord->isCatchingUp() &&
+ (!replCoord->getMemberState().primary() || hasReadyRequests)) {
+ log() << "Waiting on network in state " << replCoord->getMemberState();
+ getNet()->enterNetwork();
+ if (net->now() < electionTimeoutWhen) {
+ net->runUntil(electionTimeoutWhen);
+ }
+ const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ log() << request.target.toString() << " processing " << request.cmdObj;
+ ReplSetHeartbeatArgsV1 hbArgs;
+ Status status = hbArgs.initialize(request.cmdObj);
+ if (status.isOK()) {
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(rsConfig.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(rsConfig.getConfigVersion());
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
+ } else if (request.cmdObj.firstElement().fieldNameStringData() ==
+ "replSetRequestVotes") {
+ net->scheduleResponse(noi,
+ net->now(),
+ makeResponseStatus(BSON("ok" << 1 << "reason"
+ << ""
+ << "term"
+ << request.cmdObj["term"].Long()
+ << "voteGranted"
+ << true)));
+ } else {
+ error() << "Black holing unexpected request to " << request.target << ": "
+ << request.cmdObj;
+ net->blackHole(noi);
+ }
+ net->runReadyNetworkOperations();
+ hasReadyRequests = net->hasReadyRequests();
+ getNet()->exitNetwork();
+ }
+ }
+
+ ReplicaSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) {
+ BSONObj configObj = BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("catchUpTimeoutMillis" << 5000));
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplicaSetConfig config = assertMakeRSConfig(configObj);
+
+ getReplCoord()->setMyLastAppliedOpTime(opTime);
+ getReplCoord()->setMyLastDurableOpTime(opTime);
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+ simulateSuccessfulV1Voting();
+ IsMasterResponse imResponse;
+ getReplCoord()->fillIsMasterForReplSet(&imResponse);
+ ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
+ ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
+
+ return config;
+ }
+
+ ResponseStatus makeFreshnessScanResponse(OpTime opTime) {
+ // OpTime part of replSetGetStatus.
+ return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON())));
+ }
+
+ void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) {
+ NetworkInterfaceMock* net = getNet();
+ net->enterNetwork();
+ while (net->hasReadyRequests()) {
+ const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+ log() << request.target.toString() << " processing " << request.cmdObj;
+ onFreshnessScanRequest(noi);
+ } else {
+ log() << "Black holing unexpected request to " << request.target << ": "
+ << request.cmdObj;
+ net->blackHole(noi);
+ }
+ net->runReadyNetworkOperations();
+ }
+ net->exitNetwork();
+ }
+};
+
+TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
+ startCapturingLogMessages();
+ OpTime time1(Timestamp(100, 1), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+ });
+ ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
+ auto txn = makeOperationContext();
+ getReplCoord()->signalDrainComplete(txn.get());
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
+ });
+
+ auto net = getNet();
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ net->exitNetwork();
+ ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
+ auto txn = makeOperationContext();
+ getReplCoord()->signalDrainComplete(txn.get());
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ });
+
+ NetworkInterfaceMock* net = getNet();
+ ASSERT(getReplCoord()->isCatchingUp());
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ getReplCoord()->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+ auto txn = makeOperationContext();
+ getReplCoord()->signalDrainComplete(txn.get());
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ // The new primary learns of the latest OpTime.
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ });
+
+ NetworkInterfaceMock* net = getNet();
+ ASSERT(getReplCoord()->isCatchingUp());
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ net->exitNetwork();
+ ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
+ auto txn = makeOperationContext();
+ getReplCoord()->signalDrainComplete(txn.get());
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
+ });
+ ASSERT(getReplCoord()->isCatchingUp());
+
+ TopologyCoordinator::UpdateTermResult updateTermResult;
+ auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
+ ASSERT_TRUE(evh.isValid());
+ getReplExec()->waitForEvent(evh);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ auto net = getNet();
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ net->exitNetwork();
+ ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
+ ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index b7e78539c3a..aa61d91d10a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -97,11 +97,17 @@ struct OpTimeWithTermZero {
};
void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
- ReplicationCoordinatorImpl* replCoord) {
+ ReplicationCoordinatorImpl* replCoord,
+ executor::NetworkInterfaceMock* net) {
replCoord->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0));
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
replCoord->waitForElectionFinish_forTest();
+ // Wait for primary catch-up
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
ASSERT(replCoord->isWaitingForApplierToDrain());
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
@@ -1626,7 +1632,7 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
const auto txn = makeOperationContext();
ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)));
@@ -2525,7 +2531,7 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
time_t lastWriteDate = 101;
OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1);
@@ -3521,7 +3527,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
@@ -3546,7 +3552,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
const auto txn = makeOperationContext();
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
@@ -3574,7 +3580,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1));
@@ -3598,7 +3604,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time(Timestamp(100, 0), 1);
getReplCoord()->setMyLastAppliedOpTime(time);
getReplCoord()->setMyLastDurableOpTime(time);
@@ -3621,7 +3627,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
OpTime committedOpTime(Timestamp(200, 0), 1);
@@ -3650,7 +3656,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
@@ -4255,7 +4261,7 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4288,7 +4294,7 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4319,7 +4325,7 @@ TEST_F(ReplCoordTest,
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4352,7 +4358,7 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4381,7 +4387,7 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord());
+ runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4755,6 +4761,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ simulateCatchUpTimeout();
// Successful dry run election increases term.
ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index b962e9b11bc..19640ba1c55 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -209,6 +209,10 @@ bool ReplicationCoordinatorMock::isWaitingForApplierToDrain() {
return false;
}
+bool ReplicationCoordinatorMock::isCatchingUp() {
+ return false;
+}
+
void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*) {}
Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 4dd04f8fb3f..706d4832d38 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -130,6 +130,8 @@ public:
virtual bool isWaitingForApplierToDrain();
+ virtual bool isCatchingUp();
+
virtual void signalDrainComplete(OperationContext*);
virtual Status waitForDrainFinish(Milliseconds timeout) override;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index ec0497903a2..c630ddca26d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -315,6 +315,10 @@ void ReplCoordTest::simulateSuccessfulV1Election() {
<< request.cmdObj["term"].Long()
<< "voteGranted"
<< true)));
+ } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+ // OpTime part of replSetGetStatus for use by FreshnessScanner during catch-up period.
+ BSONObj response = BSON("optimes" << BSON("appliedOpTime" << OpTime().toBSON()));
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(response));
} else {
error() << "Black holing unexpected request to " << request.target << ": "
<< request.cmdObj;
@@ -459,5 +463,33 @@ void ReplCoordTest::disableSnapshots() {
_externalState->setAreSnapshotsEnabled(false);
}
+void ReplCoordTest::simulateCatchUpTimeout() {
+ NetworkInterfaceMock* net = getNet();
+ auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod();
+ bool hasRequest = false;
+ net->enterNetwork();
+ if (net->now() < catchUpTimeoutWhen) {
+ net->runUntil(catchUpTimeoutWhen);
+ }
+ hasRequest = net->hasReadyRequests();
+ net->exitNetwork();
+
+ while (hasRequest) {
+ net->enterNetwork();
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+ // Black hole heartbeat requests caused by time advance.
+ log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj;
+ net->blackHole(noi);
+ if (net->now() < catchUpTimeoutWhen) {
+ net->runUntil(catchUpTimeoutWhen);
+ } else {
+ net->runReadyNetworkOperations();
+ }
+ hasRequest = net->hasReadyRequests();
+ net->exitNetwork();
+ }
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 6e983aa346d..5ea7a0a6953 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -258,6 +258,11 @@ protected:
*/
void disableSnapshots();
+ /**
+ * Times out all freshness scan requests for primary catch-up.
+ */
+ void simulateCatchUpTimeout();
+
private:
std::unique_ptr<ReplicationCoordinatorImpl> _repl;
// Owned by ReplicationCoordinatorImpl
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index b70fcfdbd46..46a34757ef8 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -135,7 +135,8 @@ void RSDataSync::_run() {
}
try {
- if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain()) {
+ if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain() &&
+ !_replCoord->isCatchingUp()) {
sleepsecs(1);
continue;
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e6b5e7557ee..a642140b72c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1290,7 +1290,8 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
Lock::ParallelBatchWriterMode pbwm(txn->lockState());
auto replCoord = ReplicationCoordinator::get(txn);
- if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
+ if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain() &&
+ !replCoord->isCatchingUp()) {
severe() << "attempting to replicate ops while primary";
return {ErrorCodes::CannotApplyOplogWhilePrimary,
"attempting to replicate ops while primary"};
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 708ef8de639..0d6dd7ee627 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -150,11 +150,6 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
const Timestamp& lastTimestampApplied) {
- // If we are primary, then we aren't syncing from anyone (else).
- if (_iAmPrimary()) {
- return HostAndPort();
- }
-
// If we are not a member of the current replica set configuration, no sync source is valid.
if (_selfIndex == -1) {
LOG(2) << "Cannot sync from any members because we are not in the replica set config";