diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2020-02-27 13:57:06 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-27 19:16:06 +0000 |
commit | c278fcebdc161bc589fd4869a51019cfa1cdaf8c (patch) | |
tree | 9a44a758cb0700cd2acbb46f2c5765ec63f5d019 | |
parent | 37c0df01bb74749e403d1ee3d396344227867918 (diff) | |
download | mongo-c278fcebdc161bc589fd4869a51019cfa1cdaf8c.tar.gz |
SERVER-39112 Remove 1-second delays when starting and stopping OplogApplier. Speeds up transition from primary drain mode to a writeable primary.
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 10 |
6 files changed, 74 insertions, 8 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index f5f48e093a1..33031a1df3b 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -182,7 +182,7 @@ void BackgroundSync::startup(OperationContext* opCtx) { void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); - _state = ProducerState::Stopped; + setState(lock, ProducerState::Stopped); if (_syncSourceResolver) { _syncSourceResolver->shutdown(); @@ -235,9 +235,13 @@ void BackgroundSync::_run() { } void BackgroundSync::_runProducer() { - if (getState() == ProducerState::Stopped) { - sleepsecs(1); - return; + { + // This wait keeps us from spinning. We will re-check the condition in _produce(), so if + // the state changes after we release the lock, the behavior is still correct. + stdx::unique_lock<stdx::mutex> lk(_mutex); + _stateCv.wait(lk, [&]() { return _inShutdown || _state != ProducerState::Stopped; }); + if (_inShutdown) + return; } auto memberState = _replCoord->getMemberState(); @@ -748,7 +752,7 @@ void BackgroundSync::clearSyncTarget() { void BackgroundSync::stop(bool resetLastFetchedOptime) { stdx::lock_guard<stdx::mutex> lock(_mutex); - _state = ProducerState::Stopped; + setState(lock, ProducerState::Stopped); log() << "Stopping replication producer"; _syncSourceHost = HostAndPort(); @@ -787,7 +791,7 @@ void BackgroundSync::start(OperationContext* opCtx) { if (!_oplogBuffer->isEmpty()) { log() << "going to start syncing, but buffer is not empty"; } - _state = ProducerState::Running; + setState(lk, ProducerState::Running); // When a node steps down during drain mode, the last fetched optime would be newer than // the last applied. 
@@ -870,11 +874,16 @@ BackgroundSync::ProducerState BackgroundSync::getState() const { return _state; } +void BackgroundSync::setState(WithLock, ProducerState newState) { + _state = newState; + _stateCv.notify_one(); +} + void BackgroundSync::startProducerIfStopped() { stdx::lock_guard<stdx::mutex> lock(_mutex); // Let producer run if it's already running. if (_state == ProducerState::Stopped) { - _state = ProducerState::Starting; + setState(lock, ProducerState::Starting); } } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 14b97b9b130..991d7ff4723 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -47,6 +47,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/util/concurrency/with_lock.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -212,6 +213,9 @@ private: // restart syncing void start(OperationContext* opCtx); + // Set the state and notify the condition variable. + void setState(WithLock, ProducerState newState); + OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* opCtx); // This OplogBuffer holds oplog entries fetched from the sync source. @@ -252,6 +256,9 @@ private: // Thread running producerThread(). std::unique_ptr<stdx::thread> _producerThread; // (M) + // Condition variable to notify of _state and _inShutdown changes. + stdx::condition_variable _stateCv; // (S) + // Set to true if shutdown() has been called. bool _inShutdown = false; // (M) diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index b179dd2a71d..2f353cf6a2c 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -162,6 +162,18 @@ public: * Returns the item most recently added to the oplog buffer or nothing if the buffer is empty. */ virtual boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const = 0; + + /** + * Enters "drain mode". May only be called by the producer. 
When the buffer is in drain mode, + * "waitForData" will return immediately even if there is no data in the queue. This + * is an optimization and subclasses may choose not to implement this function. + */ + virtual void enterDrainMode(){}; + + /** + * Leaves "drain mode". May only be called by the producer. + */ + virtual void exitDrainMode(){}; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index a4ee96aa5d5..8d5aa562787 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -56,17 +56,23 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) { } void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) { + invariant(!_drainMode); _queue.pushEvenIfFull(value); + _notEmptyCv.notify_one(); } void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) { + invariant(!_drainMode); _queue.push(value); + _notEmptyCv.notify_one(); } void OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*, Batch::const_iterator begin, Batch::const_iterator end) { + invariant(!_drainMode); _queue.pushAllNonBlocking(begin, end); + _notEmptyCv.notify_one(); } void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) { @@ -99,7 +105,10 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) { bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) { Value ignored; - return _queue.blockingPeek(ignored, static_cast<int>(durationCount<Seconds>(waitDuration))); + stdx::unique_lock<stdx::mutex> lk(_notEmptyMutex); + _notEmptyCv.wait_for( + lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); }); + return _queue.peek(ignored); } bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) { @@ -111,5 +120,16 @@ boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed( 
return _queue.lastObjectPushed(); } +void OplogBufferBlockingQueue::enterDrainMode() { + stdx::lock_guard<stdx::mutex> lk(_notEmptyMutex); + _drainMode = true; + _notEmptyCv.notify_one(); +} + +void OplogBufferBlockingQueue::exitDrainMode() { + stdx::lock_guard<stdx::mutex> lk(_notEmptyMutex); + _drainMode = false; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 1e888fde0c5..c24daae96a9 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -61,7 +61,15 @@ public: bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; + // In drain mode, the queue does not block. It is the responsibility of the caller to ensure + // that no items are added to the queue while in drain mode; this is enforced by invariant(). + void enterDrainMode() final; + void exitDrainMode() final; + private: + stdx::mutex _notEmptyMutex; + stdx::condition_variable _notEmptyCv; + bool _drainMode = false; BlockingQueue<BSONObj> _queue; }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index fc6f0399089..0cdde412198 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -474,6 +474,10 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) { invariant(!opCtx->lockState()->isLocked()); + if (_oplogBuffer) { + _oplogBuffer->exitDrainMode(); + } + // If this is a config server node becoming a primary, ensure the balancer is ready to start. 
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { // We must ensure the balancer has stopped because it may still be in the process of @@ -877,10 +881,16 @@ void ReplicationCoordinatorExternalStateImpl::stopProducer() { if (_bgSync) { _bgSync->stop(false); } + if (_oplogBuffer) { + _oplogBuffer->enterDrainMode(); + } } void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { LockGuard lk(_threadMutex); + if (_oplogBuffer) { + _oplogBuffer->exitDrainMode(); + } if (_bgSync) { _bgSync->startProducerIfStopped(); } |