summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-11-30 00:14:28 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2017-02-27 19:28:13 -0500
commit1da3111dc238698e4e70672b7ba260a368121e50 (patch)
tree40907eb4d351ac451b03fe00bdba72ece5fe6f17 /src/mongo
parentd124f6c14f51440fff9734578166a6a49e3b463b (diff)
downloadmongo-1da3111dc238698e4e70672b7ba260a368121e50.tar.gz
SERVER-27120 Increase synchronization between producer/applier threads and stepdown/stepup
SERVER-27913 Make sure the last applied hash is corresponding to the last applied optime in bgsync start()
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/bgsync.cpp177
-rw-r--r--src/mongo/db/repl/bgsync.h47
-rw-r--r--src/mongo/db/repl/replication_coordinator.h68
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp14
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp91
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h17
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp172
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp9
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp3
-rw-r--r--src/mongo/db/repl/rs_sync.cpp4
-rw-r--r--src/mongo/db/repl/sync_tail.cpp47
19 files changed, 400 insertions, 315 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 1874e73474b..a0f646f6078 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -134,9 +134,7 @@ BackgroundSync::BackgroundSync(
std::unique_ptr<OplogBuffer> oplogBuffer)
: _oplogBuffer(std::move(oplogBuffer)),
_replCoord(getGlobalReplicationCoordinator()),
- _replicationCoordinatorExternalState(replicationCoordinatorExternalState),
- _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
- std::numeric_limits<long long>::max()) {
+ _replicationCoordinatorExternalState(replicationCoordinatorExternalState) {
// Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's
// max size.
bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
@@ -156,7 +154,7 @@ void BackgroundSync::shutdown(OperationContext* txn) {
// ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
// waiting for an operation to be past the slaveDelay point.
clearBuffer(txn);
- _stopped = true;
+ _state = ProducerState::Stopped;
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
@@ -201,48 +199,24 @@ void BackgroundSync::_run() {
fassertFailed(28546);
}
}
- stop();
-}
-
-void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) {
- // Signal to consumers that we have entered the stopped state
- // if the signal isn't already in the queue.
- const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(txn);
- if (!lastObjectPushed || !lastObjectPushed->isEmpty()) {
- const BSONObj sentinelDoc;
- _oplogBuffer->pushEvenIfFull(txn, sentinelDoc);
- bufferCountGauge.increment();
- bufferSizeGauge.increment(sentinelDoc.objsize());
- }
+ stop(true);
}
void BackgroundSync::_runProducer() {
- const MemberState state = _replCoord->getMemberState();
- // Stop when the state changes to primary.
- //
- // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below
- // after fixing step-down during drain mode.
- if (!_replCoord->isCatchingUp() &&
- (_replCoord->isWaitingForApplierToDrain() || state.primary())) {
- if (!isStopped()) {
- stop();
- }
- if (_replCoord->isWaitingForApplierToDrain()) {
- auto txn = cc().makeOperationContext();
- _signalNoNewDataForApplier(txn.get());
- }
+ if (getState() == ProducerState::Stopped) {
sleepsecs(1);
return;
}
// TODO(spencer): Use a condition variable to await loading a config.
- if (state.startup()) {
+ // TODO(siyuan): Control bgsync with producer state.
+ if (_replCoord->getMemberState().startup()) {
// Wait for a config to be loaded
sleepsecs(1);
return;
}
- invariant(!state.rollback());
+ invariant(!_replCoord->getMemberState().rollback());
// We need to wait until initial sync has started.
if (_replCoord->getMyLastAppliedOpTime().isNull()) {
@@ -252,7 +226,7 @@ void BackgroundSync::_runProducer() {
// we want to start when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
auto txn = cc().makeOperationContext();
- if (isStopped()) {
+ if (getState() == ProducerState::Starting) {
start(txn.get());
}
@@ -287,12 +261,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
- if (!_replCoord->isCatchingUp() &&
- (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) {
- return;
- }
-
- if (_inShutdown_inlock()) {
+ if (_state != ProducerState::Running) {
return;
}
}
@@ -334,7 +303,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
- if (_replCoord->isCatchingUp()) {
+ // TODO: End catchup mode early if we are too stale.
+ if (_replCoord->getMemberState().primary()) {
warning() << "Too stale to catch up.";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen
@@ -374,14 +344,15 @@ void BackgroundSync::_produce(OperationContext* txn) {
long long lastHashFetched;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_stopped) {
+ if (_state != ProducerState::Running) {
return;
}
lastOpTimeFetched = _lastOpTimeFetched;
lastHashFetched = _lastFetchedHash;
- if (!_replCoord->isCatchingUp()) {
- _replCoord->signalUpstreamUpdater();
- }
+ }
+
+ if (!_replCoord->getMemberState().primary()) {
+ _replCoord->signalUpstreamUpdater();
}
// Set the applied point if unset. This is most likely the first time we've established a sync
@@ -441,7 +412,10 @@ void BackgroundSync::_produce(OperationContext* txn) {
// If the background sync is stopped after the fetcher is started, we need to
// re-evaluate our sync source and oplog common point.
- if (isStopped()) {
+ if (getState() != ProducerState::Running) {
+ log() << "Replication producer stopped after oplog fetcher finished returning a batch from "
+ "our sync source. Abandoning this batch of oplog entries and re-evaluating our "
+ "sync source.";
return;
}
@@ -455,7 +429,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
- if (_replCoord->isCatchingUp()) {
+ if (_replCoord->getMemberState().primary()) {
+ // TODO: Abort catchup mode early if rollback detected.
warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
sleepsecs(1);
return;
@@ -481,6 +456,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
log() << "Starting rollback due to " << redact(fetcherReturnStatus);
+ // TODO: change this to call into the Applier directly to block until the applier is
+ // drained.
+ //
// Wait till all buffered oplog entries have drained and been applied.
auto lastApplied = _replCoord->getMyLastAppliedOpTime();
if (lastApplied != lastOpTimeFetched) {
@@ -488,14 +466,16 @@ void BackgroundSync::_produce(OperationContext* txn) {
<< lastOpTimeFetched << " to be applied before starting rollback.";
while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) {
sleepmillis(10);
- if (isStopped() || inShutdown()) {
+ if (getState() != ProducerState::Running) {
return;
}
}
}
_rollback(txn, source, syncSourceResp.rbid, getConnection);
- stop();
+ // Reset the producer to clear the sync source and the last optime fetched.
+ stop(true);
+ startProducerIfStopped();
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source "
@@ -571,7 +551,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
// buffer between the time we check _inShutdown and the point where we finish writing to the
// buffer.
stdx::unique_lock<stdx::mutex> lock(_mutex);
- if (_inShutdown) {
+ if (_state != ProducerState::Running) {
return Status::OK();
}
@@ -734,20 +714,20 @@ void BackgroundSync::clearSyncTarget() {
_syncSourceHost = HostAndPort();
}
-void BackgroundSync::cancelFetcher() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_oplogFetcher) {
- _oplogFetcher->shutdown();
- }
-}
-
-void BackgroundSync::stop() {
+void BackgroundSync::stop(bool resetLastFetchedOptime) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _stopped = true;
+ _state = ProducerState::Stopped;
_syncSourceHost = HostAndPort();
- _lastOpTimeFetched = OpTime();
- _lastFetchedHash = 0;
+ if (resetLastFetchedOptime) {
+ invariant(_oplogBuffer->isEmpty());
+ _lastOpTimeFetched = OpTime();
+ _lastFetchedHash = 0;
+ }
+
+ if (_syncSourceResolver) {
+ _syncSourceResolver->shutdown();
+ }
if (_oplogFetcher) {
_oplogFetcher->shutdown();
@@ -755,24 +735,33 @@ void BackgroundSync::stop() {
}
void BackgroundSync::start(OperationContext* txn) {
- massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty());
-
- long long lastFetchedHash = _readLastAppliedHash(txn);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _stopped = false;
+ OpTimeWithHash lastAppliedOpTimeWithHash;
+ do {
+ lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(txn);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Double check the state after acquiring the mutex.
+ if (_state != ProducerState::Starting) {
+ return;
+ }
+ // If a node steps down during drain mode, then the buffer may not be empty at the beginning
+ // of secondary state.
+ if (!_oplogBuffer->isEmpty()) {
+ log() << "going to start syncing, but buffer is not empty";
+ }
+ _state = ProducerState::Running;
- // reset _last fields with current oplog data
- _lastOpTimeFetched = _replCoord->getMyLastAppliedOpTime();
- _lastFetchedHash = lastFetchedHash;
+ // When a node steps down during drain mode, the last fetched optime would be newer than
+ // the last applied.
+ if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) {
+ _lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime;
+ _lastFetchedHash = lastAppliedOpTimeWithHash.value;
+ }
+ // Reload the last applied optime from disk if it has been changed.
+ } while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime());
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
-bool BackgroundSync::isStopped() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _stopped;
-}
-
void BackgroundSync::clearBuffer(OperationContext* txn) {
_oplogBuffer->clear(txn);
const auto count = bufferCountGauge.get();
@@ -781,7 +770,7 @@ void BackgroundSync::clearBuffer(OperationContext* txn) {
bufferSizeGauge.decrement(size);
}
-long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
+OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* txn) {
BSONObj oplogEntry;
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
@@ -791,7 +780,7 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
if (!success) {
// This can happen when we are to do an initial sync. lastHash will be set
// after the initial sync is complete.
- return 0;
+ return OpTimeWithHash(0);
}
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName);
@@ -807,29 +796,13 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
<< redact(status);
fassertFailed(18902);
}
- return hash;
+ OplogEntry parsedEntry(oplogEntry);
+ return OpTimeWithHash(hash, parsedEntry.getOpTime());
}
bool BackgroundSync::shouldStopFetching() const {
- if (inShutdown()) {
- LOG(2) << "Stopping oplog fetcher due to shutdown.";
- return true;
- }
-
- // If we are transitioning to primary state, we need to stop fetching in order to go into
- // bgsync-stop mode.
- if (_replCoord->isWaitingForApplierToDrain()) {
- LOG(2) << "Stopping oplog fetcher because we are waiting for the applier to drain.";
- return true;
- }
-
- if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) {
- LOG(2) << "Stopping oplog fetcher because we are primary.";
- return true;
- }
-
// Check if we have been stopped.
- if (isStopped()) {
+ if (getState() != ProducerState::Running) {
LOG(2) << "Stopping oplog fetcher due to stop request.";
return true;
}
@@ -850,5 +823,19 @@ void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op
bufferSizeGauge.increment(op.objsize());
}
+BackgroundSync::ProducerState BackgroundSync::getState() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _state;
+}
+
+void BackgroundSync::startProducerIfStopped() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // Let producer run if it's already running.
+ if (_state == ProducerState::Stopped) {
+ _state = ProducerState::Starting;
+ }
+}
+
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 8177509e049..479caabaee8 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -58,11 +58,28 @@ class BackgroundSync {
MONGO_DISALLOW_COPYING(BackgroundSync);
public:
+ /**
+ * Stopped -> Starting -> Running
+ * ^ | |
+ * |__________|____________|
+ *
+ * In normal cases: Stopped -> Starting -> Running -> Stopped.
+ * It is also possible to transition directly from Starting to Stopped.
+ *
+ * We need a separate Starting state since part of the startup process involves reading from
+ * disk and we want to do that disk I/O in the bgsync thread, rather than whatever thread calls
+ * start().
+ */
+ enum class ProducerState { Starting, Running, Stopped };
+
BackgroundSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
std::unique_ptr<OplogBuffer> oplogBuffer);
// stop syncing (when this node becomes a primary, e.g.)
- void stop();
+ // During stepdown, the last fetched optime is not reset in order to keep track of the lastest
+ // optime in the buffer. However, the last fetched optime has to be reset after initial sync or
+ // rollback.
+ void stop(bool resetLastFetchedOptime);
/**
* Starts oplog buffer, task executor and producer thread, in that order.
@@ -85,8 +102,6 @@ public:
*/
bool inShutdown() const;
- bool isStopped() const;
-
// starts the sync target notifying thread
void notifierThread();
@@ -106,11 +121,6 @@ public:
void clearBuffer(OperationContext* txn);
/**
- * Cancel existing find/getMore commands on the sync source's oplog collection.
- */
- void cancelFetcher();
-
- /**
* Returns true if any of the following is true:
* 1) We are shutting down;
* 2) We are primary;
@@ -119,7 +129,11 @@ public:
*/
bool shouldStopFetching() const;
- // Testing related stuff
+ ProducerState getState() const;
+ // Starts the producer if it's stopped. Otherwise, let it keep running.
+ void startProducerIfStopped();
+
+ // Adds a fake oplog entry to buffer. Used for testing only.
void pushTestOpToBuffer(OperationContext* txn, const BSONObj& op);
private:
@@ -137,14 +151,6 @@ private:
void _produce(OperationContext* txn);
/**
- * Signals to the applier that we have no new data,
- * and are in sync with the applier at this point.
- *
- * NOTE: Used after rollback and during draining to transition to Primary role;
- */
- void _signalNoNewDataForApplier(OperationContext* txn);
-
- /**
* Checks current background sync state before pushing operations into blocking queue and
* updating metrics. If the queue is full, might block.
*
@@ -167,7 +173,7 @@ private:
// restart syncing
void start(OperationContext* txn);
- long long _readLastAppliedHash(OperationContext* txn);
+ OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* txn);
// Production thread
std::unique_ptr<OplogBuffer> _oplogBuffer;
@@ -179,6 +185,8 @@ private:
ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState;
// _mutex protects all of the class variables declared below.
+ //
+ // Never hold bgsync mutex when trying to acquire the ReplicationCoordinator mutex.
mutable stdx::mutex _mutex;
OpTime _lastOpTimeFetched;
@@ -193,8 +201,7 @@ private:
// Set to true if shutdown() has been called.
bool _inShutdown = false;
- // if producer thread should not be running
- bool _stopped = true;
+ ProducerState _state = ProducerState::Starting;
HostAndPort _syncSourceHost;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index a840b8e99ed..b70ac1906c1 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -393,18 +393,62 @@ public:
virtual bool setFollowerMode(const MemberState& newState) = 0;
/**
- * Returns true if the coordinator wants the applier to pause application.
+ * Step-up
+ * =======
+ * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from
+ * the perspective of producer and applier, so there's nothing to do with them.
+ * When a node enters drain mode, producer state = Stopped, applier state = Draining.
*
- * If this returns true, the applier should call signalDrainComplete() when it has
- * completed draining its operation buffer and no further ops are being applied.
+ * If the applier state is Draining, it will signal repl coord when there's nothing to apply.
+ * The applier goes into Stopped state at the same time.
+ *
+ * The states go like the following:
+ * - secondary and during catchup mode
+ * (producer: Running, applier: Running)
+ * |
+ * | finish catch-up, enter drain mode
+ * V
+ * - drain mode
+ * (producer: Stopped, applier: Draining)
+ * |
+ * | applier signals drain is complete
+ * V
+ * - primary is in master mode
+ * (producer: Stopped, applier: Stopped)
+ *
+ *
+ * Step-down
+ * =========
+ * The state transitions become:
+ * - primary is in master mode
+ * (producer: Stopped, applier: Stopped)
+ * |
+ * | step down
+ * V
+ * - secondary mode, starting bgsync
+ * (producer: Starting, applier: Running)
+ * |
+ * | bgsync runs start()
+ * V
+ * - secondary mode, normal
+ * (producer: Running, applier: Running)
+ *
+ * When a node steps down during draining mode, it's OK to change from (producer: Stopped,
+ * applier: Draining) to (producer: Starting, applier: Running).
+ *
+ * When a node steps down during catchup mode, the states remain the same (producer: Running,
+ * applier: Running).
*/
- virtual bool isWaitingForApplierToDrain() = 0;
+ enum class ApplierState { Running, Draining, Stopped };
/**
- * A new primary tries to have its oplog catch up after winning an election.
- * Return true if the coordinator is waiting for catch-up to finish.
+ * In normal cases: Running -> Draining -> Stopped -> Running.
+ * Draining -> Running is also possible if a node steps down during drain mode.
+ *
+ * Only the applier can make the transition from Draining to Stopped by calling
+ * signalDrainComplete().
*/
- virtual bool isCatchingUp() = 0;
+ virtual ApplierState getApplierState() = 0;
/**
* Signals that a previously requested pause and drain of the applier buffer
@@ -412,12 +456,18 @@ public:
*
* This is an interface that allows the applier to reenable writes after
* a successful election triggers the draining of the applier buffer.
+ *
+ * The applier signals drain complete when the buffer is empty and it's in Draining
+ * state. We need to make sure the applier checks both conditions in the same term.
+ * Otherwise, it's possible that the applier confirms the empty buffer, but the node
+ * steps down and steps up so quickly that the applier signals drain complete in the wrong
+ * term.
*/
- virtual void signalDrainComplete(OperationContext* txn) = 0;
+ virtual void signalDrainComplete(OperationContext* txn, long long termWhenBufferIsEmpty) = 0;
/**
* Waits duration of 'timeout' for applier to finish draining its buffer of operations.
- * Returns OK if isWaitingForApplierToDrain() returns false.
+ * Returns OK if we are not in drain mode.
* Returns ErrorCodes::ExceededTimeLimit if we timed out waiting for the applier to drain its
* buffer.
* Returns ErrorCodes::BadValue if timeout is negative.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index c995f30a468..66283518bfe 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -255,9 +255,14 @@ public:
virtual void signalApplierToChooseNewSyncSource() = 0;
/**
- * Notifies the bgsync to cancel the current oplog fetcher.
+ * Notifies the bgsync to stop fetching data.
*/
- virtual void signalApplierToCancelFetcher() = 0;
+ virtual void stopProducer() = 0;
+
+ /**
+ * Start bgsync's producer if it's stopped.
+ */
+ virtual void startProducerIfStopped() = 0;
/**
* Drops all snapshots and clears the "committed" snapshot.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d7f08443d45..f5a96ed3123 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -808,12 +808,18 @@ void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource
}
}
-void ReplicationCoordinatorExternalStateImpl::signalApplierToCancelFetcher() {
+void ReplicationCoordinatorExternalStateImpl::stopProducer() {
LockGuard lk(_threadMutex);
- if (!_bgSync) {
- return;
+ if (_bgSync) {
+ _bgSync->stop(false);
+ }
+}
+
+void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
+ LockGuard lk(_threadMutex);
+ if (_bgSync) {
+ _bgSync->startProducerIfStopped();
}
- _bgSync->cancelFetcher();
}
void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* txn) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 2bee57d3419..b0d9487eaaf 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -94,7 +94,8 @@ public:
virtual void killAllUserOperations(OperationContext* txn);
virtual void shardingOnStepDownHook();
virtual void signalApplierToChooseNewSyncSource();
- virtual void signalApplierToCancelFetcher();
+ virtual void stopProducer();
+ virtual void startProducerIfStopped();
void dropAllSnapshots() final;
void updateCommittedSnapshot(SnapshotName newCommitPoint) final;
void createSnapshot(OperationContext* txn, SnapshotName name) final;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 5f305c067d6..274806326a6 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -55,7 +55,6 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock
_storeLocalLastVoteDocumentStatus(Status::OK()),
_storeLocalConfigDocumentShouldHang(false),
_storeLocalLastVoteDocumentShouldHang(false),
- _isApplierSignaledToCancelFetcher(false),
_connectionsClosed(false),
_threadsStarted(false) {}
@@ -203,10 +202,6 @@ void ReplicationCoordinatorExternalStateMock::setStoreLocalConfigDocumentToHang(
}
}
-bool ReplicationCoordinatorExternalStateMock::isApplierSignaledToCancelFetcher() const {
- return _isApplierSignaledToCancelFetcher;
-}
-
bool ReplicationCoordinatorExternalStateMock::threadsStarted() const {
return _threadsStarted;
}
@@ -233,9 +228,9 @@ void ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
-void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() {
- _isApplierSignaledToCancelFetcher = true;
-}
+void ReplicationCoordinatorExternalStateMock::stopProducer() {}
+
+void ReplicationCoordinatorExternalStateMock::startProducerIfStopped() {}
void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index d35c269a332..1b575ede697 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -87,7 +87,8 @@ public:
virtual void killAllUserOperations(OperationContext* txn);
virtual void shardingOnStepDownHook();
virtual void signalApplierToChooseNewSyncSource();
- virtual void signalApplierToCancelFetcher();
+ virtual void stopProducer();
+ virtual void startProducerIfStopped();
virtual void dropAllSnapshots();
virtual void updateCommittedSnapshot(SnapshotName newCommitPoint);
virtual void createSnapshot(OperationContext* txn, SnapshotName name);
@@ -161,11 +162,6 @@ public:
void setStoreLocalLastVoteDocumentToHang(bool hang);
/**
- * Returns true if applier was signaled to cancel fetcher.
- */
- bool isApplierSignaledToCancelFetcher() const;
-
- /**
* Returns true if startThreads() has been called.
*/
bool threadsStarted() const;
@@ -211,7 +207,6 @@ private:
stdx::condition_variable _shouldHangLastVoteCondVar;
bool _storeLocalConfigDocumentShouldHang;
bool _storeLocalLastVoteDocumentShouldHang;
- bool _isApplierSignaledToCancelFetcher;
bool _connectionsClosed;
HostAndPort _clientHostAndPort;
bool _threadsStarted;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 099f9462504..a6d936efc0c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -322,7 +322,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_externalState(std::move(externalState)),
_inShutdown(false),
_memberState(MemberState::RS_STARTUP),
- _isWaitingForDrainToComplete(false),
_rsConfigState(kConfigPreStart),
_selfIndex(-1),
_sleptLastElection(false),
@@ -884,18 +883,13 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish(
_replExecutor.signalEvent(finishedSettingFollowerMode);
}
-bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() {
+ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _isWaitingForDrainToComplete;
+ return _applierState;
}
-bool ReplicationCoordinatorImpl::isCatchingUp() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _isCatchingUp;
-}
-
-
-void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
+void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn,
+ long long termWhenBufferIsEmpty) {
// This logic is a little complicated in order to avoid acquiring the global exclusive lock
// unnecessarily. This is important because the applier may call signalDrainComplete()
// whenever it wants, not only when the ReplicationCoordinator is expecting it.
@@ -905,7 +899,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// 2.) Otherwise, release the mutex while acquiring the global exclusive lock,
// since that might take a while (NB there's a deadlock cycle otherwise, too).
// 3.) Re-check to see if we've somehow left drain mode. If we have not, clear
- // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and
+ // producer and applier's states, set the flag allowing non-local database writes and
// drop the mutex. At this point, no writes can occur from other threads, due to the
// global exclusive lock.
// 4.) Drop all temp collections, and log the drops to the oplog.
@@ -915,16 +909,15 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
//
// Because replicatable writes are forbidden while in drain mode, and we don't exit drain
// mode until we have the global exclusive lock, which forbids all other threads from making
- // writes, we know that from the time that _isWaitingForDrainToComplete is set in
- // _performPostMemberStateUpdateAction(kActionWinElection) until this method returns, no
- // external writes will be processed. This is important so that a new temp collection isn't
- // introduced on the new primary before we drop all the temp collections.
+ // writes, we know that from the time that _canAcceptNonLocalWrites is set until
+ // this method returns, no external writes will be processed. This is important so that a new
+ // temp collection isn't introduced on the new primary before we drop all the temp collections.
// When we go to drop all temp collections, we must replicate the drops.
invariant(txn->writesAreReplicated());
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (!_isWaitingForDrainToComplete) {
+ if (_applierState != ApplierState::Draining) {
return;
}
lk.unlock();
@@ -950,19 +943,14 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
Lock::GlobalWrite globalWriteLock(txn->lockState());
lk.lock();
- if (!_isWaitingForDrainToComplete) {
+ // Exit drain mode when the buffer is empty in the current term and we're in Draining mode.
+ if (_applierState != ApplierState::Draining || termWhenBufferIsEmpty != _cachedTerm) {
return;
}
- invariant(!_isCatchingUp);
- _isWaitingForDrainToComplete = false;
+ _applierState = ApplierState::Stopped;
_drainFinishedCond.notify_all();
- if (!_getMemberState_inlock().primary()) {
- // We must have decided not to transition to primary while waiting for the applier to drain.
- // Skip the rest of this function since it should only be done when really transitioning.
- return;
- }
-
+ invariant(_getMemberState_inlock().primary());
invariant(!_canAcceptNonLocalWrites);
_canAcceptNonLocalWrites = true;
@@ -984,7 +972,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
- auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
+ auto pred = [this]() { return _applierState != ApplierState::Draining; };
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
@@ -2071,15 +2059,15 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
_topCoord->fillIsMasterForReplSet(response);
}
- OpTime lastOpTime = getMyLastAppliedOpTime();
+ LockGuard lock(_mutex);
+ OpTime lastOpTime = _getMyLastAppliedOpTime_inlock();
response->setLastWrite(lastOpTime, lastOpTime.getTimestamp().getSecs());
if (_currentCommittedSnapshot) {
OpTime majorityOpTime = _currentCommittedSnapshot->opTime;
response->setLastMajorityWrite(majorityOpTime, majorityOpTime.getTimestamp().getSecs());
}
-
- if (isWaitingForApplierToDrain() || isCatchingUp()) {
- // Report that we are secondary to ismaster callers until drain completes.
+ // Report that we are secondary to ismaster callers until drain completes.
+ if (response->isMaster() && !_canAcceptNonLocalWrites) {
response->setIsMaster(false);
response->setIsSecondary(true);
}
@@ -2577,8 +2565,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_replicationWaiterList.signalAndRemoveAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAndRemoveAll_inlock();
- // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
- // by freshness scan.
_canAcceptNonLocalWrites = false;
_stepDownPending = false;
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
@@ -2587,6 +2573,12 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
result = kActionFollowerModeStateChange;
}
+ // Enable replication producer and applier on stepdown.
+ if (_memberState.primary()) {
+ _applierState = ApplierState::Running;
+ _externalState->startProducerIfStopped();
+ }
+
if (_memberState.secondary() && !newState.primary()) {
// Switching out of SECONDARY, but not to PRIMARY.
_canServeNonLocalReads.store(0U);
@@ -2681,9 +2673,6 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
_topCoord->processWinElection(_electionId, ts);
- invariant(!_isCatchingUp);
- invariant(!_isWaitingForDrainToComplete);
- _isCatchingUp = true;
const PostMemberStateUpdateAction nextAction =
_updateMemberStateFromTopologyCoordinator_inlock();
invariant(nextAction != kActionWinElection);
@@ -2695,7 +2684,7 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
if (isV1ElectionProtocol()) {
_scanOpTimeForCatchUp_inlock();
} else {
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
}
break;
}
@@ -2716,7 +2705,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto evhStatus =
scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2725,7 +2714,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
LockGuard lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2741,9 +2730,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
// Term is also checked in case the catchup timeout is so long that the node becomes primary
// again.
if (!_memberState.primary() || originalTerm != _cachedTerm) {
+ // If the node steps down during the catch-up, we don't go into drain mode.
log() << "Stopped transition to primary of term " << originalTerm
<< " because I've already stepped down.";
- _finishCatchUpOplog_inlock(false);
return;
}
@@ -2754,7 +2743,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Could not access any nodes within timeout when checking for "
<< "additional ops to apply before finishing transition to primary. "
<< "Will move forward with becoming primary anyway.";
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2763,7 +2752,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
log() << "My optime is most up-to-date, skipping catch-up "
<< "and completing transition to primary.";
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2776,7 +2765,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Finished catch-up oplog after becoming primary.";
}
- _finishCatchUpOplog_inlock(true);
+ _finishCatchingUpOplog_inlock();
};
auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
@@ -2791,18 +2780,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
_replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB);
}
-void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
- invariant(_isCatchingUp);
- _isCatchingUp = false;
- // If the node steps down during the catch-up, we don't go into drain mode.
- if (startToDrain) {
- invariant(!_isWaitingForDrainToComplete);
- _isWaitingForDrainToComplete = true;
- // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
- // cancel fetcher.
- _replExecutor.scheduleWork(
- _wrapAsCallbackFn([this]() { _externalState->signalApplierToCancelFetcher(); }));
- }
+void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
+ _applierState = ApplierState::Draining;
+ _externalState->stopProducer();
}
Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) {
@@ -3079,7 +3059,8 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
LockGuard topoLock(_topoMutex);
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
- auto chainingPreference = isCatchingUp()
+ // Always allow chaining while in catchup and drain mode.
+ auto chainingPreference = getMemberState().primary()
? TopologyCoordinator::ChainingPreference::kAllowChaining
: TopologyCoordinator::ChainingPreference::kUseConfiguration;
HostAndPort newSyncSource =
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 9144e88b6b8..1e2543367a9 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -174,11 +174,10 @@ public:
virtual bool setFollowerMode(const MemberState& newState) override;
- virtual bool isWaitingForApplierToDrain() override;
+ virtual ApplierState getApplierState() override;
- virtual bool isCatchingUp() override;
-
- virtual void signalDrainComplete(OperationContext* txn) override;
+ virtual void signalDrainComplete(OperationContext* txn,
+ long long termWhenBufferIsEmpty) override;
virtual Status waitForDrainFinish(Milliseconds timeout) override;
@@ -1134,10 +1133,8 @@ private:
long long originalTerm);
/**
* Finish catch-up mode and start drain mode.
- * If "startToDrain" is true, the node enters drain mode. Otherwise, it goes back to secondary
- * mode.
*/
- void _finishCatchUpOplog_inlock(bool startToDrain);
+ void _finishCatchingUpOplog_inlock();
/**
* Waits for the config state to leave kConfigStartingUp, which indicates that start() has
@@ -1247,11 +1244,7 @@ private:
// Used to signal threads waiting for changes to _memberState.
stdx::condition_variable _drainFinishedCond; // (M)
- // True if we are waiting for the applier to finish draining.
- bool _isWaitingForDrainToComplete; // (M)
-
- // True if we are waiting for oplog catch-up to finish.
- bool _isCatchingUp = false; // (M)
+ ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M)
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index 0520bbad0f2..51817d9e0c2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -155,7 +155,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
const auto txnPtr = makeOperationContext();
auto& txn = *txnPtr;
@@ -165,7 +165,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&txn);
+ getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm());
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -189,7 +189,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
const auto txnPtr = makeOperationContext();
auto& txn = *txnPtr;
@@ -199,7 +199,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&txn);
+ getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm());
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 7bbe9000bc7..137c6483a6e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -51,6 +51,7 @@ namespace {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using ApplierState = ReplicationCoordinator::ApplierState;
TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
assertStartSuccess(BSON("_id"
@@ -106,7 +107,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
simulateCatchUpTimeout();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto txnPtr = makeOperationContext();
auto& txn = *txnPtr;
@@ -116,7 +117,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&txn);
+ getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm());
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -170,7 +171,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
<< getReplCoord()->getMemberState().toString();
// Wait for catchup check to finish.
simulateCatchUpTimeout();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto txnPtr = makeOperationContext();
auto& txn = *txnPtr;
@@ -180,7 +181,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&txn);
+ getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm());
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -1120,6 +1121,15 @@ protected:
using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
+ void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) {
+ ReplicaSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest();
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(rsConfig.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(rsConfig.getConfigVersion());
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true)));
+ }
+
void simulateSuccessfulV1Voting() {
ReplicationCoordinatorImpl* replCoord = getReplCoord();
NetworkInterfaceMock* net = getNet();
@@ -1128,32 +1138,25 @@ protected:
ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
- ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest();
ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
bool hasReadyRequests = true;
// Process requests until we're primary and consume the heartbeats for the notification
- // of election win. Exit immediately on catch up.
- while (!replCoord->isCatchingUp() &&
- (!replCoord->getMemberState().primary() || hasReadyRequests)) {
+ // of election win. Exit immediately on unexpected requests.
+ while (!replCoord->getMemberState().primary() || hasReadyRequests) {
log() << "Waiting on network in state " << replCoord->getMemberState();
- getNet()->enterNetwork();
+ net->enterNetwork();
if (net->now() < electionTimeoutWhen) {
net->runUntil(electionTimeoutWhen);
}
- const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ // Peek the next request, don't consume it yet.
+ const NetworkOpIter noi = net->getFrontOfUnscheduledQueue();
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
- ReplSetHeartbeatArgsV1 hbArgs;
- Status status = hbArgs.initialize(request.cmdObj);
- if (hbArgs.initialize(request.cmdObj).isOK()) {
- ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName(rsConfig.getReplSetName());
- hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(rsConfig.getConfigVersion());
- net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
+ if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
} else if (request.cmdObj.firstElement().fieldNameStringData() ==
"replSetRequestVotes") {
- net->scheduleResponse(noi,
+ net->scheduleResponse(net->getNextReadyRequest(),
net->now(),
makeResponseStatus(BSON("ok" << 1 << "reason"
<< ""
@@ -1162,9 +1165,9 @@ protected:
<< "voteGranted"
<< true)));
} else {
- error() << "Black holing unexpected request to " << request.target << ": "
- << request.cmdObj;
- net->blackHole(noi);
+ // Stop the loop and let the caller handle unexpected requests.
+ net->exitNetwork();
+ break;
}
net->runReadyNetworkOperations();
// Successful elections need to write the last vote to disk, which is done by DB worker.
@@ -1173,7 +1176,7 @@ protected:
getReplExec()->waitForDBWork_forTest();
net->runReadyNetworkOperations();
hasReadyRequests = net->hasReadyRequests();
- getNet()->exitNetwork();
+ net->exitNetwork();
}
}
@@ -1220,9 +1223,11 @@ protected:
while (net->hasReadyRequests()) {
const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
+ log() << request.target.toString() << " processing " << request.cmdObj;
if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
- log() << request.target.toString() << " processing " << request.cmdObj;
onFreshnessScanRequest(noi);
+ } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ replyToHeartbeatRequestAsSecondaries(noi);
} else {
log() << "Black holing unexpected request to " << request.target << ": "
<< request.cmdObj;
@@ -1232,6 +1237,28 @@ protected:
}
net->exitNetwork();
}
+
+ void replyHeartbeatsAndRunUntil(Date_t until) {
+ auto net = getNet();
+ net->enterNetwork();
+ while (net->now() < until) {
+ while (net->hasReadyRequests()) {
+ // Peek the next request
+ auto noi = net->getFrontOfUnscheduledQueue();
+ auto& request = noi->getRequest();
+ if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ // Consume the next request
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
+ } else {
+ // Cannot consume other requests than heartbeats.
+ net->exitNetwork();
+ return;
+ }
+ }
+ net->runUntil(until);
+ }
+ net->exitNetwork();
+ }
};
TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
@@ -1242,11 +1269,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
processFreshnessScanRequests([this](const NetworkOpIter noi) {
getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
});
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
auto txn = makeOperationContext();
- getReplCoord()->signalDrainComplete(txn.get());
+ getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
}
@@ -1263,14 +1290,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
});
auto net = getNet();
- net->enterNetwork();
- net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
- net->exitNetwork();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining);
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
auto txn = makeOperationContext();
- getReplCoord()->signalDrainComplete(txn.get());
+ getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
}
@@ -1288,18 +1314,18 @@ TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
});
NetworkInterfaceMock* net = getNet();
- ASSERT(getReplCoord()->isCatchingUp());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
// Simulate the work done by bgsync and applier threads.
// setMyLastAppliedOpTime() will signal the optime waiter.
getReplCoord()->setMyLastAppliedOpTime(time2);
net->enterNetwork();
net->runReadyNetworkOperations();
net->exitNetwork();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
auto txn = makeOperationContext();
- getReplCoord()->signalDrainComplete(txn.get());
+ getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
}
@@ -1316,16 +1342,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
- NetworkInterfaceMock* net = getNet();
- ASSERT(getReplCoord()->isCatchingUp());
- net->enterNetwork();
- net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
- net->exitNetwork();
- ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
auto txn = makeOperationContext();
- getReplCoord()->signalDrainComplete(txn.get());
+ getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
}
@@ -1341,18 +1364,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
log() << "Black holing request to " << request.target << ": " << request.cmdObj;
getNet()->blackHole(noi);
});
- ASSERT(getReplCoord()->isCatchingUp());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- auto net = getNet();
- net->enterNetwork();
- net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
- net->exitNetwork();
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
@@ -1370,7 +1390,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
// The old primary accepted one more op and all nodes caught up after voting for me.
net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
- ASSERT(getReplCoord()->isCatchingUp());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
@@ -1382,15 +1402,65 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
net->runReadyNetworkOperations();
net->exitNetwork();
auto txn = makeOperationContext();
- // Simulate bgsync signaling replCoord to exit drain mode.
+ // Simulate the applier signaling replCoord to exit drain mode.
// At this point, we see the stepdown and reset the states.
- getReplCoord()->signalDrainComplete(txn.get());
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
+ getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
}
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
+ ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ });
+
+ NetworkInterfaceMock* net = getNet();
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ replCoord->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+
+ // Step down during drain mode.
+ TopologyCoordinator::UpdateTermResult updateTermResult;
+ auto evh = replCoord->updateTerm_forTest(2, &updateTermResult);
+ ASSERT_TRUE(evh.isValid());
+ getReplExec()->waitForEvent(evh);
+ ASSERT_TRUE(replCoord->getMemberState().secondary());
+
+ // Step up again
+ ASSERT(replCoord->getApplierState() == ApplierState::Running);
+ simulateSuccessfulV1Voting();
+ ASSERT_TRUE(replCoord->getMemberState().primary());
+
+ // No need to catch-up, so we enter drain mode.
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+ });
+ ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+ ASSERT_FALSE(replCoord->canAcceptWritesForDatabase("test"));
+ auto txn = makeOperationContext();
+ replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
+ ASSERT(replCoord->getApplierState() == ApplierState::Stopped);
+ ASSERT_TRUE(replCoord->canAcceptWritesForDatabase("test"));
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 9e8a92a531f..3fd13b767a4 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -113,10 +113,10 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
net->exitNetwork();
- ASSERT(replCoord->isWaitingForApplierToDrain());
+ ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
- replCoord->signalDrainComplete(txn.get());
+ replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
}
/**
@@ -709,8 +709,6 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
ReplicationCoordinator::StatusAndDuration statusAndDur =
getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
ASSERT_OK(statusAndDur.status);
-
- ASSERT_TRUE(getExternalState()->isApplierSignaledToCancelFetcher());
}
TEST_F(ReplCoordTest,
@@ -5067,13 +5065,13 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
- ASSERT_TRUE(replCoord->isWaitingForApplierToDrain());
+ ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, replCoord->waitForDrainFinish(timeout));
ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1)));
const auto txn = makeOperationContext();
- replCoord->signalDrainComplete(txn.get());
+ replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
ASSERT_OK(replCoord->waitForDrainFinish(timeout));
// Zero timeout is fine.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index fd41930ce28..38c4408eebc 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -210,15 +210,11 @@ bool ReplicationCoordinatorMock::setFollowerMode(const MemberState& newState) {
return true;
}
-bool ReplicationCoordinatorMock::isWaitingForApplierToDrain() {
- return false;
-}
-
-bool ReplicationCoordinatorMock::isCatchingUp() {
- return false;
+ReplicationCoordinator::ApplierState ReplicationCoordinatorMock::getApplierState() {
+ return ApplierState::Running;
}
-void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*) {}
+void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*, long long) {}
Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) {
invariant(false);
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index b211007a8dd..5fed6539829 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -128,11 +128,9 @@ public:
virtual bool setFollowerMode(const MemberState& newState);
- virtual bool isWaitingForApplierToDrain();
+ virtual ApplierState getApplierState();
- virtual bool isCatchingUp();
-
- virtual void signalDrainComplete(OperationContext*);
+ virtual void signalDrainComplete(OperationContext*, long long);
virtual Status waitForDrainFinish(Milliseconds timeout) override;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index a8f74185d0d..24ecd3af4d0 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -352,7 +352,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
hasReadyRequests = net->hasReadyRequests();
getNet()->exitNetwork();
}
- ASSERT(replCoord->isWaitingForApplierToDrain());
+ ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
IsMasterResponse imResponse;
@@ -361,8 +361,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
{
auto txn = makeOperationContext();
- replCoord->signalDrainComplete(txn.get());
+ replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
}
+ ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped);
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -414,7 +415,7 @@ void ReplCoordTest::simulateSuccessfulElection() {
hasReadyRequests = net->hasReadyRequests();
getNet()->exitNetwork();
}
- ASSERT(replCoord->isWaitingForApplierToDrain());
+ ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
IsMasterResponse imResponse;
@@ -423,7 +424,7 @@ void ReplCoordTest::simulateSuccessfulElection() {
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
{
auto txn = makeOperationContext();
- replCoord->signalDrainComplete(txn.get());
+ replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
}
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 5c3ec06be96..073afcc73ec 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -102,7 +102,8 @@ void truncateAndResetOplog(OperationContext* txn,
// We must clear the sync source blacklist after calling stop()
// because the bgsync thread, while running, may update the blacklist.
replCoord->resetMyLastOpTimes();
- bgsync->stop();
+ bgsync->stop(true);
+ bgsync->startProducerIfStopped();
bgsync->clearBuffer(txn);
replCoord->clearSyncSourceBlacklist();
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 5a1c98574d2..b04972c29e4 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -85,6 +85,7 @@ void RSDataSync::_run() {
const MemberState memberState = _replCoord->getMemberState();
+ // TODO(siyuan) Control the behavior using applier state.
// An arbiter can never transition to any other state, and doesn't replicate, ever
if (memberState.arbiter()) {
break;
@@ -97,8 +98,7 @@ void RSDataSync::_run() {
}
try {
- if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain() &&
- !_replCoord->isCatchingUp()) {
+ if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
sleepsecs(1);
continue;
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8f752b49e05..50fbfe820b3 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -723,17 +723,6 @@ private:
while (!_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) {
}
- // For pausing replication in tests.
- while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- // Tests should not trigger clean shutdown while that failpoint is active. If we
- // think we need this, we need to think hard about what the behavior should be.
- if (_syncTail->_networkQueue->inShutdown()) {
- severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
- fassertFailedNoTrace(40304);
- }
- sleepmillis(10);
- }
-
if (ops.empty() && !ops.mustShutdown()) {
continue; // Don't emit empty batches.
}
@@ -774,8 +763,20 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
: new ApplyBatchFinalizer(replCoord)};
while (true) { // Exits on message from OpQueueBatcher.
+ // For pausing replication in tests.
+ while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ // Tests should not trigger clean shutdown while that failpoint is active. If we
+ // think we need this, we need to think hard about what the behavior should be.
+ if (_networkQueue->inShutdown()) {
+ severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
+ fassertFailedNoTrace(40304);
+ }
+ sleepmillis(10);
+ }
+
tryToGoLiveAsASecondary(&txn, replCoord);
+ long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
OpQueue ops = batcher.getNextBatch(Seconds(1));
@@ -783,17 +784,12 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
if (ops.mustShutdown()) {
return;
}
- continue; // Try again.
- }
-
- if (ops.front().raw.isEmpty()) {
- // This means that the network thread has coalesced and we have processed all of its
- // data.
- invariant(ops.getCount() == 1);
- if (replCoord->isWaitingForApplierToDrain()) {
- replCoord->signalDrainComplete(&txn);
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ continue;
}
- continue; // This wasn't a real op. Don't try to apply it.
+ // Signal drain complete if we're in Draining state and the buffer is empty.
+ replCoord->signalDrainComplete(&txn, termWhenBufferIsEmpty);
+ continue; // Try again.
}
// Extract some info from ops that we'll need after releasing the batch below.
@@ -863,6 +859,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
return true; // Return before wasting time parsing the op.
}
+ // Don't consume the op if we are told to stop.
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ sleepmillis(10);
+ return true;
+ }
+
ops->emplace_back(std::move(op)); // Parses the op in-place.
}
@@ -1265,8 +1267,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
Lock::ParallelBatchWriterMode pbwm(txn->lockState());
auto replCoord = ReplicationCoordinator::get(txn);
- if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain() &&
- !replCoord->isCatchingUp()) {
+ if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
severe() << "attempting to replicate ops while primary";
return {ErrorCodes::CannotApplyOplogWhilePrimary,
"attempting to replicate ops while primary"};