summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2020-02-27 10:34:53 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-27 17:01:52 +0000
commit869dbc8bd522338c2ce25e679c4772952c1f1fa6 (patch)
tree78ad2511765b49f8bfb186028a39260581508c45 /src
parentabcbc1ea9de96ac463720445c1e42080b417a8eb (diff)
downloadmongo-869dbc8bd522338c2ce25e679c4772952c1f1fa6.tar.gz
SERVER-39112 Remove 1-second delays when starting and stopping OplogApplier. Speeds up transition from primary drain mode to a writeable primary.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp23
-rw-r--r--src/mongo/db/repl/bgsync.h6
-rw-r--r--src/mongo/db/repl/oplog_buffer.h12
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp18
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp10
6 files changed, 69 insertions, 8 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 8b2d5943ee6..94bd86706e3 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -168,7 +168,7 @@ void BackgroundSync::startup(OperationContext* opCtx) {
void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<Latch> lock(_mutex);
- _state = ProducerState::Stopped;
+ setState(lock, ProducerState::Stopped);
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
@@ -227,9 +227,13 @@ void BackgroundSync::_run() {
}
void BackgroundSync::_runProducer() {
- if (getState() == ProducerState::Stopped) {
- sleepsecs(1);
- return;
+ {
+ // This wait keeps us from spinning. We will re-check the condition in _produce(), so if
+ // the state changes after we release the lock, the behavior is still correct.
+ stdx::unique_lock<Latch> lk(_mutex);
+ _stateCv.wait(lk, [&]() { return _inShutdown || _state != ProducerState::Stopped; });
+ if (_inShutdown)
+ return;
}
auto memberState = _replCoord->getMemberState();
@@ -810,7 +814,7 @@ void BackgroundSync::clearSyncTarget() {
void BackgroundSync::stop(bool resetLastFetchedOptime) {
stdx::lock_guard<Latch> lock(_mutex);
- _state = ProducerState::Stopped;
+ setState(lock, ProducerState::Stopped);
LOGV2(21107, "Stopping replication producer");
_syncSourceHost = HostAndPort();
@@ -848,7 +852,7 @@ void BackgroundSync::start(OperationContext* opCtx) {
if (!_oplogApplier->getBuffer()->isEmpty()) {
LOGV2(21109, "going to start syncing, but buffer is not empty");
}
- _state = ProducerState::Running;
+ setState(lk, ProducerState::Running);
// When a node steps down during drain mode, the last fetched optime would be newer than
// the last applied.
@@ -925,11 +929,16 @@ BackgroundSync::ProducerState BackgroundSync::getState() const {
return _state;
}
+void BackgroundSync::setState(WithLock, ProducerState newState) {
+ _state = newState;
+ _stateCv.notify_one();
+}
+
void BackgroundSync::startProducerIfStopped() {
stdx::lock_guard<Latch> lock(_mutex);
// Let producer run if it's already running.
if (_state == ProducerState::Stopped) {
- _state = ProducerState::Starting;
+ setState(lock, ProducerState::Starting);
}
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 2e96b1ae434..38b2df2d955 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -208,6 +208,9 @@ private:
// restart syncing
void start(OperationContext* opCtx);
+ // Set the state and notify the condition variable.
+ void setState(WithLock, ProducerState newState);
+
OpTime _readLastAppliedOpTime(OperationContext* opCtx);
// This OplogApplier applies oplog entries fetched from the sync source.
@@ -245,6 +248,9 @@ private:
// Thread running producerThread().
std::unique_ptr<stdx::thread> _producerThread; // (M)
+ // Condition variable to notify of _state and _inShutdown changes.
+ stdx::condition_variable _stateCv; // (S)
+
// Set to true if shutdown() has been called.
bool _inShutdown = false; // (M)
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index 41723fbdd34..674917d9904 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -152,6 +152,18 @@ public:
* Returns the item most recently added to the oplog buffer or nothing if the buffer is empty.
*/
virtual boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const = 0;
+
+ /**
+ * Enters "drain mode". May only be called by the producer. When the buffer is in drain mode,
+ * "waitForData" will return immediately even if there is data in the queue. This
+ * is an optimization and subclasses may choose not to implement this function.
+ */
+ virtual void enterDrainMode(){};
+
+ /**
+ * Leaves "drain mode". May only be called by the producer.
+ */
+ virtual void exitDrainMode(){};
};
class OplogBuffer::Counters {
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index 38763679a06..86b7f906d1a 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -64,7 +64,9 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) {
void OplogBufferBlockingQueue::push(OperationContext*,
Batch::const_iterator begin,
Batch::const_iterator end) {
+ invariant(!_drainMode);
_queue.pushAllBlocking(begin, end);
+ _notEmptyCv.notify_one();
if (_counters) {
for (auto i = begin; i != end; ++i) {
@@ -112,7 +114,10 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) {
Value ignored;
- return _queue.blockingPeek(ignored, static_cast<int>(durationCount<Seconds>(waitDuration)));
+ stdx::unique_lock<Latch> lk(_notEmptyMutex);
+ _notEmptyCv.wait_for(
+ lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); });
+ return _queue.peek(ignored);
}
bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) {
@@ -124,5 +129,16 @@ boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed(
return _queue.lastObjectPushed();
}
+void OplogBufferBlockingQueue::enterDrainMode() {
+ stdx::lock_guard<Latch> lk(_notEmptyMutex);
+ _drainMode = true;
+ _notEmptyCv.notify_one();
+}
+
+void OplogBufferBlockingQueue::exitDrainMode() {
+ stdx::lock_guard<Latch> lk(_notEmptyMutex);
+ _drainMode = false;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 08ead40b4cd..164f30afc8f 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -59,7 +59,15 @@ public:
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
+ // In drain mode, the queue does not block. It is the responsibility of the caller to ensure
+ // that no items are added to the queue while in drain mode; this is enforced by invariant().
+ void enterDrainMode() final;
+ void exitDrainMode() final;
+
private:
+ Mutex _notEmptyMutex = MONGO_MAKE_LATCH("OplogBufferBlockingQueue::mutex");
+ stdx::condition_variable _notEmptyCv;
+ bool _drainMode = false;
Counters* const _counters;
BlockingQueue<BSONObj> _queue;
};
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 5bd60f1b8d5..84f0cc084d1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -463,6 +463,10 @@ void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext*
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->shouldParticipateInFlowControl());
+ if (_oplogBuffer) {
+ _oplogBuffer->exitDrainMode();
+ }
+
// If this is a config server node becoming a primary, ensure the balancer is ready to start.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// We must ensure the balancer has stopped because it may still be in the process of
@@ -895,10 +899,16 @@ void ReplicationCoordinatorExternalStateImpl::stopProducer() {
if (_bgSync) {
_bgSync->stop(false);
}
+ if (_oplogBuffer) {
+ _oplogBuffer->enterDrainMode();
+ }
}
void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
stdx::lock_guard<Latch> lk(_threadMutex);
+ if (_oplogBuffer) {
+ _oplogBuffer->exitDrainMode();
+ }
if (_bgSync) {
_bgSync->startProducerIfStopped();
}