summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndy Schwerin <Andy Schwerin schwerin@mongodb.com>2017-03-09 17:33:16 -0500
committerAndy Schwerin <Andy Schwerin schwerin@mongodb.com>2017-03-09 17:34:38 -0500
commit9352a9eb96a350b9fedeb2ff2984ef40fe80c894 (patch)
treeea5cb7e6890956e043fbc8212f528495fbe19f85 /src
parent851e45451775131b255aa5c754b1fc4281861b1e (diff)
downloadmongo-9352a9eb96a350b9fedeb2ff2984ef40fe80c894.tar.gz
SERVER-25062 Implement interruptible sleep and waitFor methods on OperationContext.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/operation_context.cpp20
-rw-r--r--src/mongo/db/operation_context.h29
-rw-r--r--src/mongo/db/operation_context_test.cpp162
3 files changed, 201 insertions, 10 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 8151fe97dfa..d0cb8e244ad 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -210,6 +210,22 @@ Status OperationContext::checkForInterruptNoAssert() {
return Status::OK();
}
+void OperationContext::sleepUntil(Date_t deadline) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(stdx::cv_status::timeout ==
+ waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }));
+}
+
+void OperationContext::sleepFor(Milliseconds duration) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(stdx::cv_status::timeout ==
+ waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; }));
+}
+
void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m) {
uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
@@ -408,4 +424,8 @@ void OperationContext::setLockState(std::unique_ptr<Locker> locker) {
_locker = std::move(locker);
}
+Date_t OperationContext::getExpirationDateForWaitForValue(Milliseconds waitFor) {
+ return getServiceContext()->getPreciseClockSource()->now() + waitFor;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 15688438561..fb867589651 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -139,6 +139,16 @@ public:
Status checkForInterruptNoAssert();
/**
+ * Sleeps until "deadline"; throws an exception if the operation is interrupted before then.
+ */
+ void sleepUntil(Date_t deadline);
+
+ /**
+ * Sleeps for "duration" ms; throws an exception if the operation is interrupted before then.
+ */
+ void sleepFor(Milliseconds duration);
+
+ /**
* Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
* deadline on this operation to expire. In the event of interruption or operation deadline
* expiration, raises a UserException with an error code indicating the interruption type.
@@ -202,6 +212,19 @@ public:
}
/**
+ * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ template <typename Pred>
+ stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds duration,
+ Pred pred) {
+ return waitForConditionOrInterruptUntil(
+ cv, m, getExpirationDateForWaitForValue(duration), pred);
+ }
+
+ /**
* Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
* non-ok status indicates the error instead of a DBException.
*/
@@ -375,6 +398,12 @@ private:
*/
void setDeadlineAndMaxTime(Date_t when, Microseconds maxTime);
+ /**
+ * Returns the timepoint that is "waitFor" ms after now according to the
+ * ServiceContext's precise clock.
+ */
+ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor);
+
friend class WriteUnitOfWork;
Client* const _client;
const unsigned int _opId;
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index 532e25b640a..46f7bc04772 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -201,6 +201,16 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
}
+TEST_F(OperationDeadlineTests, WaitForDurationExpired) {
+ auto opCtx = client->makeOperationContext();
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT(stdx::cv_status::timeout ==
+ opCtx->waitForConditionOrInterruptFor(
+ cv, lk, Milliseconds(-1000), []() -> bool { return false; }));
+}
+
TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpiration) {
auto opCtx = client->makeOperationContext();
opCtx->setDeadlineByDate(mockClock->now());
@@ -213,6 +223,10 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati
class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
public:
+ using CvPred = stdx::function<bool()>;
+ using WaitFn = stdx::function<stdx::cv_status(
+ OperationContext*, stdx::condition_variable&, stdx::unique_lock<stdx::mutex>&, CvPred)>;
+
struct WaitTestState {
void signal() {
stdx::lock_guard<stdx::mutex> lk(mutex);
@@ -226,10 +240,10 @@ public:
bool isSignaled = false;
};
- stdx::future<stdx::cv_status> startWaiterWithUntilAndMaxTime(OperationContext* opCtx,
- WaitTestState* state,
- Date_t until,
- Date_t maxTime) {
+ stdx::future<stdx::cv_status> startWaiterWithMaxTime(OperationContext* opCtx,
+ WaitTestState* state,
+ WaitFn waitFn,
+ Date_t maxTime) {
auto barrier = std::make_shared<unittest::Barrier>(2);
auto task = stdx::packaged_task<stdx::cv_status()>([=] {
@@ -239,12 +253,7 @@ public:
auto predicate = [state] { return state->isSignaled; };
stdx::unique_lock<stdx::mutex> lk(state->mutex);
barrier->countDownAndWait();
- if (until < Date_t::max()) {
- return opCtx->waitForConditionOrInterruptUntil(state->cv, lk, until, predicate);
- } else {
- opCtx->waitForConditionOrInterrupt(state->cv, lk, predicate);
- return stdx::cv_status::no_timeout;
- }
+ return waitFn(opCtx, state->cv, lk, predicate);
});
auto result = task.get_future();
stdx::thread(std::move(task)).detach();
@@ -261,9 +270,74 @@ public:
return result;
}
+ stdx::future<stdx::cv_status> startWaiterWithUntilAndMaxTime(OperationContext* opCtx,
+ WaitTestState* state,
+ Date_t until,
+ Date_t maxTime) {
+ const auto waitFn = [until](OperationContext* opCtx,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ CvPred predicate) {
+ if (until < Date_t::max()) {
+ return opCtx->waitForConditionOrInterruptUntil(cv, lk, until, predicate);
+ } else {
+ opCtx->waitForConditionOrInterrupt(cv, lk, predicate);
+ return stdx::cv_status::no_timeout;
+ }
+ };
+ return startWaiterWithMaxTime(opCtx, state, waitFn, maxTime);
+ }
+
+ template <typename Period>
+ stdx::future<stdx::cv_status> startWaiterWithDurationAndMaxTime(OperationContext* opCtx,
+ WaitTestState* state,
+ Duration<Period> duration,
+ Date_t maxTime) {
+ const auto waitFn = [duration](OperationContext* opCtx,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ CvPred predicate) {
+ return opCtx->waitForConditionOrInterruptFor(cv, lk, duration, predicate);
+ };
+ return startWaiterWithMaxTime(opCtx, state, waitFn, maxTime);
+ }
+
stdx::future<stdx::cv_status> startWaiter(OperationContext* opCtx, WaitTestState* state) {
return startWaiterWithUntilAndMaxTime(opCtx, state, Date_t::max(), Date_t::max());
}
+
+ stdx::future<stdx::cv_status> startWaiterWithSleepUntilAndMaxTime(OperationContext* opCtx,
+ WaitTestState* state,
+ Date_t sleepUntil,
+ Date_t maxTime) {
+ auto waitFn = [sleepUntil](OperationContext* opCtx,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ CvPred predicate) {
+ lk.unlock();
+ opCtx->sleepUntil(sleepUntil);
+ lk.lock();
+ return stdx::cv_status::timeout;
+ };
+ return startWaiterWithMaxTime(opCtx, state, waitFn, maxTime);
+ }
+
+ template <typename Period>
+ stdx::future<stdx::cv_status> startWaiterWithSleepForAndMaxTime(OperationContext* opCtx,
+ WaitTestState* state,
+ Duration<Period> sleepFor,
+ Date_t maxTime) {
+ auto waitFn = [sleepFor](OperationContext* opCtx,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ CvPred predicate) {
+ lk.unlock();
+ opCtx->sleepFor(sleepFor);
+ lk.lock();
+ return stdx::cv_status::timeout;
+ };
+ return startWaiterWithMaxTime(opCtx, state, waitFn, maxTime);
+ }
};
TEST_F(ThreadedOperationDeadlineTests, KillArrivesWhileWaiting) {
@@ -315,6 +389,22 @@ TEST_F(ThreadedOperationDeadlineTests, UntilExpiresWhileWaiting) {
ASSERT(stdx::cv_status::timeout == waiterResult.get());
}
+TEST_F(ThreadedOperationDeadlineTests, ForExpiresWhileWaiting) {
+ auto opCtx = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithDurationAndMaxTime(
+ opCtx.get(), &state, Seconds{10}, startDate + Seconds{60}); // maxTime
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ mockClock->advance(Seconds{9});
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{2});
+ ASSERT(stdx::cv_status::timeout == waiterResult.get());
+}
+
TEST_F(ThreadedOperationDeadlineTests, SignalOne) {
auto opCtx = client->makeOperationContext();
WaitTestState state;
@@ -387,6 +477,58 @@ TEST_F(ThreadedOperationDeadlineTests, SignalBeforeMaxTimeExpires) {
ASSERT(stdx::cv_status::no_timeout == waiterResult.get());
}
+TEST_F(ThreadedOperationDeadlineTests, SleepUntilWithExpiredUntilDoesNotBlock) {
+ auto opCtx = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithSleepUntilAndMaxTime(opCtx.get(),
+ &state,
+ startDate - Seconds{10}, // until
+ startDate + Seconds{60}); // maxTime
+ ASSERT(stdx::cv_status::timeout == waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SleepUntilExpires) {
+ auto opCtx = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithSleepUntilAndMaxTime(opCtx.get(),
+ &state,
+ startDate + Seconds{10}, // until
+ startDate + Seconds{60}); // maxTime
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{9});
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{2});
+ ASSERT(stdx::cv_status::timeout == waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) {
+ auto opCtx = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithSleepForAndMaxTime(
+ opCtx.get(), &state, Seconds{-10}, startDate + Seconds{60}); // maxTime
+ ASSERT(stdx::cv_status::timeout == waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SleepForExpires) {
+ auto opCtx = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithSleepForAndMaxTime(
+ opCtx.get(), &state, Seconds{10}, startDate + Seconds{60}); // maxTime
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{9});
+ ASSERT(stdx::future_status::ready !=
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{2});
+ ASSERT(stdx::cv_status::timeout == waiterResult.get());
+}
+
} // namespace
} // namespace mongo