diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2022-04-22 13:00:48 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-15 15:31:31 +0000 |
commit | fdef6fd5099d2a8c0c3c8d70b617a50195e2e609 (patch) | |
tree | 85806af8c38c18d0e56e4dca8f209f0d8ad12e71 | |
parent | 2b01d3385da9dc27b6d589ad084be7643281414a (diff) | |
download | mongo-fdef6fd5099d2a8c0c3c8d70b617a50195e2e609.tar.gz |
SERVER-65273 Enhance waiting API of oplog buffer.
Add millisecond granularity, deadlines, and Interruptibles
(cherry picked from commit 6d128d3ac2197f120a8a5927e31c56bb6ab9d328)
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_proxy.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_proxy.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_proxy_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 5 |
11 files changed, 108 insertions, 19 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp index f104a91250f..c05ce1c4de0 100644 --- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp @@ -109,11 +109,17 @@ bool OplogBufferMock::tryPop(OperationContext* opCtx, Value* value) { return true; } -bool OplogBufferMock::waitForData(Seconds waitDuration) { +bool OplogBufferMock::waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) { stdx::unique_lock<Latch> lk(_mutex); - _notEmptyCv.wait_for(lk, waitDuration.toSystemDuration(), [&] { - return _hasShutDown || _curIndex < _data.size(); - }); + interruptible->waitForConditionOrInterruptFor( + _notEmptyCv, lk, waitDuration, [&] { return _hasShutDown || _curIndex < _data.size(); }); + return _curIndex < _data.size(); +} + +bool OplogBufferMock::waitForDataUntil(Date_t deadline, Interruptible* interruptible) { + stdx::unique_lock<Latch> lk(_mutex); + interruptible->waitForConditionOrInterruptUntil( + _notEmptyCv, lk, deadline, [&] { return _hasShutDown || _curIndex < _data.size(); }); return _curIndex < _data.size(); } diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h index c4fb377f98b..6983492c249 100644 --- a/src/mongo/db/repl/oplog_batcher_test_fixture.h +++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h @@ -58,7 +58,8 @@ public: std::size_t getCount() const final; void clear(OperationContext* opCtx) final; bool tryPop(OperationContext* opCtx, Value* value) final; - bool waitForData(Seconds waitDuration) final; + bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) final; + bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) final; bool peek(OperationContext* opCtx, Value* value) final; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const final; StatusWith<Value> findByTimestamp(OperationContext* opCtx, const Timestamp& ts) final; diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index 96faaf7f37f..2e039cb1f91 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -35,6 +35,7 @@ #include "mongo/base/counter.h" #include "mongo/bson/bsonobj.h" +#include "mongo/util/interruptible.h" #include "mongo/util/time_support.h" namespace mongo { @@ -136,11 +137,33 @@ public: virtual bool tryPop(OperationContext* opCtx, Value* value) = 0; /** + * Waits uninterruptibly for "waitDuration" for an operation to be pushed into the oplog buffer. + * Returns false if oplog buffer is still empty after "waitDuration". + * Otherwise, returns true. + */ + bool waitForData(Seconds waitDuration) { + return waitForDataFor(duration_cast<Milliseconds>(waitDuration), + Interruptible::notInterruptible()); + }; + + /** + * Interruptible wait with millisecond granularity. + * * Waits "waitDuration" for an operation to be pushed into the oplog buffer. * Returns false if oplog buffer is still empty after "waitDuration". * Otherwise, returns true. + * Throws if the interruptible is interrupted. + */ + virtual bool waitForDataFor( + Milliseconds waitDuration, + Interruptible* interruptible = Interruptible::notInterruptible()) = 0; + + /** + * Same as waitForDataFor(Milliseconds, Interruptible) above but takes a deadline instead + * of a duration. */ - virtual bool waitForData(Seconds waitDuration) = 0; + virtual bool waitForDataUntil( + Date_t deadline, Interruptible* interruptible = Interruptible::notInterruptible()) = 0; /** * Returns false if oplog buffer is empty. diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index 86b7f906d1a..d81cc6adc9c 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -112,11 +112,20 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) { return true; } -bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) { +bool OplogBufferBlockingQueue::waitForDataFor(Milliseconds waitDuration, + Interruptible* interruptible) { Value ignored; stdx::unique_lock<Latch> lk(_notEmptyMutex); - _notEmptyCv.wait_for( - lk, waitDuration.toSystemDuration(), [&] { return _drainMode || _queue.peek(ignored); }); + interruptible->waitForConditionOrInterruptFor( + _notEmptyCv, lk, waitDuration, [&] { return _drainMode || _queue.peek(ignored); }); + return _queue.peek(ignored); +} + +bool OplogBufferBlockingQueue::waitForDataUntil(Date_t deadline, Interruptible* interruptible) { + Value ignored; + stdx::unique_lock<Latch> lk(_notEmptyMutex); + interruptible->waitForConditionOrInterruptUntil( + _notEmptyCv, lk, deadline, [&] { return _drainMode || _queue.peek(ignored); }); return _queue.peek(ignored); } diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 164f30afc8f..a881a7e0e41 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -55,7 +55,8 @@ public: std::size_t getCount() const override; void clear(OperationContext* opCtx) override; bool tryPop(OperationContext* opCtx, Value* value) override; - bool waitForData(Seconds waitDuration) override; + bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override; + bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override; bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index 799bf539ca7..b5190d7ff46 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -272,10 +272,20 @@ bool OplogBufferCollection::tryPop(OperationContext* opCtx, Value* value) { return _pop_inlock(opCtx, value); } -bool OplogBufferCollection::waitForData(Seconds waitDuration) { +bool OplogBufferCollection::waitForDataFor(Milliseconds waitDuration, + Interruptible* interruptible) { stdx::unique_lock<Latch> lk(_mutex); - if (!_cvNoLongerEmpty.wait_for( - lk, waitDuration.toSystemDuration(), [&]() { return _count != 0; })) { + if (!interruptible->waitForConditionOrInterruptFor( + _cvNoLongerEmpty, lk, waitDuration, [&]() { return _count != 0; })) { + return false; + } + return _count != 0; +} + +bool OplogBufferCollection::waitForDataUntil(Date_t deadline, Interruptible* interruptible) { + stdx::unique_lock<Latch> lk(_mutex); + if (!interruptible->waitForConditionOrInterruptUntil( + _cvNoLongerEmpty, lk, deadline, [&]() { return _count != 0; })) { return false; } return _count != 0; diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index 00c65d5436b..f4ff564432e 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -112,7 +112,8 @@ public: std::size_t getCount() const override; void clear(OperationContext* opCtx) override; bool tryPop(OperationContext* opCtx, Value* value) override; - bool waitForData(Seconds waitDuration) override; + bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override; + bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override; bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/oplog_buffer_proxy.cpp b/src/mongo/db/repl/oplog_buffer_proxy.cpp index 28497728ff8..223e52ced91 100644 --- a/src/mongo/db/repl/oplog_buffer_proxy.cpp +++ b/src/mongo/db/repl/oplog_buffer_proxy.cpp @@ -112,14 +112,24 @@ bool OplogBufferProxy::tryPop(OperationContext* opCtx, Value* value) { return true; } -bool OplogBufferProxy::waitForData(Seconds waitDuration) { +bool OplogBufferProxy::waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) { { stdx::unique_lock<Latch> lk(_lastPushedMutex); if (_lastPushed) { return true; } } - return _target->waitForData(waitDuration); + return _target->waitForDataFor(waitDuration, interruptible); +} + +bool OplogBufferProxy::waitForDataUntil(Date_t deadline, Interruptible* interruptible) { + { + stdx::unique_lock<Latch> lk(_lastPushedMutex); + if (_lastPushed) { + return true; + } + } + return _target->waitForDataUntil(deadline, interruptible); } bool OplogBufferProxy::peek(OperationContext* opCtx, Value* value) { diff --git a/src/mongo/db/repl/oplog_buffer_proxy.h b/src/mongo/db/repl/oplog_buffer_proxy.h index 5effffd815c..f827530c4f3 100644 --- a/src/mongo/db/repl/oplog_buffer_proxy.h +++ b/src/mongo/db/repl/oplog_buffer_proxy.h @@ -68,7 +68,8 @@ public: std::size_t getCount() const override; void clear(OperationContext* opCtx) override; bool tryPop(OperationContext* opCtx, Value* value) override; - bool waitForData(Seconds waitDuration) override; + bool waitForDataFor(Milliseconds waitDuration, Interruptible* interruptible) override; + bool waitForDataUntil(Date_t deadline, Interruptible* interruptible) override; bool peek(OperationContext* opCtx, Value* value) override; boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/oplog_buffer_proxy_test.cpp b/src/mongo/db/repl/oplog_buffer_proxy_test.cpp index eab37c883d3..e1cade7b515 100644 --- a/src/mongo/db/repl/oplog_buffer_proxy_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_proxy_test.cpp @@ -95,11 +95,16 @@ public: values.pop_front(); return true; } - bool waitForData(Seconds) override { + bool waitForDataFor(Milliseconds, Interruptible*) override { // Blocking not supported. waitForDataCalled = true; return !values.empty(); } + bool waitForDataUntil(Date_t, Interruptible*) override { + // Blocking not supported. + waitForDataUntilCalled = true; + return !values.empty(); + } bool peek(OperationContext*, Value* value) override { peekCalled = true; if (values.empty()) { @@ -120,6 +125,7 @@ public: bool shutdownCalled = false; bool waitForSpaceCalled = false; bool waitForDataCalled = false; + bool waitForDataUntilCalled = false; bool tryPopCalled = false; bool peekCalled = false; mutable bool lastObjectPushedCalled = false; @@ -264,11 +270,29 @@ TEST_F(OplogBufferProxyTest, WaitForDataReturnsTrueImmediatelyIfLastObjectPushed _proxy->push(_opCtx, values.cbegin(), values.cend()); ASSERT_TRUE(_proxy->waitForData(Seconds(10))); ASSERT_FALSE(_mock->waitForDataCalled); + ASSERT_FALSE(_mock->waitForDataUntilCalled); } TEST_F(OplogBufferProxyTest, WaitForDataForwardsCallToTargetIfLastObjectPushedIsNotCached) { ASSERT_FALSE(_proxy->waitForData(Seconds(10))); ASSERT_TRUE(_mock->waitForDataCalled); + ASSERT_FALSE(_mock->waitForDataUntilCalled); +} + +TEST_F(OplogBufferProxyTest, WaitForDataUntilReturnsTrueImmediatelyIfLastObjectPushedIsCached) { + OplogBuffer::Batch values = {BSON("x" << 1)}; + _proxy->push(_opCtx, values.cbegin(), values.cend()); + ASSERT_TRUE( + _proxy->waitForDataUntil(Date_t::now() + Seconds(10), Interruptible::notInterruptible())); + ASSERT_FALSE(_mock->waitForDataUntilCalled); + ASSERT_FALSE(_mock->waitForDataCalled); +} + +TEST_F(OplogBufferProxyTest, WaitForDataUntilForwardsCallToTargetIfLastObjectPushedIsNotCached) { + ASSERT_FALSE( + _proxy->waitForDataUntil(Date_t::now() + Seconds(10), Interruptible::notInterruptible())); + ASSERT_TRUE(_mock->waitForDataUntilCalled); + ASSERT_FALSE(_mock->waitForDataCalled); } TEST_F(OplogBufferProxyTest, TryPopResetsLastPushedObjectIfBufferIsEmpty) { diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 5cd96227393..80388fd2f3e 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -214,7 +214,10 @@ public: void clear(OperationContext*) final { MONGO_UNREACHABLE; } - bool waitForData(Seconds) final { + bool waitForDataFor(Milliseconds, Interruptible*) final { + MONGO_UNREACHABLE; + } + bool waitForDataUntil(Date_t, Interruptible*) final { MONGO_UNREACHABLE; } boost::optional<Value> lastObjectPushed(OperationContext*) const final { |