summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2022-04-22 13:00:48 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-22 15:24:30 +0000
commitf60e7b0978053aa7908d7e7544e1bb14bd7f9b22 (patch)
tree7695e00337cb981fd5ddddcd4607c27c26dd9878
parent4ea21c1a749099fca07cad1fa5d76c0acaa4b030 (diff)
downloadmongo-f60e7b0978053aa7908d7e7544e1bb14bd7f9b22.tar.gz
SERVER-65723 Enhance waiting API of oplog buffer.
Add millisecond granularity, deadlines, and Interruptibles (cherry picked from commit 6d128d3ac2197f120a8a5927e31c56bb6ab9d328)
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.cpp14
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer.h25
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp15
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp16
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer_proxy.cpp14
-rw-r--r--src/mongo/db/repl/oplog_buffer_proxy.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer_proxy_test.cpp26
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp5
11 files changed, 108 insertions, 19 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
index 304398e21d4..8feae33d868 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
@@ -109,11 +109,17 @@ bool OplogBufferMock::tryPop(OperationContext* opCtx, Value* value) {
return true;
}
-bool OplogBufferMock::waitForData(Seconds waitDuration) {
+bool OplogBufferMock::waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) {
stdx::unique_lock<Latch> lk(_mutex);
- _notEmptyCv.wait_for(lk, waitDuration.toSystemDuration(), [&] {
- return _hasShutDown || _curIndex < _data.size();
- });
+ interruptible->waitForConditionOrInterruptFor(
+ _notEmptyCv, lk, waitDuration, [&] { return _hasShutDown || _curIndex < _data.size(); });
+ return _curIndex < _data.size();
+}
+
+bool OplogBufferMock::waitForDataUntil(Date_t deadline, Interruptible* interruptible) {
+ stdx::unique_lock<Latch> lk(_mutex);
+ interruptible->waitForConditionOrInterruptUntil(
+ _notEmptyCv, lk, deadline, [&] { return _hasShutDown || _curIndex < _data.size(); });
return _curIndex < _data.size();
}
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h
index c4fb377f98b..6983492c249 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.h
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h
@@ -58,7 +58,8 @@ public:
std::size_t getCount() const final;
void clear(OperationContext* opCtx) final;
bool tryPop(OperationContext* opCtx, Value* value) final;
- bool waitForData(Seconds waitDuration) final;
+ bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) final;
+ bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) final;
bool peek(OperationContext* opCtx, Value* value) final;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const final;
StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final;
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index 96faaf7f37f..2e039cb1f91 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -35,6 +35,7 @@
#include "mongo/base/counter.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -136,11 +137,33 @@ public:
virtual bool tryPop(OperationContext* opCtx, Value* value) = 0;
/**
+ * Waits uninterruptibly for "waitDuration" for an operation to be pushed into the oplog buffer.
+ * Returns false if oplog buffer is still empty after "waitDuration".
+ * Otherwise, returns true.
+ */
+ bool waitForData(Seconds waitDuration) {
+ return waitForDataFor(duration_cast<Milliseconds>(waitDuration),
+ Interruptible::notInterruptible());
+ };
+
+ /**
+ * Interruptible wait with millisecond granularity.
+ *
* Waits "waitDuration" for an operation to be pushed into the oplog buffer.
* Returns false if oplog buffer is still empty after "waitDuration".
* Otherwise, returns true.
+ * Throws if the interruptible is interrupted.
+ */
+ virtual bool waitForDataFor(
+ Milliseconds waitDuration,
+ Interruptible* interruptible = Interruptible::notInterruptible()) = 0;
+
+ /**
+ * Same as waitForDataFor(Milliseconds, Interruptible) above but takes a deadline instead
+ * of a duration.
*/
- virtual bool waitForData(Seconds waitDuration) = 0;
+ virtual bool waitForDataUntil(
+ Date_t deadline, Interruptible* interruptible = Interruptible::notInterruptible()) = 0;
/**
* Returns false if oplog buffer is empty.
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index 86b7f906d1a..d81cc6adc9c 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -112,11 +112,20 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
return true;
}
-bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) {
+bool OplogBufferBlockingQueue::waitForDataFor(Milliseconds waitDuration,
+ Interruptible* interruptible) {
Value ignored;
stdx::unique_lock<Latch> lk(_notEmptyMutex);
- _notEmptyCv.wait_for(
- lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); });
+ interruptible->waitForConditionOrInterruptFor(
+ _notEmptyCv, lk, waitDuration, [&] { return _drainMode || _queue.peek(ignored); });
+ return _queue.peek(ignored);
+}
+
+bool OplogBufferBlockingQueue::waitForDataUntil(Date_t deadline, Interruptible* interruptible) {
+ Value ignored;
+ stdx::unique_lock<Latch> lk(_notEmptyMutex);
+ interruptible->waitForConditionOrInterruptUntil(
+ _notEmptyCv, lk, deadline, [&] { return _drainMode || _queue.peek(ignored); });
return _queue.peek(ignored);
}
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 164f30afc8f..a881a7e0e41 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -55,7 +55,8 @@ public:
std::size_t getCount() const override;
void clear(OperationContext* opCtx) override;
bool tryPop(OperationContext* opCtx, Value* value) override;
- bool waitForData(Seconds waitDuration) override;
+ bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override;
+ bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override;
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 799bf539ca7..b5190d7ff46 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -272,10 +272,20 @@ bool OplogBufferCollection::tryPop(OperationContext* opCtx, Value* value) {
return _pop_inlock(opCtx, value);
}
-bool OplogBufferCollection::waitForData(Seconds waitDuration) {
+bool OplogBufferCollection::waitForDataFor(Milliseconds waitDuration,
+ Interruptible* interruptible) {
stdx::unique_lock<Latch> lk(_mutex);
- if (!_cvNoLongerEmpty.wait_for(
- lk, waitDuration.toSystemDuration(), [&]() { return _count != 0; })) {
+ if (!interruptible->waitForConditionOrInterruptFor(
+ _cvNoLongerEmpty, lk, waitDuration, [&]() { return _count != 0; })) {
+ return false;
+ }
+ return _count != 0;
+}
+
+bool OplogBufferCollection::waitForDataUntil(Date_t deadline, Interruptible* interruptible) {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (!interruptible->waitForConditionOrInterruptUntil(
+ _cvNoLongerEmpty, lk, deadline, [&]() { return _count != 0; })) {
return false;
}
return _count != 0;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 00c65d5436b..f4ff564432e 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -112,7 +112,8 @@ public:
std::size_t getCount() const override;
void clear(OperationContext* opCtx) override;
bool tryPop(OperationContext* opCtx, Value* value) override;
- bool waitForData(Seconds waitDuration) override;
+ bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override;
+ bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override;
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/oplog_buffer_proxy.cpp b/src/mongo/db/repl/oplog_buffer_proxy.cpp
index 28497728ff8..223e52ced91 100644
--- a/src/mongo/db/repl/oplog_buffer_proxy.cpp
+++ b/src/mongo/db/repl/oplog_buffer_proxy.cpp
@@ -112,14 +112,24 @@ bool OplogBufferProxy::tryPop(OperationContext* opCtx, Value* value) {
return true;
}
-bool OplogBufferProxy::waitForData(Seconds waitDuration) {
+bool OplogBufferProxy::waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) {
{
stdx::unique_lock<Latch> lk(_lastPushedMutex);
if (_lastPushed) {
return true;
}
}
- return _target->waitForData(waitDuration);
+ return _target->waitForDataFor(waitDuration, interruptible);
+}
+
+bool OplogBufferProxy::waitForDataUntil(Date_t deadline, Interruptible* interruptible) {
+ {
+ stdx::unique_lock<Latch> lk(_lastPushedMutex);
+ if (_lastPushed) {
+ return true;
+ }
+ }
+ return _target->waitForDataUntil(deadline, interruptible);
}
bool OplogBufferProxy::peek(OperationContext* opCtx, Value* value) {
diff --git a/src/mongo/db/repl/oplog_buffer_proxy.h b/src/mongo/db/repl/oplog_buffer_proxy.h
index 5effffd815c..f827530c4f3 100644
--- a/src/mongo/db/repl/oplog_buffer_proxy.h
+++ b/src/mongo/db/repl/oplog_buffer_proxy.h
@@ -68,7 +68,8 @@ public:
std::size_t getCount() const override;
void clear(OperationContext* opCtx) override;
bool tryPop(OperationContext* opCtx, Value* value) override;
- bool waitForData(Seconds waitDuration) override;
+ bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override;
+ bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override;
bool peek(OperationContext* opCtx, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/oplog_buffer_proxy_test.cpp b/src/mongo/db/repl/oplog_buffer_proxy_test.cpp
index eab37c883d3..e1cade7b515 100644
--- a/src/mongo/db/repl/oplog_buffer_proxy_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_proxy_test.cpp
@@ -95,11 +95,16 @@ public:
values.pop_front();
return true;
}
- bool waitForData(Seconds) override {
+ bool waitForDataFor(Milliseconds, Interruptible*) override {
// Blocking not supported.
waitForDataCalled = true;
return !values.empty();
}
+ bool waitForDataUntil(Date_t, Interruptible*) override {
+ // Blocking not supported.
+ waitForDataUntilCalled = true;
+ return !values.empty();
+ }
bool peek(OperationContext*, Value* value) override {
peekCalled = true;
if (values.empty()) {
@@ -120,6 +125,7 @@ public:
bool shutdownCalled = false;
bool waitForSpaceCalled = false;
bool waitForDataCalled = false;
+ bool waitForDataUntilCalled = false;
bool tryPopCalled = false;
bool peekCalled = false;
mutable bool lastObjectPushedCalled = false;
@@ -264,11 +270,29 @@ TEST_F(OplogBufferProxyTest, WaitForDataReturnsTrueImmediatelyIfLastObjectPushed
_proxy->push(_opCtx, values.cbegin(), values.cend());
ASSERT_TRUE(_proxy->waitForData(Seconds(10)));
ASSERT_FALSE(_mock->waitForDataCalled);
+ ASSERT_FALSE(_mock->waitForDataUntilCalled);
}
TEST_F(OplogBufferProxyTest, WaitForDataForwardsCallToTargetIfLastObjectPushedIsNotCached) {
ASSERT_FALSE(_proxy->waitForData(Seconds(10)));
ASSERT_TRUE(_mock->waitForDataCalled);
+ ASSERT_FALSE(_mock->waitForDataUntilCalled);
+}
+
+TEST_F(OplogBufferProxyTest, WaitForDataUntilReturnsTrueImmediatelyIfLastObjectPushedIsCached) {
+ OplogBuffer::Batch values = {BSON("x" << 1)};
+ _proxy->push(_opCtx, values.cbegin(), values.cend());
+ ASSERT_TRUE(
+ _proxy->waitForDataUntil(Date_t::now() + Seconds(10), Interruptible::notInterruptible()));
+ ASSERT_FALSE(_mock->waitForDataUntilCalled);
+ ASSERT_FALSE(_mock->waitForDataCalled);
+}
+
+TEST_F(OplogBufferProxyTest, WaitForDataUntilForwardsCallToTargetIfLastObjectPushedIsNotCached) {
+ ASSERT_FALSE(
+ _proxy->waitForDataUntil(Date_t::now() + Seconds(10), Interruptible::notInterruptible()));
+ ASSERT_TRUE(_mock->waitForDataUntilCalled);
+ ASSERT_FALSE(_mock->waitForDataCalled);
}
TEST_F(OplogBufferProxyTest, TryPopResetsLastPushedObjectIfBufferIsEmpty) {
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 1a30cadfc80..7bce580fd7a 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -215,7 +215,10 @@ public:
void clear(OperationContext*) final {
MONGO_UNREACHABLE;
}
- bool waitForData(Seconds) final {
+ bool waitForDataFor(Milliseconds, Interruptible*) final {
+ MONGO_UNREACHABLE;
+ }
+ bool waitForDataUntil(Date_t, Interruptible*) final {
MONGO_UNREACHABLE;
}
boost::optional<Value> lastObjectPushed(OperationContext*) const final {