summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Matthew Russotto <matthew.russotto@10gen.com> 2020-02-27 13:57:06 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-02-27 19:16:06 +0000
commit: c278fcebdc161bc589fd4869a51019cfa1cdaf8c (patch)
tree: 9a44a758cb0700cd2acbb46f2c5765ec63f5d019
parent: 37c0df01bb74749e403d1ee3d396344227867918 (diff)
download: mongo-c278fcebdc161bc589fd4869a51019cfa1cdaf8c.tar.gz
SERVER-39112 Remove 1-second delays when starting and stopping OplogApplier. Speeds up transition from primary drain mode to a writeable primary.
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 23
-rw-r--r-- src/mongo/db/repl/bgsync.h 7
-rw-r--r-- src/mongo/db/repl/oplog_buffer.h 12
-rw-r--r-- src/mongo/db/repl/oplog_buffer_blocking_queue.cpp 22
-rw-r--r-- src/mongo/db/repl/oplog_buffer_blocking_queue.h 8
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 10
6 files changed, 74 insertions, 8 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index f5f48e093a1..33031a1df3b 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -182,7 +182,7 @@ void BackgroundSync::startup(OperationContext* opCtx) {
void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _state = ProducerState::Stopped;
+ setState(lock, ProducerState::Stopped);
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
@@ -235,9 +235,13 @@ void BackgroundSync::_run() {
}
void BackgroundSync::_runProducer() {
- if (getState() == ProducerState::Stopped) {
- sleepsecs(1);
- return;
+ {
+ // This wait keeps us from spinning. We will re-check the condition in _produce(), so if
+ // the state changes after we release the lock, the behavior is still correct.
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _stateCv.wait(lk, [&]() { return _inShutdown || _state != ProducerState::Stopped; });
+ if (_inShutdown)
+ return;
}
auto memberState = _replCoord->getMemberState();
@@ -748,7 +752,7 @@ void BackgroundSync::clearSyncTarget() {
void BackgroundSync::stop(bool resetLastFetchedOptime) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _state = ProducerState::Stopped;
+ setState(lock, ProducerState::Stopped);
log() << "Stopping replication producer";
_syncSourceHost = HostAndPort();
@@ -787,7 +791,7 @@ void BackgroundSync::start(OperationContext* opCtx) {
if (!_oplogBuffer->isEmpty()) {
log() << "going to start syncing, but buffer is not empty";
}
- _state = ProducerState::Running;
+ setState(lk, ProducerState::Running);
// When a node steps down during drain mode, the last fetched optime would be newer than
// the last applied.
@@ -870,11 +874,16 @@ BackgroundSync::ProducerState BackgroundSync::getState() const {
return _state;
}
+void BackgroundSync::setState(WithLock, ProducerState newState) {
+ _state = newState;
+ _stateCv.notify_one();
+}
+
void BackgroundSync::startProducerIfStopped() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
// Let producer run if it's already running.
if (_state == ProducerState::Stopped) {
- _state = ProducerState::Starting;
+ setState(lock, ProducerState::Starting);
}
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 14b97b9b130..991d7ff4723 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -47,6 +47,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -212,6 +213,9 @@ private:
// restart syncing
void start(OperationContext* opCtx);
+ // Set the state and notify the condition variable.
+ void setState(WithLock, ProducerState newState);
+
OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* opCtx);
// This OplogBuffer holds oplog entries fetched from the sync source.
@@ -252,6 +256,9 @@ private:
// Thread running producerThread().
std::unique_ptr<stdx::thread> _producerThread; // (M)
+ // Condition variable to notify of _state and _inShutdown changes.
+ stdx::condition_variable _stateCv; // (S)
+
// Set to true if shutdown() has been called.
bool _inShutdown = false; // (M)
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index b179dd2a71d..2f353cf6a2c 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -162,6 +162,18 @@ public:
* Returns the item most recently added to the oplog buffer or nothing if the buffer is empty.
*/
virtual boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const = 0;
+
+ /**
+ * Enters "drain mode". May only be called by the producer. When the buffer is in drain mode,
+ * "waitForData" will return immediately even if there is no data in the queue. This
+ * is an optimization and subclasses may choose not to implement this function.
+ */
+ virtual void enterDrainMode(){};
+
+ /**
+ * Leaves "drain mode". May only be called by the producer.
+ */
+ virtual void exitDrainMode(){};
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index a4ee96aa5d5..8d5aa562787 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -56,17 +56,23 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) {
}
void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) {
+ invariant(!_drainMode);
_queue.pushEvenIfFull(value);
+ _notEmptyCv.notify_one();
}
void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) {
+ invariant(!_drainMode);
_queue.push(value);
+ _notEmptyCv.notify_one();
}
void OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*,
Batch::const_iterator begin,
Batch::const_iterator end) {
+ invariant(!_drainMode);
_queue.pushAllNonBlocking(begin, end);
+ _notEmptyCv.notify_one();
}
void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) {
@@ -99,7 +105,10 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) {
Value ignored;
- return _queue.blockingPeek(ignored, static_cast<int>(durationCount<Seconds>(waitDuration)));
+ stdx::unique_lock<stdx::mutex> lk(_notEmptyMutex);
+ _notEmptyCv.wait_for(
+ lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); });
+ return _queue.peek(ignored);
}
bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) {
@@ -111,5 +120,16 @@ boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed(
return _queue.lastObjectPushed();
}
+void OplogBufferBlockingQueue::enterDrainMode() {
+ stdx::lock_guard<stdx::mutex> lk(_notEmptyMutex);
+ _drainMode = true;
+ _notEmptyCv.notify_one();
+}
+
+void OplogBufferBlockingQueue::exitDrainMode() {
+ stdx::lock_guard<stdx::mutex> lk(_notEmptyMutex);
+ _drainMode = false;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 1e888fde0c5..c24daae96a9 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -61,7 +61,15 @@ public:
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
+ // In drain mode, the queue does not block. It is the responsibility of the caller to ensure
+ // that no items are added to the queue while in drain mode; this is enforced by invariant().
+ void enterDrainMode() final;
+ void exitDrainMode() final;
+
private:
+ stdx::mutex _notEmptyMutex;
+ stdx::condition_variable _notEmptyCv;
+ bool _drainMode = false;
BlockingQueue<BSONObj> _queue;
};
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index fc6f0399089..0cdde412198 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -474,6 +474,10 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) {
invariant(!opCtx->lockState()->isLocked());
+ if (_oplogBuffer) {
+ _oplogBuffer->exitDrainMode();
+ }
+
// If this is a config server node becoming a primary, ensure the balancer is ready to start.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// We must ensure the balancer has stopped because it may still be in the process of
@@ -877,10 +881,16 @@ void ReplicationCoordinatorExternalStateImpl::stopProducer() {
if (_bgSync) {
_bgSync->stop(false);
}
+ if (_oplogBuffer) {
+ _oplogBuffer->enterDrainMode();
+ }
}
void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
LockGuard lk(_threadMutex);
+ if (_oplogBuffer) {
+ _oplogBuffer->exitDrainMode();
+ }
if (_bgSync) {
_bgSync->startProducerIfStopped();
}