20 files changed, 531 insertions, 315 deletions
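The patch replaces the isWaitingForApplierToDrain()/isCatchingUp() booleans with two explicit state machines, one for the bgsync producer thread and one for the oplog applier. The compressed view below uses the enum definitions added in bgsync.h and replication_coordinator.h further down; the side-by-side pairings are only an illustration of the intended combinations, not code from the patch.

// Producer lifecycle (bgsync.h below).
enum class ProducerState { Starting, Running, Stopped };
// Applier lifecycle (replication_coordinator.h below).
enum class ApplierState { Running, Draining, Stopped };

// Intended pairings (see the full step-up/step-down diagram in replication_coordinator.h):
//   secondary / catch-up mode:         ProducerState::Running,  ApplierState::Running
//   drain mode after catch-up:         ProducerState::Stopped,  ApplierState::Draining
//   writable primary:                  ProducerState::Stopped,  ApplierState::Stopped
//   stepdown while draining restarts:  ProducerState::Starting, ApplierState::Running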
diff --git a/jstests/replsets/step_down_during_draining.js b/jstests/replsets/step_down_during_draining.js new file mode 100644 index 00000000000..234a0343cf7 --- /dev/null +++ b/jstests/replsets/step_down_during_draining.js @@ -0,0 +1,131 @@ +// Test stepdown dring drain mode +// 1. Set up a 3-node set. Assume Node 1 is the primary at the beginning for simplicity. +// 2. Prevent applying retrieved ops on all secondaries, including Node 2. +// 3. Insert data to ensure Node 2 has ops to apply in its queue. +// 4. Step up Node 2. Now it enters drain mode, but cannot proceed. +// 5. Step up Node 1. Wait until Node 2 knows of a higher term and steps down. +// Node 2 re-enables bgsync producer while it's still in drain mode. +// 6. Step up Node 2 again. It enters drain mode again. +// 7. Enable applying ops. +// 8. Ensure the ops in queue are applied and that Node 2 begins to accept writes as usual. + +load("jstests/replsets/rslib.js"); + +(function() { + "use strict"; + var replSet = new ReplSetTest({name: 'testSet', nodes: 3}); + var nodes = replSet.nodeList(); + replSet.startSet(); + var conf = replSet.getReplSetConfig(); + conf.members[2].priority = 0; + conf.settings = conf.settings || {}; + conf.settings.chainingAllowed = false; + conf.settings.catchUpTimeoutMillis = 0; + conf.protocolVersion = 1; + replSet.initiate(conf); + + var primary = replSet.getPrimary(); + var secondary = replSet.getSecondary(); + + // Set verbosity for replication on all nodes. + var verbosity = { + "setParameter": 1, + "logComponentVerbosity": { + "replication": {"verbosity": 3}, + } + }; + replSet.nodes.forEach(function(node) { + node.adminCommand(verbosity); + }); + + function enableFailPoint(node) { + jsTest.log("enable failpoint " + node.host); + assert.commandWorked( + node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'})); + } + + function disableFailPoint(node) { + jsTest.log("disable failpoint " + node.host); + assert.commandWorked( + node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'})); + } + + function stepUp(node) { + assert.commandWorked(node.adminCommand({replSetStepUp: 1})); + } + + // Do an initial insert to prevent the secondary from going into recovery + var numDocuments = 20; + var coll = primary.getDB("foo").foo; + assert.writeOK(coll.insert({x: 0}, {writeConcern: {w: 3, j: true}})); + replSet.awaitReplication(); + + // Enable fail point to stop replication. + var secondaries = replSet.getSecondaries(); + secondaries.forEach(enableFailPoint); + + var bufferCountBefore = secondary.getDB('foo').serverStatus().metrics.repl.buffer.count; + for (var i = 1; i < numDocuments; ++i) { + assert.writeOK(coll.insert({x: i})); + } + jsTestLog('Number of documents inserted into collection on primary: ' + numDocuments); + assert.eq(numDocuments, primary.getDB("foo").foo.find().itcount()); + + assert.soon( + function() { + var serverStatus = secondary.getDB('foo').serverStatus(); + var bufferCount = serverStatus.metrics.repl.buffer.count; + var bufferCountChange = bufferCount - bufferCountBefore; + jsTestLog('Number of operations buffered on secondary since stopping applier: ' + + bufferCountChange); + return bufferCountChange == numDocuments - 1; + }, + 'secondary did not buffer operations for new inserts on primary', + replSet.kDefaultTimeoutMs, + 1000); + + reconnect(secondary); + stepUp(secondary); + replSet.waitForState(secondary, ReplSetTest.State.PRIMARY, 1000); + replSet.awaitNodesAgreeOnPrimary(); + + // Secondary doesn't allow writes yet. 
+ var res = secondary.getDB("admin").runCommand({"isMaster": 1}); + assert(!res.ismaster); + + assert.commandFailedWithCode( + secondary.adminCommand({ + replSetTest: 1, + waitForDrainFinish: 5000, + }), + ErrorCodes.ExceededTimeLimit, + 'replSetTest waitForDrainFinish should time out when draining is not allowed to complete'); + + // Original primary steps up. + reconnect(primary); + stepUp(primary); + replSet.waitForState(primary, ReplSetTest.State.PRIMARY, 1000); + replSet.awaitNodesAgreeOnPrimary(); + + reconnect(secondary); + stepUp(secondary); + replSet.waitForState(secondary, ReplSetTest.State.PRIMARY, 1000); + replSet.awaitNodesAgreeOnPrimary(); + + // Disable fail point to allow replication. + secondaries.forEach(disableFailPoint); + + assert.commandWorked( + secondary.adminCommand({ + replSetTest: 1, + waitForDrainFinish: 5000, + }), + 'replSetTest waitForDrainFinish should work when draining is allowed to complete'); + + // Ensure new primary is writable. + jsTestLog('New primary should be writable after draining is complete'); + assert.writeOK(secondary.getDB("foo").flag.insert({sentinel: 1})); + // Check that all writes reached the secondary's op queue prior to + // stepping down the original primary and got applied. + assert.eq(secondary.getDB("foo").foo.find().itcount(), numDocuments); +})(); diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 1874e73474b..a0f646f6078 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -134,9 +134,7 @@ BackgroundSync::BackgroundSync( std::unique_ptr<OplogBuffer> oplogBuffer) : _oplogBuffer(std::move(oplogBuffer)), _replCoord(getGlobalReplicationCoordinator()), - _replicationCoordinatorExternalState(replicationCoordinatorExternalState), - _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), - std::numeric_limits<long long>::max()) { + _replicationCoordinatorExternalState(replicationCoordinatorExternalState) { // Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's // max size. bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get()); @@ -156,7 +154,7 @@ void BackgroundSync::shutdown(OperationContext* txn) { // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is // waiting for an operation to be past the slaveDelay point. clearBuffer(txn); - _stopped = true; + _state = ProducerState::Stopped; if (_syncSourceResolver) { _syncSourceResolver->shutdown(); @@ -201,48 +199,24 @@ void BackgroundSync::_run() { fassertFailed(28546); } } - stop(); -} - -void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) { - // Signal to consumers that we have entered the stopped state - // if the signal isn't already in the queue. - const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(txn); - if (!lastObjectPushed || !lastObjectPushed->isEmpty()) { - const BSONObj sentinelDoc; - _oplogBuffer->pushEvenIfFull(txn, sentinelDoc); - bufferCountGauge.increment(); - bufferSizeGauge.increment(sentinelDoc.objsize()); - } + stop(true); } void BackgroundSync::_runProducer() { - const MemberState state = _replCoord->getMemberState(); - // Stop when the state changes to primary. - // - // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below - // after fixing step-down during drain mode. 
- if (!_replCoord->isCatchingUp() && - (_replCoord->isWaitingForApplierToDrain() || state.primary())) { - if (!isStopped()) { - stop(); - } - if (_replCoord->isWaitingForApplierToDrain()) { - auto txn = cc().makeOperationContext(); - _signalNoNewDataForApplier(txn.get()); - } + if (getState() == ProducerState::Stopped) { sleepsecs(1); return; } // TODO(spencer): Use a condition variable to await loading a config. - if (state.startup()) { + // TODO(siyuan): Control bgsync with producer state. + if (_replCoord->getMemberState().startup()) { // Wait for a config to be loaded sleepsecs(1); return; } - invariant(!state.rollback()); + invariant(!_replCoord->getMemberState().rollback()); // We need to wait until initial sync has started. if (_replCoord->getMyLastAppliedOpTime().isNull()) { @@ -252,7 +226,7 @@ void BackgroundSync::_runProducer() { // we want to start when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" auto txn = cc().makeOperationContext(); - if (isStopped()) { + if (getState() == ProducerState::Starting) { start(txn.get()); } @@ -287,12 +261,7 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } - if (!_replCoord->isCatchingUp() && - (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) { - return; - } - - if (_inShutdown_inlock()) { + if (_state != ProducerState::Running) { return; } } @@ -334,7 +303,8 @@ void BackgroundSync::_produce(OperationContext* txn) { if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) { // All (accessible) sync sources were too stale. - if (_replCoord->isCatchingUp()) { + // TODO: End catchup mode early if we are too stale. + if (_replCoord->getMemberState().primary()) { warning() << "Too stale to catch up."; log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen @@ -374,14 +344,15 @@ void BackgroundSync::_produce(OperationContext* txn) { long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_stopped) { + if (_state != ProducerState::Running) { return; } lastOpTimeFetched = _lastOpTimeFetched; lastHashFetched = _lastFetchedHash; - if (!_replCoord->isCatchingUp()) { - _replCoord->signalUpstreamUpdater(); - } + } + + if (!_replCoord->getMemberState().primary()) { + _replCoord->signalUpstreamUpdater(); } // Set the applied point if unset. This is most likely the first time we've established a sync @@ -441,7 +412,10 @@ void BackgroundSync::_produce(OperationContext* txn) { // If the background sync is stopped after the fetcher is started, we need to // re-evaluate our sync source and oplog common point. - if (isStopped()) { + if (getState() != ProducerState::Running) { + log() << "Replication producer stopped after oplog fetcher finished returning a batch from " + "our sync source. Abandoning this batch of oplog entries and re-evaluating our " + "sync source."; return; } @@ -455,7 +429,8 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { - if (_replCoord->isCatchingUp()) { + if (_replCoord->getMemberState().primary()) { + // TODO: Abort catchup mode early if rollback detected. 
warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; sleepsecs(1); return; @@ -481,6 +456,9 @@ void BackgroundSync::_produce(OperationContext* txn) { log() << "Starting rollback due to " << redact(fetcherReturnStatus); + // TODO: change this to call into the Applier directly to block until the applier is + // drained. + // // Wait till all buffered oplog entries have drained and been applied. auto lastApplied = _replCoord->getMyLastAppliedOpTime(); if (lastApplied != lastOpTimeFetched) { @@ -488,14 +466,16 @@ void BackgroundSync::_produce(OperationContext* txn) { << lastOpTimeFetched << " to be applied before starting rollback."; while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) { sleepmillis(10); - if (isStopped() || inShutdown()) { + if (getState() != ProducerState::Running) { return; } } } _rollback(txn, source, syncSourceResp.rbid, getConnection); - stop(); + // Reset the producer to clear the sync source and the last optime fetched. + stop(true); + startProducerIfStopped(); } else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) { Seconds blacklistDuration(60); warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source " @@ -571,7 +551,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi // buffer between the time we check _inShutdown and the point where we finish writing to the // buffer. stdx::unique_lock<stdx::mutex> lock(_mutex); - if (_inShutdown) { + if (_state != ProducerState::Running) { return Status::OK(); } @@ -734,20 +714,20 @@ void BackgroundSync::clearSyncTarget() { _syncSourceHost = HostAndPort(); } -void BackgroundSync::cancelFetcher() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_oplogFetcher) { - _oplogFetcher->shutdown(); - } -} - -void BackgroundSync::stop() { +void BackgroundSync::stop(bool resetLastFetchedOptime) { stdx::lock_guard<stdx::mutex> lock(_mutex); - _stopped = true; + _state = ProducerState::Stopped; _syncSourceHost = HostAndPort(); - _lastOpTimeFetched = OpTime(); - _lastFetchedHash = 0; + if (resetLastFetchedOptime) { + invariant(_oplogBuffer->isEmpty()); + _lastOpTimeFetched = OpTime(); + _lastFetchedHash = 0; + } + + if (_syncSourceResolver) { + _syncSourceResolver->shutdown(); + } if (_oplogFetcher) { _oplogFetcher->shutdown(); @@ -755,24 +735,33 @@ void BackgroundSync::stop() { } void BackgroundSync::start(OperationContext* txn) { - massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty()); - - long long lastFetchedHash = _readLastAppliedHash(txn); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _stopped = false; + OpTimeWithHash lastAppliedOpTimeWithHash; + do { + lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(txn); + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Double check the state after acquiring the mutex. + if (_state != ProducerState::Starting) { + return; + } + // If a node steps down during drain mode, then the buffer may not be empty at the beginning + // of secondary state. + if (!_oplogBuffer->isEmpty()) { + log() << "going to start syncing, but buffer is not empty"; + } + _state = ProducerState::Running; - // reset _last fields with current oplog data - _lastOpTimeFetched = _replCoord->getMyLastAppliedOpTime(); - _lastFetchedHash = lastFetchedHash; + // When a node steps down during drain mode, the last fetched optime would be newer than + // the last applied. 
+ if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) { + _lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime; + _lastFetchedHash = lastAppliedOpTimeWithHash.value; + } + // Reload the last applied optime from disk if it has been changed. + } while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime()); LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } -bool BackgroundSync::isStopped() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _stopped; -} - void BackgroundSync::clearBuffer(OperationContext* txn) { _oplogBuffer->clear(txn); const auto count = bufferCountGauge.get(); @@ -781,7 +770,7 @@ void BackgroundSync::clearBuffer(OperationContext* txn) { bufferSizeGauge.decrement(size); } -long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { +OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* txn) { BSONObj oplogEntry; try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { @@ -791,7 +780,7 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { if (!success) { // This can happen when we are to do an initial sync. lastHash will be set // after the initial sync is complete. - return 0; + return OpTimeWithHash(0); } } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName); @@ -807,29 +796,13 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { << redact(status); fassertFailed(18902); } - return hash; + OplogEntry parsedEntry(oplogEntry); + return OpTimeWithHash(hash, parsedEntry.getOpTime()); } bool BackgroundSync::shouldStopFetching() const { - if (inShutdown()) { - LOG(2) << "Stopping oplog fetcher due to shutdown."; - return true; - } - - // If we are transitioning to primary state, we need to stop fetching in order to go into - // bgsync-stop mode. - if (_replCoord->isWaitingForApplierToDrain()) { - LOG(2) << "Stopping oplog fetcher because we are waiting for the applier to drain."; - return true; - } - - if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) { - LOG(2) << "Stopping oplog fetcher because we are primary."; - return true; - } - // Check if we have been stopped. - if (isStopped()) { + if (getState() != ProducerState::Running) { LOG(2) << "Stopping oplog fetcher due to stop request."; return true; } @@ -850,5 +823,19 @@ void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op bufferSizeGauge.increment(op.objsize()); } +BackgroundSync::ProducerState BackgroundSync::getState() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _state; +} + +void BackgroundSync::startProducerIfStopped() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + // Let producer run if it's already running. + if (_state == ProducerState::Stopped) { + _state = ProducerState::Starting; + } +} + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 8177509e049..479caabaee8 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -58,11 +58,28 @@ class BackgroundSync { MONGO_DISALLOW_COPYING(BackgroundSync); public: + /** + * Stopped -> Starting -> Running + * ^ | | + * |__________|____________| + * + * In normal cases: Stopped -> Starting -> Running -> Stopped. + * It is also possible to transition directly from Starting to Stopped. 
+ * + * We need a separate Starting state since part of the startup process involves reading from + * disk and we want to do that disk I/O in the bgsync thread, rather than whatever thread calls + * start(). + */ + enum class ProducerState { Starting, Running, Stopped }; + BackgroundSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, std::unique_ptr<OplogBuffer> oplogBuffer); // stop syncing (when this node becomes a primary, e.g.) - void stop(); + // During stepdown, the last fetched optime is not reset in order to keep track of the lastest + // optime in the buffer. However, the last fetched optime has to be reset after initial sync or + // rollback. + void stop(bool resetLastFetchedOptime); /** * Starts oplog buffer, task executor and producer thread, in that order. @@ -85,8 +102,6 @@ public: */ bool inShutdown() const; - bool isStopped() const; - // starts the sync target notifying thread void notifierThread(); @@ -106,11 +121,6 @@ public: void clearBuffer(OperationContext* txn); /** - * Cancel existing find/getMore commands on the sync source's oplog collection. - */ - void cancelFetcher(); - - /** * Returns true if any of the following is true: * 1) We are shutting down; * 2) We are primary; @@ -119,7 +129,11 @@ public: */ bool shouldStopFetching() const; - // Testing related stuff + ProducerState getState() const; + // Starts the producer if it's stopped. Otherwise, let it keep running. + void startProducerIfStopped(); + + // Adds a fake oplog entry to buffer. Used for testing only. void pushTestOpToBuffer(OperationContext* txn, const BSONObj& op); private: @@ -137,14 +151,6 @@ private: void _produce(OperationContext* txn); /** - * Signals to the applier that we have no new data, - * and are in sync with the applier at this point. - * - * NOTE: Used after rollback and during draining to transition to Primary role; - */ - void _signalNoNewDataForApplier(OperationContext* txn); - - /** * Checks current background sync state before pushing operations into blocking queue and * updating metrics. If the queue is full, might block. * @@ -167,7 +173,7 @@ private: // restart syncing void start(OperationContext* txn); - long long _readLastAppliedHash(OperationContext* txn); + OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* txn); // Production thread std::unique_ptr<OplogBuffer> _oplogBuffer; @@ -179,6 +185,8 @@ private: ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState; // _mutex protects all of the class variables declared below. + // + // Never hold bgsync mutex when trying to acquire the ReplicationCoordinator mutex. mutable stdx::mutex _mutex; OpTime _lastOpTimeFetched; @@ -193,8 +201,7 @@ private: // Set to true if shutdown() has been called. bool _inShutdown = false; - // if producer thread should not be running - bool _stopped = true; + ProducerState _state = ProducerState::Starting; HostAndPort _syncSourceHost; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index a840b8e99ed..b70ac1906c1 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -393,18 +393,62 @@ public: virtual bool setFollowerMode(const MemberState& newState) = 0; /** - * Returns true if the coordinator wants the applier to pause application. + * Step-up + * ======= + * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from + * the perspective of producer and applier, so there's nothing to do with them. 
+ * When a node enters drain mode, producer state = Stopped, applier state = Draining. * - * If this returns true, the applier should call signalDrainComplete() when it has - * completed draining its operation buffer and no further ops are being applied. + * If the applier state is Draining, it will signal repl coord when there's nothing to apply. + * The applier goes into Stopped state at the same time. + * + * The states go like the following: + * - secondary and during catchup mode + * (producer: Running, applier: Running) + * | + * | finish catch-up, enter drain mode + * V + * - drain mode + * (producer: Stopped, applier: Draining) + * | + * | applier signals drain is complete + * V + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * + * + * Step-down + * ========= + * The state transitions become: + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * | + * | step down + * V + * - secondary mode, starting bgsync + * (producer: Starting, applier: Running) + * | + * | bgsync runs start() + * V + * - secondary mode, normal + * (producer: Running, applier: Running) + * + * When a node steps down during draining mode, it's OK to change from (producer: Stopped, + * applier: Draining) to (producer: Starting, applier: Running). + * + * When a node steps down during catchup mode, the states remain the same (producer: Running, + * applier: Running). */ - virtual bool isWaitingForApplierToDrain() = 0; + enum class ApplierState { Running, Draining, Stopped }; /** - * A new primary tries to have its oplog catch up after winning an election. - * Return true if the coordinator is waiting for catch-up to finish. + * In normal cases: Running -> Draining -> Stopped -> Running. + * Draining -> Running is also possible if a node steps down during drain mode. + * + * Only the applier can make the transition from Draining to Stopped by calling + * signalDrainComplete(). */ - virtual bool isCatchingUp() = 0; + virtual ApplierState getApplierState() = 0; /** * Signals that a previously requested pause and drain of the applier buffer @@ -412,12 +456,18 @@ public: * * This is an interface that allows the applier to reenable writes after * a successful election triggers the draining of the applier buffer. + * + * The applier signals drain complete when the buffer is empty and it's in Draining + * state. We need to make sure the applier checks both conditions in the same term. + * Otherwise, it's possible that the applier confirms the empty buffer, but the node + * steps down and steps up so quickly that the applier signals drain complete in the wrong + * term. */ - virtual void signalDrainComplete(OperationContext* txn) = 0; + virtual void signalDrainComplete(OperationContext* txn, long long termWhenBufferIsEmpty) = 0; /** * Waits duration of 'timeout' for applier to finish draining its buffer of operations. - * Returns OK if isWaitingForApplierToDrain() returns false. + * Returns OK if we are not in drain mode. * Returns ErrorCodes::ExceededTimeLimit if we timed out waiting for the applier to drain its * buffer. * Returns ErrorCodes::BadValue if timeout is negative. 
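A minimal sketch of the applier-side contract documented above, assuming a ReplicationCoordinator* and the OplogBuffer used by bgsync; the function name and its placement in the applier loop are illustrative, this is not the applier change from this patch.

#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/replication_coordinator.h"

namespace mongo {
namespace repl {

// Illustrative only: capture the term before confirming the buffer is empty so that
// signalDrainComplete() can reject the signal if a step-down and step-up happened in
// between (the captured term would no longer match the current one).
void tryToExitDrainMode(OperationContext* txn,
                        ReplicationCoordinator* replCoord,
                        OplogBuffer* oplogBuffer) {
    if (replCoord->getApplierState() != ReplicationCoordinator::ApplierState::Draining) {
        return;  // Nothing to do; the applier keeps applying batches as usual.
    }
    const long long termWhenBufferIsEmpty = replCoord->getTerm();
    if (oplogBuffer->isEmpty()) {
        // Draining -> Stopped only if we are still draining in the same term; otherwise
        // the call returns without doing anything and the node stays a secondary.
        replCoord->signalDrainComplete(txn, termWhenBufferIsEmpty);
    }
}

}  // namespace repl
}  // namespace mongo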
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index c995f30a468..66283518bfe 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -255,9 +255,14 @@ public: virtual void signalApplierToChooseNewSyncSource() = 0; /** - * Notifies the bgsync to cancel the current oplog fetcher. + * Notifies the bgsync to stop fetching data. */ - virtual void signalApplierToCancelFetcher() = 0; + virtual void stopProducer() = 0; + + /** + * Start bgsync's producer if it's stopped. + */ + virtual void startProducerIfStopped() = 0; /** * Drops all snapshots and clears the "committed" snapshot. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index d7f08443d45..f5a96ed3123 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -808,12 +808,18 @@ void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource } } -void ReplicationCoordinatorExternalStateImpl::signalApplierToCancelFetcher() { +void ReplicationCoordinatorExternalStateImpl::stopProducer() { LockGuard lk(_threadMutex); - if (!_bgSync) { - return; + if (_bgSync) { + _bgSync->stop(false); + } +} + +void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { + LockGuard lk(_threadMutex); + if (_bgSync) { + _bgSync->startProducerIfStopped(); } - _bgSync->cancelFetcher(); } void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 2bee57d3419..b0d9487eaaf 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -94,7 +94,8 @@ public: virtual void killAllUserOperations(OperationContext* txn); virtual void shardingOnStepDownHook(); virtual void signalApplierToChooseNewSyncSource(); - virtual void signalApplierToCancelFetcher(); + virtual void stopProducer(); + virtual void startProducerIfStopped(); void dropAllSnapshots() final; void updateCommittedSnapshot(SnapshotName newCommitPoint) final; void createSnapshot(OperationContext* txn, SnapshotName name) final; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 5f305c067d6..274806326a6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -55,7 +55,6 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock _storeLocalLastVoteDocumentStatus(Status::OK()), _storeLocalConfigDocumentShouldHang(false), _storeLocalLastVoteDocumentShouldHang(false), - _isApplierSignaledToCancelFetcher(false), _connectionsClosed(false), _threadsStarted(false) {} @@ -203,10 +202,6 @@ void ReplicationCoordinatorExternalStateMock::setStoreLocalConfigDocumentToHang( } } -bool ReplicationCoordinatorExternalStateMock::isApplierSignaledToCancelFetcher() const { - return _isApplierSignaledToCancelFetcher; -} - bool ReplicationCoordinatorExternalStateMock::threadsStarted() const { return _threadsStarted; } @@ -233,9 +228,9 @@ void 
ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {} void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {} -void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() { - _isApplierSignaledToCancelFetcher = true; -} +void ReplicationCoordinatorExternalStateMock::stopProducer() {} + +void ReplicationCoordinatorExternalStateMock::startProducerIfStopped() {} void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {} diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index d35c269a332..1b575ede697 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -87,7 +87,8 @@ public: virtual void killAllUserOperations(OperationContext* txn); virtual void shardingOnStepDownHook(); virtual void signalApplierToChooseNewSyncSource(); - virtual void signalApplierToCancelFetcher(); + virtual void stopProducer(); + virtual void startProducerIfStopped(); virtual void dropAllSnapshots(); virtual void updateCommittedSnapshot(SnapshotName newCommitPoint); virtual void createSnapshot(OperationContext* txn, SnapshotName name); @@ -161,11 +162,6 @@ public: void setStoreLocalLastVoteDocumentToHang(bool hang); /** - * Returns true if applier was signaled to cancel fetcher. - */ - bool isApplierSignaledToCancelFetcher() const; - - /** * Returns true if startThreads() has been called. */ bool threadsStarted() const; @@ -211,7 +207,6 @@ private: stdx::condition_variable _shouldHangLastVoteCondVar; bool _storeLocalConfigDocumentShouldHang; bool _storeLocalLastVoteDocumentShouldHang; - bool _isApplierSignaledToCancelFetcher; bool _connectionsClosed; HostAndPort _clientHostAndPort; bool _threadsStarted; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 099f9462504..a6d936efc0c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -322,7 +322,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _externalState(std::move(externalState)), _inShutdown(false), _memberState(MemberState::RS_STARTUP), - _isWaitingForDrainToComplete(false), _rsConfigState(kConfigPreStart), _selfIndex(-1), _sleptLastElection(false), @@ -884,18 +883,13 @@ void ReplicationCoordinatorImpl::_setFollowerModeFinish( _replExecutor.signalEvent(finishedSettingFollowerMode); } -bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() { +ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _isWaitingForDrainToComplete; + return _applierState; } -bool ReplicationCoordinatorImpl::isCatchingUp() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _isCatchingUp; -} - - -void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { +void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn, + long long termWhenBufferIsEmpty) { // This logic is a little complicated in order to avoid acquiring the global exclusive lock // unnecessarily. This is important because the applier may call signalDrainComplete() // whenever it wants, not only when the ReplicationCoordinator is expecting it. @@ -905,7 +899,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // 2.) 
Otherwise, release the mutex while acquiring the global exclusive lock, // since that might take a while (NB there's a deadlock cycle otherwise, too). // 3.) Re-check to see if we've somehow left drain mode. If we have not, clear - // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and + // producer and applier's states, set the flag allowing non-local database writes and // drop the mutex. At this point, no writes can occur from other threads, due to the // global exclusive lock. // 4.) Drop all temp collections, and log the drops to the oplog. @@ -915,16 +909,15 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // // Because replicatable writes are forbidden while in drain mode, and we don't exit drain // mode until we have the global exclusive lock, which forbids all other threads from making - // writes, we know that from the time that _isWaitingForDrainToComplete is set in - // _performPostMemberStateUpdateAction(kActionWinElection) until this method returns, no - // external writes will be processed. This is important so that a new temp collection isn't - // introduced on the new primary before we drop all the temp collections. + // writes, we know that from the time that _canAcceptNonLocalWrites is set until + // this method returns, no external writes will be processed. This is important so that a new + // temp collection isn't introduced on the new primary before we drop all the temp collections. // When we go to drop all temp collections, we must replicate the drops. invariant(txn->writesAreReplicated()); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (!_isWaitingForDrainToComplete) { + if (_applierState != ApplierState::Draining) { return; } lk.unlock(); @@ -950,19 +943,14 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { Lock::GlobalWrite globalWriteLock(txn->lockState()); lk.lock(); - if (!_isWaitingForDrainToComplete) { + // Exit drain mode when the buffer is empty in the current term and we're in Draining mode. + if (_applierState != ApplierState::Draining || termWhenBufferIsEmpty != _cachedTerm) { return; } - invariant(!_isCatchingUp); - _isWaitingForDrainToComplete = false; + _applierState = ApplierState::Stopped; _drainFinishedCond.notify_all(); - if (!_getMemberState_inlock().primary()) { - // We must have decided not to transition to primary while waiting for the applier to drain. - // Skip the rest of this function since it should only be done when really transitioning. 
- return; - } - + invariant(_getMemberState_inlock().primary()); invariant(!_canAcceptNonLocalWrites); _canAcceptNonLocalWrites = true; @@ -984,7 +972,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { } stdx::unique_lock<stdx::mutex> lk(_mutex); - auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; }; + auto pred = [this]() { return _applierState != ApplierState::Draining; }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, "Timed out waiting to finish draining applier buffer"); @@ -2071,15 +2059,15 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon _topCoord->fillIsMasterForReplSet(response); } - OpTime lastOpTime = getMyLastAppliedOpTime(); + LockGuard lock(_mutex); + OpTime lastOpTime = _getMyLastAppliedOpTime_inlock(); response->setLastWrite(lastOpTime, lastOpTime.getTimestamp().getSecs()); if (_currentCommittedSnapshot) { OpTime majorityOpTime = _currentCommittedSnapshot->opTime; response->setLastMajorityWrite(majorityOpTime, majorityOpTime.getTimestamp().getSecs()); } - - if (isWaitingForApplierToDrain() || isCatchingUp()) { - // Report that we are secondary to ismaster callers until drain completes. + // Report that we are secondary to ismaster callers until drain completes. + if (response->isMaster() && !_canAcceptNonLocalWrites) { response->setIsMaster(false); response->setIsSecondary(true); } @@ -2577,8 +2565,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { _replicationWaiterList.signalAndRemoveAll_inlock(); // Wake up the optime waiter that is waiting for primary catch-up to finish. _opTimeWaiterList.signalAndRemoveAll_inlock(); - // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously - // by freshness scan. _canAcceptNonLocalWrites = false; _stepDownPending = false; serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false); @@ -2587,6 +2573,12 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } + // Enable replication producer and applier on stepdown. + if (_memberState.primary()) { + _applierState = ApplierState::Running; + _externalState->startProducerIfStopped(); + } + if (_memberState.secondary() && !newState.primary()) { // Switching out of SECONDARY, but not to PRIMARY. 
_canServeNonLocalReads.store(0U); @@ -2681,9 +2673,6 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp(); _topCoord->processWinElection(_electionId, ts); - invariant(!_isCatchingUp); - invariant(!_isWaitingForDrainToComplete); - _isCatchingUp = true; const PostMemberStateUpdateAction nextAction = _updateMemberStateFromTopologyCoordinator_inlock(); invariant(nextAction != kActionWinElection); @@ -2695,7 +2684,7 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( if (isV1ElectionProtocol()) { _scanOpTimeForCatchUp_inlock(); } else { - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); } break; } @@ -2716,7 +2705,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto evhStatus = scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2725,7 +2714,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { LockGuard lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2741,9 +2730,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca // Term is also checked in case the catchup timeout is so long that the node becomes primary // again. if (!_memberState.primary() || originalTerm != _cachedTerm) { + // If the node steps down during the catch-up, we don't go into drain mode. log() << "Stopped transition to primary of term " << originalTerm << " because I've already stepped down."; - _finishCatchUpOplog_inlock(false); return; } @@ -2754,7 +2743,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. " << "Will move forward with becoming primary anyway."; - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); return; } @@ -2763,7 +2752,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); return; } @@ -2776,7 +2765,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _finishCatchUpOplog_inlock(true); + _finishCatchingUpOplog_inlock(); }; auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB); @@ -2791,18 +2780,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) { - invariant(_isCatchingUp); - _isCatchingUp = false; - // If the node steps down during the catch-up, we don't go into drain mode. 
- if (startToDrain) { - invariant(!_isWaitingForDrainToComplete); - _isWaitingForDrainToComplete = true; - // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to - // cancel fetcher. - _replExecutor.scheduleWork( - _wrapAsCallbackFn([this]() { _externalState->signalApplierToCancelFetcher(); })); - } +void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { + _applierState = ApplierState::Draining; + _externalState->stopProducer(); } Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) { @@ -3079,7 +3059,8 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp LockGuard topoLock(_topoMutex); HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); - auto chainingPreference = isCatchingUp() + // Always allow chaining while in catchup and drain mode. + auto chainingPreference = getMemberState().primary() ? TopologyCoordinator::ChainingPreference::kAllowChaining : TopologyCoordinator::ChainingPreference::kUseConfiguration; HostAndPort newSyncSource = diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 9144e88b6b8..1e2543367a9 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -174,11 +174,10 @@ public: virtual bool setFollowerMode(const MemberState& newState) override; - virtual bool isWaitingForApplierToDrain() override; + virtual ApplierState getApplierState() override; - virtual bool isCatchingUp() override; - - virtual void signalDrainComplete(OperationContext* txn) override; + virtual void signalDrainComplete(OperationContext* txn, + long long termWhenBufferIsEmpty) override; virtual Status waitForDrainFinish(Milliseconds timeout) override; @@ -1134,10 +1133,8 @@ private: long long originalTerm); /** * Finish catch-up mode and start drain mode. - * If "startToDrain" is true, the node enters drain mode. Otherwise, it goes back to secondary - * mode. */ - void _finishCatchUpOplog_inlock(bool startToDrain); + void _finishCatchingUpOplog_inlock(); /** * Waits for the config state to leave kConfigStartingUp, which indicates that start() has @@ -1247,11 +1244,7 @@ private: // Used to signal threads waiting for changes to _memberState. stdx::condition_variable _drainFinishedCond; // (M) - // True if we are waiting for the applier to finish draining. - bool _isWaitingForDrainToComplete; // (M) - - // True if we are waiting for oplog catch-up to finish. - bool _isCatchingUp = false; // (M) + ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M) // Used to signal threads waiting for changes to _rsConfigState. 
stdx::condition_variable _rsConfigStateChange; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index 0520bbad0f2..51817d9e0c2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -155,7 +155,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); const auto txnPtr = makeOperationContext(); auto& txn = *txnPtr; @@ -165,7 +165,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&txn); + getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm()); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -189,7 +189,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); const auto txnPtr = makeOperationContext(); auto& txn = *txnPtr; @@ -199,7 +199,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&txn); + getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm()); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 7bbe9000bc7..137c6483a6e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -51,6 +51,7 @@ namespace { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using ApplierState = ReplicationCoordinator::ApplierState; TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { assertStartSuccess(BSON("_id" @@ -106,7 +107,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); simulateCatchUpTimeout(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto txnPtr = makeOperationContext(); auto& txn = *txnPtr; @@ -116,7 +117,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { 
getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&txn); + getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm()); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -170,7 +171,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { << getReplCoord()->getMemberState().toString(); // Wait for catchup check to finish. simulateCatchUpTimeout(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto txnPtr = makeOperationContext(); auto& txn = *txnPtr; @@ -180,7 +181,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&txn); + getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm()); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -1120,6 +1121,15 @@ protected: using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator; using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>; + void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) { + ReplicaSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest(); + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(rsConfig.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true))); + } + void simulateSuccessfulV1Voting() { ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); @@ -1128,32 +1138,25 @@ protected: ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen); log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)"; - ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); bool hasReadyRequests = true; // Process requests until we're primary and consume the heartbeats for the notification - // of election win. Exit immediately on catch up. - while (!replCoord->isCatchingUp() && - (!replCoord->getMemberState().primary() || hasReadyRequests)) { + // of election win. Exit immediately on unexpected requests. + while (!replCoord->getMemberState().primary() || hasReadyRequests) { log() << "Waiting on network in state " << replCoord->getMemberState(); - getNet()->enterNetwork(); + net->enterNetwork(); if (net->now() < electionTimeoutWhen) { net->runUntil(electionTimeoutWhen); } - const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + // Peek the next request, don't consume it yet. 
+ const NetworkOpIter noi = net->getFrontOfUnscheduledQueue(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; - ReplSetHeartbeatArgsV1 hbArgs; - Status status = hbArgs.initialize(request.cmdObj); - if (hbArgs.initialize(request.cmdObj).isOK()) { - ReplSetHeartbeatResponse hbResp; - hbResp.setSetName(rsConfig.getReplSetName()); - hbResp.setState(MemberState::RS_SECONDARY); - hbResp.setConfigVersion(rsConfig.getConfigVersion()); - net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true))); + if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { - net->scheduleResponse(noi, + net->scheduleResponse(net->getNextReadyRequest(), net->now(), makeResponseStatus(BSON("ok" << 1 << "reason" << "" @@ -1162,9 +1165,9 @@ protected: << "voteGranted" << true))); } else { - error() << "Black holing unexpected request to " << request.target << ": " - << request.cmdObj; - net->blackHole(noi); + // Stop the loop and let the caller handle unexpected requests. + net->exitNetwork(); + break; } net->runReadyNetworkOperations(); // Successful elections need to write the last vote to disk, which is done by DB worker. @@ -1173,7 +1176,7 @@ protected: getReplExec()->waitForDBWork_forTest(); net->runReadyNetworkOperations(); hasReadyRequests = net->hasReadyRequests(); - getNet()->exitNetwork(); + net->exitNetwork(); } } @@ -1220,9 +1223,11 @@ protected: while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") { - log() << request.target.toString() << " processing " << request.cmdObj; onFreshnessScanRequest(noi); + } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + replyToHeartbeatRequestAsSecondaries(noi); } else { log() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; @@ -1232,6 +1237,28 @@ protected: } net->exitNetwork(); } + + void replyHeartbeatsAndRunUntil(Date_t until) { + auto net = getNet(); + net->enterNetwork(); + while (net->now() < until) { + while (net->hasReadyRequests()) { + // Peek the next request + auto noi = net->getFrontOfUnscheduledQueue(); + auto& request = noi->getRequest(); + if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + // Consume the next request + replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); + } else { + // Cannot consume other requests than heartbeats. 
+ net->exitNetwork(); + return; + } + } + net->runUntil(until); + } + net->exitNetwork(); + } }; TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) { @@ -1242,11 +1269,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) { processFreshnessScanRequests([this](const NetworkOpIter noi) { getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); }); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up")); auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get()); + getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1263,14 +1290,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) { }); auto net = getNet(); - net->enterNetwork(); - net->runUntil(net->now() + config.getCatchUpTimeoutPeriod()); - net->exitNetwork(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod()); + ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout")); auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get()); + getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1288,18 +1314,18 @@ TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) { }); NetworkInterfaceMock* net = getNet(); - ASSERT(getReplCoord()->isCatchingUp()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // Simulate the work done by bgsync and applier threads. // setMyLastAppliedOpTime() will signal the optime waiter. 
getReplCoord()->setMyLastAppliedOpTime(time2); net->enterNetwork(); net->runReadyNetworkOperations(); net->exitNetwork(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get()); + getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1316,16 +1342,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) { net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); - NetworkInterfaceMock* net = getNet(); - ASSERT(getReplCoord()->isCatchingUp()); - net->enterNetwork(); - net->runUntil(net->now() + config.getCatchUpTimeoutPeriod()); - net->exitNetwork(); - ASSERT(getReplCoord()->isWaitingForApplierToDrain()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get()); + getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1341,18 +1364,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) { log() << "Black holing request to " << request.target << ": " << request.cmdObj; getNet()->blackHole(noi); }); - ASSERT(getReplCoord()->isCatchingUp()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - auto net = getNet(); - net->enterNetwork(); - net->runUntil(net->now() + config.getCatchUpTimeoutPeriod()); - net->exitNetwork(); - ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain()); + replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary")); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test")); @@ -1370,7 +1390,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { // The old primary accepted one more op and all nodes caught up after voting for me. net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); - ASSERT(getReplCoord()->isCatchingUp()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); @@ -1382,15 +1402,65 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { net->runReadyNetworkOperations(); net->exitNetwork(); auto txn = makeOperationContext(); - // Simulate bgsync signaling replCoord to exit drain mode. + // Simulate the applier signaling replCoord to exit drain mode. // At this point, we see the stepdown and reset the states. 
-    getReplCoord()->signalDrainComplete(txn.get());
-    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
+    getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm());
+    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
     stopCapturingLogMessages();
     ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
     ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
 }
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
+    startCapturingLogMessages();
+
+    OpTime time1(Timestamp(100, 1), 0);
+    OpTime time2(Timestamp(100, 2), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+        auto net = getNet();
+        // The old primary accepted one more op and all nodes caught up after voting for me.
+        net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+    });
+
+    NetworkInterfaceMock* net = getNet();
+    ReplicationCoordinatorImpl* replCoord = getReplCoord();
+    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+    // Simulate the work done by bgsync and applier threads.
+    // setMyLastAppliedOpTime() will signal the optime waiter.
+    replCoord->setMyLastAppliedOpTime(time2);
+    net->enterNetwork();
+    net->runReadyNetworkOperations();
+    net->exitNetwork();
+    ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+
+    // Step down during drain mode.
+    TopologyCoordinator::UpdateTermResult updateTermResult;
+    auto evh = replCoord->updateTerm_forTest(2, &updateTermResult);
+    ASSERT_TRUE(evh.isValid());
+    getReplExec()->waitForEvent(evh);
+    ASSERT_TRUE(replCoord->getMemberState().secondary());
+
+    // Step up again
+    ASSERT(replCoord->getApplierState() == ApplierState::Running);
+    simulateSuccessfulV1Voting();
+    ASSERT_TRUE(replCoord->getMemberState().primary());
+
+    // No need to catch-up, so we enter drain mode.
+    processFreshnessScanRequests([this](const NetworkOpIter noi) {
+        getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+    });
+    ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+    ASSERT_FALSE(replCoord->canAcceptWritesForDatabase("test"));
+    auto txn = makeOperationContext();
+    replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
+    ASSERT(replCoord->getApplierState() == ApplierState::Stopped);
+    ASSERT_TRUE(replCoord->canAcceptWritesForDatabase("test"));
+}
+
 }  // namespace
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 9e8a92a531f..3fd13b767a4 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -113,10 +113,10 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
     net->exitNetwork();
-    ASSERT(replCoord->isWaitingForApplierToDrain());
+    ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
     ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
-    replCoord->signalDrainComplete(txn.get());
+    replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
 }
 /**
@@ -709,8 +709,6 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
     ReplicationCoordinator::StatusAndDuration statusAndDur =
         getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
     ASSERT_OK(statusAndDur.status);
-
-    ASSERT_TRUE(getExternalState()->isApplierSignaledToCancelFetcher());
 }
 TEST_F(ReplCoordTest,
@@ -5067,13 +5065,13 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
     auto timeout = Milliseconds(1);
     ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
-    ASSERT_TRUE(replCoord->isWaitingForApplierToDrain());
+    ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
     ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, replCoord->waitForDrainFinish(timeout));
     ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1)));
     const auto txn = makeOperationContext();
-    replCoord->signalDrainComplete(txn.get());
+    replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
     ASSERT_OK(replCoord->waitForDrainFinish(timeout));
     // Zero timeout is fine.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index fd41930ce28..38c4408eebc 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -210,15 +210,11 @@ bool ReplicationCoordinatorMock::setFollowerMode(const MemberState& newState) {
     return true;
 }
-bool ReplicationCoordinatorMock::isWaitingForApplierToDrain() {
-    return false;
-}
-
-bool ReplicationCoordinatorMock::isCatchingUp() {
-    return false;
+ReplicationCoordinator::ApplierState ReplicationCoordinatorMock::getApplierState() {
+    return ApplierState::Running;
 }
-void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*) {}
+void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*, long long) {}
 Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) {
     invariant(false);
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index b211007a8dd..5fed6539829 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -128,11 +128,9 @@ public:
     virtual bool setFollowerMode(const MemberState& newState);
-    virtual bool isWaitingForApplierToDrain();
+    virtual ApplierState getApplierState();
-    virtual bool isCatchingUp();
-
-    virtual void signalDrainComplete(OperationContext*);
+    virtual void signalDrainComplete(OperationContext*, long long);
     virtual Status waitForDrainFinish(Milliseconds timeout) override;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index a8f74185d0d..24ecd3af4d0 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -352,7 +352,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
         hasReadyRequests = net->hasReadyRequests();
         getNet()->exitNetwork();
     }
-    ASSERT(replCoord->isWaitingForApplierToDrain());
+    ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
     ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
     IsMasterResponse imResponse;
@@ -361,8 +361,9 @@
     ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
     {
         auto txn = makeOperationContext();
-        replCoord->signalDrainComplete(txn.get());
+        replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
     }
+    ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped);
     replCoord->fillIsMasterForReplSet(&imResponse);
     ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
     ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -414,7 +415,7 @@ void ReplCoordTest::simulateSuccessfulElection() {
         hasReadyRequests = net->hasReadyRequests();
         getNet()->exitNetwork();
     }
-    ASSERT(replCoord->isWaitingForApplierToDrain());
+    ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
     ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
     IsMasterResponse imResponse;
@@ -423,7 +424,7 @@
     ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
     {
         auto txn = makeOperationContext();
-        replCoord->signalDrainComplete(txn.get());
+        replCoord->signalDrainComplete(txn.get(), replCoord->getTerm());
     }
     replCoord->fillIsMasterForReplSet(&imResponse);
     ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 5c3ec06be96..073afcc73ec 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -102,7 +102,8 @@ void truncateAndResetOplog(OperationContext* txn,
     // We must clear the sync source blacklist after calling stop()
     // because the bgsync thread, while running, may update the blacklist.
     replCoord->resetMyLastOpTimes();
-    bgsync->stop();
+    bgsync->stop(true);
+    bgsync->startProducerIfStopped();
     bgsync->clearBuffer(txn);
     replCoord->clearSyncSourceBlacklist();
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 5a1c98574d2..b04972c29e4 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -85,6 +85,7 @@ void RSDataSync::_run() {
         const MemberState memberState = _replCoord->getMemberState();
+        // TODO(siyuan) Control the behavior using applier state.
         // An arbiter can never transition to any other state, and doesn't replicate, ever
         if (memberState.arbiter()) {
             break;
         }
@@ -97,8 +98,7 @@ void RSDataSync::_run() {
         }
         try {
-            if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain() &&
-                !_replCoord->isCatchingUp()) {
+            if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
                 sleepsecs(1);
                 continue;
             }
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8f752b49e05..50fbfe820b3 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -723,17 +723,6 @@ private:
             while (!_syncTail->tryPopAndWaitForMore(&txn, &ops, batchLimits)) {
             }
-            // For pausing replication in tests.
-            while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
-                // Tests should not trigger clean shutdown while that failpoint is active. If we
-                // think we need this, we need to think hard about what the behavior should be.
-                if (_syncTail->_networkQueue->inShutdown()) {
-                    severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
-                    fassertFailedNoTrace(40304);
-                }
-                sleepmillis(10);
-            }
-
             if (ops.empty() && !ops.mustShutdown()) {
                 continue;  // Don't emit empty batches.
             }
@@ -774,8 +763,20 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
                                              : new ApplyBatchFinalizer(replCoord)};
     while (true) {  // Exits on message from OpQueueBatcher.
+        // For pausing replication in tests.
+        while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+            // Tests should not trigger clean shutdown while that failpoint is active. If we
+            // think we need this, we need to think hard about what the behavior should be.
+            if (_networkQueue->inShutdown()) {
+                severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
+                fassertFailedNoTrace(40304);
+            }
+            sleepmillis(10);
+        }
+
         tryToGoLiveAsASecondary(&txn, replCoord);
+        long long termWhenBufferIsEmpty = replCoord->getTerm();
         // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
         // ready in time, we'll loop again so we can do the above checks periodically.
         OpQueue ops = batcher.getNextBatch(Seconds(1));
@@ -783,17 +784,12 @@
             if (ops.mustShutdown()) {
                 return;
             }
-            continue;  // Try again.
-        }
-
-        if (ops.front().raw.isEmpty()) {
-            // This means that the network thread has coalesced and we have processed all of its
-            // data.
-            invariant(ops.getCount() == 1);
-            if (replCoord->isWaitingForApplierToDrain()) {
-                replCoord->signalDrainComplete(&txn);
+            if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+                continue;
             }
-            continue;  // This wasn't a real op. Don't try to apply it.
+            // Signal drain complete if we're in Draining state and the buffer is empty.
+            replCoord->signalDrainComplete(&txn, termWhenBufferIsEmpty);
+            continue;  // Try again.
         }
         // Extract some info from ops that we'll need after releasing the batch below.
@@ -863,6 +859,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
             return true;  // Return before wasting time parsing the op.
         }
+        // Don't consume the op if we are told to stop.
+        if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+            sleepmillis(10);
+            return true;
+        }
+
         ops->emplace_back(std::move(op));  // Parses the op in-place.
     }
@@ -1265,8 +1267,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
     Lock::ParallelBatchWriterMode pbwm(txn->lockState());
     auto replCoord = ReplicationCoordinator::get(txn);
-    if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain() &&
-        !replCoord->isCatchingUp()) {
+    if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
         severe() << "attempting to replicate ops while primary";
         return {ErrorCodes::CannotApplyOplogWhilePrimary,
                 "attempting to replicate ops while primary"};