diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 10 |
6 files changed, 69 insertions, 8 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 8b2d5943ee6..94bd86706e3 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -168,7 +168,7 @@ void BackgroundSync::startup(OperationContext* opCtx) { void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<Latch> lock(_mutex); - _state = ProducerState::Stopped; + setState(lock, ProducerState::Stopped); if (_syncSourceResolver) { _syncSourceResolver->shutdown(); @@ -227,9 +227,13 @@ void BackgroundSync::_run() { } void BackgroundSync::_runProducer() { - if (getState() == ProducerState::Stopped) { - sleepsecs(1); - return; + { + // This wait keeps us from spinning. We will re-check the condition in _produce(), so if + // the state changes after we release the lock, the behavior is still correct. + stdx::unique_lock<Latch> lk(_mutex); + _stateCv.wait(lk, [&]() { return _inShutdown || _state != ProducerState::Stopped; }); + if (_inShutdown) + return; } auto memberState = _replCoord->getMemberState(); @@ -810,7 +814,7 @@ void BackgroundSync::clearSyncTarget() { void BackgroundSync::stop(bool resetLastFetchedOptime) { stdx::lock_guard<Latch> lock(_mutex); - _state = ProducerState::Stopped; + setState(lock, ProducerState::Stopped); LOGV2(21107, "Stopping replication producer"); _syncSourceHost = HostAndPort(); @@ -848,7 +852,7 @@ void BackgroundSync::start(OperationContext* opCtx) { if (!_oplogApplier->getBuffer()->isEmpty()) { LOGV2(21109, "going to start syncing, but buffer is not empty"); } - _state = ProducerState::Running; + setState(lk, ProducerState::Running); // When a node steps down during drain mode, the last fetched optime would be newer than // the last applied. @@ -925,11 +929,16 @@ BackgroundSync::ProducerState BackgroundSync::getState() const { return _state; } +void BackgroundSync::setState(WithLock, ProducerState newState) { + _state = newState; + _stateCv.notify_one(); +} + void BackgroundSync::startProducerIfStopped() { stdx::lock_guard<Latch> lock(_mutex); // Let producer run if it's already running. if (_state == ProducerState::Stopped) { - _state = ProducerState::Starting; + setState(lock, ProducerState::Starting); } } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 2e96b1ae434..38b2df2d955 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -208,6 +208,9 @@ private: // restart syncing void start(OperationContext* opCtx); + // Set the state and notify the condition variable. + void setState(WithLock, ProducerState newState); + OpTime _readLastAppliedOpTime(OperationContext* opCtx); // This OplogApplier applies oplog entries fetched from the sync source. @@ -245,6 +248,9 @@ private: // Thread running producerThread(). std::unique_ptr<stdx::thread> _producerThread; // (M) + // Condition variable to notify of _state and _inShutdown changes. + stdx::condition_variable _stateCv; // (S) + // Set to true if shutdown() has been called. bool _inShutdown = false; // (M) diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index 41723fbdd34..674917d9904 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -152,6 +152,18 @@ public: * Returns the item most recently added to the oplog buffer or nothing if the buffer is empty. */ virtual boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const = 0; + + /** + * Enters "drain mode". May only be called by the producer. When the buffer is in drain mode, + * "waitForData" will return immediately even if there is data in the queue. This + * is an optimization and subclasses may choose not to implement this function. + */ + virtual void enterDrainMode(){}; + + /** + * Leaves "drain mode". May only be called by the producer. + */ + virtual void exitDrainMode(){}; }; class OplogBuffer::Counters { diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index 38763679a06..86b7f906d1a 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -64,7 +64,9 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) { void OplogBufferBlockingQueue::push(OperationContext*, Batch::const_iterator begin, Batch::const_iterator end) { + invariant(!_drainMode); _queue.pushAllBlocking(begin, end); + _notEmptyCv.notify_one(); if (_counters) { for (auto i = begin; i != end; ++i) { @@ -112,7 +114,10 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) { bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) { Value ignored; - return _queue.blockingPeek(ignored, static_cast<int>(durationCount<Seconds>(waitDuration))); + stdx::unique_lock<Latch> lk(_notEmptyMutex); + _notEmptyCv.wait_for( + lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); }); + return _queue.peek(ignored); } bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) { @@ -124,5 +129,16 @@ boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed( return _queue.lastObjectPushed(); } +void OplogBufferBlockingQueue::enterDrainMode() { + stdx::lock_guard<Latch> lk(_notEmptyMutex); + _drainMode = true; + _notEmptyCv.notify_one(); +} + +void OplogBufferBlockingQueue::exitDrainMode() { + stdx::lock_guard<Latch> lk(_notEmptyMutex); + _drainMode = false; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 08ead40b4cd..164f30afc8f 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -59,7 +59,15 @@ public: bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; + // In drain mode, the queue does not block. It is the responsibility of the caller to ensure + // that no items are added to the queue while in drain mode; this is enforced by invariant(). + void enterDrainMode() final; + void exitDrainMode() final; + private: + Mutex _notEmptyMutex = MONGO_MAKE_LATCH("OplogBufferBlockingQueue::mutex"); + stdx::condition_variable _notEmptyCv; + bool _drainMode = false; Counters* const _counters; BlockingQueue<BSONObj> _queue; }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 5bd60f1b8d5..84f0cc084d1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -463,6 +463,10 @@ void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->shouldParticipateInFlowControl()); + if (_oplogBuffer) { + _oplogBuffer->exitDrainMode(); + } + // If this is a config server node becoming a primary, ensure the balancer is ready to start. if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { // We must ensure the balancer has stopped because it may still be in the process of @@ -895,10 +899,16 @@ void ReplicationCoordinatorExternalStateImpl::stopProducer() { if (_bgSync) { _bgSync->stop(false); } + if (_oplogBuffer) { + _oplogBuffer->enterDrainMode(); + } } void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { stdx::lock_guard<Latch> lk(_threadMutex); + if (_oplogBuffer) { + _oplogBuffer->exitDrainMode(); + } if (_bgSync) { _bgSync->startProducerIfStopped(); } |