summary refs log tree commit diff
diff options
context:
space:
mode:
author Ben Caimano <ben.caimano@mongodb.com> 2019-10-17 00:51:52 +0000
committer evergreen <evergreen@mongodb.com> 2019-10-17 00:51:52 +0000
commit 27ed83ca30107c8e39417ba1dfed5ec0dd8b859d (patch)
tree 399e5b548782b6c84627389f94ddb61909e2746a
parent b37eb88c8ae801751711dd54f2506d3561989db7 (diff)
download mongo-27ed83ca30107c8e39417ba1dfed5ec0dd8b859d.tar.gz
SERVER-43987 Require predicates with OperationContext::waitForConditionOrInterrupt()
-rw-r--r-- src/mongo/db/commands/validate.cpp 6
-rw-r--r-- src/mongo/db/concurrency/flow_control_ticketholder.cpp 42
-rw-r--r-- src/mongo/db/keys_collection_manager.cpp 57
-rw-r--r-- src/mongo/db/operation_context.h 6
-rw-r--r-- src/mongo/db/operation_context_test.cpp 57
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 42
-rw-r--r-- src/mongo/db/s/implicit_create_collection.cpp 12
-rw-r--r-- src/mongo/db/s/wait_for_majority_service.cpp 10
-rw-r--r-- src/mongo/db/s/wait_for_majority_service_test.cpp 15
-rw-r--r-- src/mongo/db/session_catalog_test.cpp 6
-rw-r--r-- src/mongo/executor/thread_pool_task_executor.cpp 26
-rw-r--r-- src/mongo/s/client/shard_registry.cpp 14
-rw-r--r-- src/mongo/util/future_test_utils.h 2
-rw-r--r-- src/mongo/util/interruptible.h 245
-rw-r--r-- src/mongo/watchdog/watchdog.cpp 29
15 files changed, 264 insertions, 305 deletions
diff --git a/src/mongo/db/commands/validate.cpp b/src/mongo/db/commands/validate.cpp
index 35369e9ef7c..09594aa207e 100644
--- a/src/mongo/db/commands/validate.cpp
+++ b/src/mongo/db/commands/validate.cpp
@@ -150,9 +150,9 @@ public:
{
stdx::unique_lock<Latch> lock(_validationMutex);
try {
- while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) {
- opCtx->waitForConditionOrInterrupt(_validationNotifier, lock);
- }
+ opCtx->waitForConditionOrInterrupt(_validationNotifier, lock, [&] {
+ return _validationsInProgress.find(nss.ns()) == _validationsInProgress.end();
+ });
} catch (AssertionException& e) {
CommandHelpers::appendCommandStatusNoThrow(
result,
diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.cpp b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
index 6bb95797502..b8f6217665c 100644
--- a/src/mongo/db/concurrency/flow_control_ticketholder.cpp
+++ b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
@@ -98,31 +98,25 @@ void FlowControlTicketholder::getTicket(OperationContext* opCtx,
++stats->acquireWaitCount;
}
- // Make sure operations already waiting on a Flow Control ticket during shut down do not
- // hang if the ticket refresher thread has been shut down.
- while (_tickets == 0 && !_inShutdown) {
- stats->waiting = true;
- const std::uint64_t startWaitTime = curTimeMicros64();
-
- // This method will wait forever for a ticket. However, it will wake up every so often to
- // update the time spent waiting on the ticket.
- auto waitDeadline = Date_t::now() + Milliseconds(500);
- StatusWith<stdx::cv_status> swCondStatus =
- opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, waitDeadline);
-
- auto waitTime = curTimeMicros64() - startWaitTime;
- _totalTimeAcquiringMicros.fetchAndAddRelaxed(waitTime);
- stats->timeAcquiringMicros += waitTime;
-
- // If the operation context state interrupted this wait, the StatusWith result will contain
- // the error. If the `waitDeadline` expired, the Status variable will be OK, and the
- // `cv_status` value will be `cv_status::timeout`. In either case where Status::OK is
- // returned, the loop must re-check the predicate. If the operation context is interrupted
- // (and an error status is returned), the intended behavior is to bubble an exception up to
- // the user.
- uassertStatusOK(swCondStatus);
+ auto currentWaitTime = curTimeMicros64();
+ auto updateTotalTime = [&]() {
+ auto oldWaitTime = std::exchange(currentWaitTime, curTimeMicros64());
+ _totalTimeAcquiringMicros.fetchAndAddRelaxed(currentWaitTime - oldWaitTime);
+ };
+
+ stats->waiting = true;
+ ON_BLOCK_EXIT([&] {
+ // When this block exits, update the time one last time and note that getTicket() is no
+ // longer waiting.
+ updateTotalTime();
+ stats->waiting = false;
+ });
+
+ // getTicket() should block until there are tickets or the Ticketholder is in shutdown
+ while (!opCtx->waitForConditionOrInterruptFor(
+ _cv, lk, Milliseconds(500), [&] { return _tickets > 0 || _inShutdown; })) {
+ updateTotalTime();
}
- stats->waiting = false;
if (_inShutdown) {
return;
diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp
index c96af99f13d..91e8bd8f4a4 100644
--- a/src/mongo/db/keys_collection_manager.cpp
+++ b/src/mongo/db/keys_collection_manager.cpp
@@ -200,12 +200,10 @@ void KeysCollectionManager::PeriodicRunner::refreshNow(OperationContext* opCtx)
"aborting keys cache refresh because node is shutting down");
}
- if (_refreshRequest) {
- return _refreshRequest;
+ if (!_refreshRequest) {
+ _refreshRequest = std::make_shared<Notification<void>>();
}
-
_refreshNeededCV.notify_all();
- _refreshRequest = std::make_shared<Notification<void>>();
return _refreshRequest;
}();
@@ -223,8 +221,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
ThreadClient tc(threadName, service);
while (true) {
- bool hasRefreshRequestInitially = false;
unsigned errorCount = 0;
+
+ decltype(_refreshRequest) request;
std::shared_ptr<RefreshFunc> doRefresh;
{
stdx::lock_guard<Latch> lock(_mutex);
@@ -235,7 +234,7 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
invariant(_doRefresh.get() != nullptr);
doRefresh = _doRefresh;
- hasRefreshRequestInitially = _refreshRequest.get() != nullptr;
+ request = std::move(_refreshRequest);
}
Milliseconds nextWakeup = kRefreshIntervalIfErrored;
@@ -263,6 +262,11 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
nextWakeup = kMaxRefreshWaitTime;
}
}
+
+ // Notify all waiters that the refresh has finished and they can move on
+ if (request) {
+ request->set();
+ }
}
maxKeyRefreshWaitTimeOverrideMS.execute([&](const BSONObj& data) {
@@ -270,15 +274,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
});
stdx::unique_lock<Latch> lock(_mutex);
-
if (_refreshRequest) {
- if (!hasRefreshRequestInitially) {
- // A fresh request came in, fulfill the request before going to sleep.
- continue;
- }
-
- _refreshRequest->set();
- _refreshRequest.reset();
+ // A fresh request came in, fulfill the request before going to sleep.
+ continue;
}
if (_inShutdown) {
@@ -289,24 +287,33 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
auto opCtx = cc().makeOperationContext();
MONGO_IDLE_THREAD_BLOCK;
- auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil(
- _refreshNeededCV, lock, Date_t::now() + nextWakeup);
-
- if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) {
- break;
+ try {
+ opCtx->waitForConditionOrInterruptFor(
+ _refreshNeededCV, lock, nextWakeup, [&]() -> bool {
+ return _inShutdown || _refreshRequest;
+ });
+ } catch (const DBException& e) {
+ LOG(1) << "Unable to wait for refresh request due to: " << e;
+
+ if (ErrorCodes::isShutdownError(e)) {
+ return;
+ }
}
}
-
- stdx::unique_lock<Latch> lock(_mutex);
- if (_refreshRequest) {
- _refreshRequest->set();
- _refreshRequest.reset();
- }
}
void KeysCollectionManager::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) {
stdx::lock_guard<Latch> lock(_mutex);
+ if (_inShutdown) {
+ uasserted(ErrorCodes::ShutdownInProgress,
+ "aborting KeysCollectionManager::PeriodicRunner::setFunc because node is "
+ "shutting down");
+ }
+
_doRefresh = std::make_shared<RefreshFunc>(std::move(newRefreshStrategy));
+ if (!_refreshRequest) {
+ _refreshRequest = std::make_shared<Notification<void>>();
+ }
_refreshNeededCV.notify_all();
}
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 43553efae9b..7ebef518aec 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -352,9 +352,6 @@ public:
*/
Microseconds getRemainingMaxTimeMicros() const;
- StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
-
bool isIgnoringInterrupts() const;
/**
@@ -383,6 +380,9 @@ public:
}
private:
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
+
IgnoreInterruptsState pushIgnoreInterrupts() override {
IgnoreInterruptsState iis{_ignoreInterrupts,
{_deadline, _timeoutError, _hasArtificialDeadline}};
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index d805541218c..cd2c741f51f 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -257,7 +257,7 @@ public:
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- opCtx->waitForConditionOrInterrupt(cv, lk);
+ opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });
}
const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
@@ -337,7 +337,9 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(ErrorCodes::ExceededTimeLimit, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
@@ -346,10 +348,10 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(
- ErrorCodes::ExceededTimeLimit,
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10})
- .getStatus());
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil(
+ cv, lk, mockClock->now() + Seconds{10}, [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) {
@@ -601,7 +603,9 @@ TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(ErrorCodes::Interrupted, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+ DBException,
+ ErrorCodes::Interrupted);
}
TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
@@ -609,9 +613,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(stdx::cv_status::timeout ==
- unittest::assertGet(
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+ ASSERT_FALSE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
}
TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
@@ -620,9 +623,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(stdx::cv_status::timeout ==
- unittest::assertGet(
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+ ASSERT_FALSE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
}
TEST_F(OperationDeadlineTests, WaitForDurationExpired) {
@@ -640,8 +642,10 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(ErrorCodes::ExceededTimeLimit ==
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()));
+ ASSERT_THROWS_CODE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
@@ -937,17 +941,17 @@ TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) {
ASSERT_FALSE(waiterResult.get());
}
-TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
- // `waitForConditionOrInterruptNoAssertUntil` can have three outcomes:
+TEST(OperationContextTest, TestWaitForConditionOrInterruptUntilAPI) {
+ // `waitForConditionOrInterruptUntil` can have three outcomes:
//
// 1) The condition is satisfied before any timeouts.
// 2) The explicit `deadline` function argument is triggered.
// 3) The operation context implicitly times out, or is interrupted from a killOp command or
// shutdown, etc.
//
- // Case (1) must return a Status::OK with a value of `cv_status::no_timeout`. Case (2) must also
- // return a Status::OK with a value of `cv_status::timeout`. Case (3) must return an error
- // status. The error status returned is otherwise configurable.
+ // Case (1) must return true.
+ // Case (2) must return false.
+ // Case (3) must throw a DBException.
//
// Case (1) is the hardest to test. The condition variable must be notified by a second thread
// when the client is waiting on it. Case (1) is also the least in need of having the API
@@ -962,17 +966,16 @@ TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
// Case (2). Expect a return value of false once the explicit deadline has passed.
Date_t deadline = Date_t::now() + Milliseconds(500);
- StatusWith<stdx::cv_status> ret =
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
- ASSERT_OK(ret.getStatus());
- ASSERT(ret.getValue() == stdx::cv_status::timeout);
+ ASSERT_EQ(opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+ false);
// Case (3). Expect an error of `MaxTimeMSExpired`.
opCtx->setDeadlineByDate(Date_t::now(), ErrorCodes::MaxTimeMSExpired);
deadline = Date_t::now() + Seconds(500);
- ret = opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
- ASSERT_FALSE(ret.isOK());
- ASSERT_EQUALS(ErrorCodes::MaxTimeMSExpired, ret.getStatus().code());
+ ASSERT_THROWS_CODE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+ DBException,
+ ErrorCodes::MaxTimeMSExpired);
}
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index d67ade26d70..7103ecf2864 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1382,27 +1382,36 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- // If we are doing a majority committed read we only need to wait for a new snapshot.
if (isMajorityCommittedRead) {
+ // If we are doing a majority committed read we only need to wait for a new snapshot to
+ // update getCurrentOpTime() past targetOpTime. This block should only run once and
+ // return without further looping.
+
LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
- auto waitStatus =
- opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
- if (!waitStatus.isOK()) {
- return waitStatus.withContext(
+ try {
+ opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+ return _inShutdown || (targetOpTime <= getCurrentOpTime());
+ });
+ } catch (const DBException& e) {
+ return e.toStatus().withContext(
str::stream() << "Error waiting for snapshot not less than "
<< targetOpTime.toString() << ", current relevant optime is "
<< getCurrentOpTime().toString() << ".");
}
- if (!_currentCommittedSnapshot) {
- // It is possible for the thread to be awoken due to a spurious wakeup, meaning
- // the condition variable was never set.
- LOG(3) << "waitUntilOpTime: awoken but current committed snapshot is null."
- << " Continuing to wait for new snapshot.";
- } else {
+
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+
+ if (_currentCommittedSnapshot) {
+ // It seems that targetOpTime can sometimes be default OpTime{}. When there is no
+ // _currentCommittedSnapshot, _getCurrentCommittedSnapshotOpTime_inlock() and thus
+ // getCurrentOpTime() also return default OpTime{}. Hence this branch that only runs
+ // if _currentCommittedSnapshot actually exists.
LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
}
- continue;
+ return Status::OK();
}
// We just need to wait for the opTime to catch up to what we need (not majority RC).
@@ -4003,10 +4012,11 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* op
uassert(ErrorCodes::NotYetInitialized,
"Cannot use snapshots until replica set is finished initializing.",
_rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating);
- while (!_currentCommittedSnapshot ||
- _currentCommittedSnapshot->opTime.getTimestamp() < untilSnapshot) {
- opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
- }
+
+ opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+ return _currentCommittedSnapshot &&
+ _currentCommittedSnapshot->opTime.getTimestamp() >= untilSnapshot;
+ });
}
size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
diff --git a/src/mongo/db/s/implicit_create_collection.cpp b/src/mongo/db/s/implicit_create_collection.cpp
index 0154d3cff40..dfc8cd97cb2 100644
--- a/src/mongo/db/s/implicit_create_collection.cpp
+++ b/src/mongo/db/s/implicit_create_collection.cpp
@@ -72,16 +72,14 @@ public:
Status onCannotImplicitlyCreateCollection(OperationContext* opCtx) noexcept {
invariant(!opCtx->lockState()->isLocked());
- {
+ try {
stdx::unique_lock<Latch> lg(_mutex);
- while (_isInProgress) {
- auto status = opCtx->waitForConditionOrInterruptNoAssert(_cvIsInProgress, lg);
- if (!status.isOK()) {
- return status;
- }
- }
+
+ opCtx->waitForConditionOrInterrupt(_cvIsInProgress, lg, [&] { return !_isInProgress; });
_isInProgress = true;
+ } catch (const DBException& e) {
+ return e.toStatus();
}
ON_BLOCK_EXIT([&] {
diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp
index f41ed83c630..1864335150a 100644
--- a/src/mongo/db/s/wait_for_majority_service.cpp
+++ b/src/mongo/db/s/wait_for_majority_service.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/db/s/wait_for_majority_service.h"
@@ -38,6 +40,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -177,8 +180,11 @@ void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* servic
_queuedOpTimes.erase(lowestOpTimeIter);
}
- if (_queuedOpTimes.empty() && !_inShutDown) {
- _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore();
+ try {
+ _opCtx->waitForConditionOrInterrupt(
+ _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+ } catch (const DBException& e) {
+ LOG(1) << "Unable to wait for new op time due to: " << e;
}
_opCtx = nullptr;
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
index ca89ac04c8b..1d7c975d385 100644
--- a/src/mongo/db/s/wait_for_majority_service_test.cpp
+++ b/src/mongo/db/s/wait_for_majority_service_test.cpp
@@ -79,14 +79,13 @@ public:
_waitForMajorityCallCount++;
_callCountChangedCV.notify_one();
- while (!_isTestReady) {
- auto status = opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk);
- if (!status.isOK()) {
- _isTestReady = false;
- _finishWaitingOneOpTimeCV.notify_one();
-
- return status;
- }
+ try {
+ opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; });
+ } catch (const DBException& e) {
+ _isTestReady = false;
+ _finishWaitingOneOpTimeCV.notify_one();
+
+ return e.toStatus();
}
_lastOpTimeWaited = opTime;
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 58e03aa20b4..cd086a9e8b0 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -603,8 +603,10 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cond;
stdx::unique_lock<Latch> lock(m);
- ASSERT_EQ(ErrorCodes::InternalError,
- _opCtx->waitForConditionOrInterruptNoAssert(cond, lock));
+ ASSERT_THROWS_CODE(
+ _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }),
+ DBException,
+ ErrorCodes::InternalError);
}
}
normalCheckOutFinish.get();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 8573e31d4a1..b7ec2eafb27 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -306,18 +306,18 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
stdx::unique_lock<Latch> lk(_mutex);
- // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
- // is signalled or we time out.
- while (!eventState->isSignaledFlag) {
- auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(
- eventState->isSignaledCondition, lk, deadline);
-
- if (!status.isOK() || stdx::cv_status::timeout == status) {
- return status;
+ try {
+ if (opCtx->waitForConditionOrInterruptUntil(
+ eventState->isSignaledCondition, lk, deadline, [&] {
+ return eventState->isSignaledFlag;
+ })) {
+ return stdx::cv_status::no_timeout;
}
- }
- return stdx::cv_status::no_timeout;
+ return stdx::cv_status::timeout;
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
}
void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
@@ -531,9 +531,9 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible*
if (!cbState->finishedCondition) {
cbState->finishedCondition.emplace();
}
- while (!cbState->isFinished.load()) {
- interruptible->waitForConditionOrInterrupt(*cbState->finishedCondition, lk);
- }
+
+ interruptible->waitForConditionOrInterrupt(
+ *cbState->finishedCondition, lk, [&] { return cbState->isFinished.load(); });
}
void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 95b3a726eff..16b1f8bf5f4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -294,13 +294,13 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
// There is also an issue if multiple threads are allowed to call getAllShards()
// simultaneously because there is no good way to determine which of the threads has the
// more recent version of the data.
- do {
- auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(_inReloadCV, reloadLock);
- if (!waitStatus.isOK()) {
- LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(waitStatus);
- return false;
- }
- } while (_reloadState == ReloadState::Reloading);
+ try {
+ opCtx->waitForConditionOrInterrupt(
+ _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; });
+ } catch (const DBException& e) {
+ LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(e.toStatus());
+ return false;
+ }
if (_reloadState == ReloadState::Idle) {
return false;
diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h
index 8de926e4f12..551e95b7b64 100644
--- a/src/mongo/util/future_test_utils.h
+++ b/src/mongo/util/future_test_utils.h
@@ -101,7 +101,7 @@ class DummyInterruptable final : public Interruptible {
MONGO_UNREACHABLE;
}
Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override {
- MONGO_UNREACHABLE;
+ return Date_t::now() + waitFor;
}
};
diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h
index 6e182d6bbd7..d72be7b77d0 100644
--- a/src/mongo/util/interruptible.h
+++ b/src/mongo/util/interruptible.h
@@ -41,21 +41,90 @@ namespace mongo {
* A type which can be used to wait on condition variables with a level triggered one-way interrupt.
* I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to
* waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX.
+ *
+ * This class should never be derived from directly. Instead, please derive from Interruptible.
*/
-class Interruptible {
+class InterruptibleBase {
+public:
+ virtual ~InterruptibleBase() = default;
+
+ /**
+ * Returns the deadline for this InterruptibleBase, or Date_t::max() if there is no deadline.
+ */
+ virtual Date_t getDeadline() const = 0;
+
+ /**
+ * Returns Status::OK() unless this operation is in a killed state.
+ */
+ virtual Status checkForInterruptNoAssert() noexcept = 0;
+
protected:
+ /**
+ * Same as Interruptible::waitForConditionOrInterruptUntil(), except returns
+ * StatusWith<stdx::cv_status> and a non-ok status indicates the error instead of a DBException.
+ */
+ virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
+
struct DeadlineState {
Date_t deadline;
ErrorCodes::Error error;
bool hasArtificialDeadline;
};
+ /**
+ * Pushes a subsidiary deadline into the InterruptibleBase. Until an associated
+ * popArtificialDeadline() is invoked, the InterruptibleBase will fail checkForInterrupt() and
+ * waitForConditionOrInterrupt() calls with the passed error code if the deadline has passed.
+ *
+ * Note that deadlines higher than the current value are constrained (such that the passed
+ * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
+ *
+ * Returns state needed to pop the deadline.
+ */
+ virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+
+ /**
+ * Pops the subsidiary deadline introduced by push.
+ */
+ virtual void popArtificialDeadline(DeadlineState) = 0;
+
+ /**
+ * Returns the equivalent of Date_t::now() + waitFor for the InterruptibleBase's clock
+ */
+ virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+
struct IgnoreInterruptsState {
bool ignoreInterrupts;
DeadlineState deadline;
};
/**
+ * Pushes an ignore interruption critical section into the InterruptibleBase.
+ * Until an associated popIgnoreInterrupts() is invoked, the InterruptibleBase should ignore
+ * interruptions related to explicit interruption or previously set deadlines.
+ *
+ * Note that new deadlines can be set after this is called, which will again introduce the
+ * possibility of interruption.
+ *
+ * Returns state needed to pop interruption.
+ */
+ virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+
+ /**
+ * Pops the ignored interruption critical section introduced by push.
+ */
+ virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+};
+
+/**
+ * A derived class of InterruptibleBase which provides a variety of helper functions.
+ *
+ * Please derive from this class instead of InterruptibleBase.
+ */
+class Interruptible : public InterruptibleBase {
+private:
+ /**
* A deadline guard provides a subsidiary deadline to the parent.
*/
class DeadlineGuard {
@@ -134,7 +203,7 @@ protected:
public:
/**
* Returns a statically allocated instance that cannot be interrupted. Useful as a default
- * argument to interruptible taking methods.
+ * argument to Interruptible-taking methods.
*/
static Interruptible* notInterruptible();
@@ -163,11 +232,6 @@ public:
}
/**
- * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline.
- */
- virtual Date_t getDeadline() const = 0;
-
- /**
* Invokes the passed callback with an interruption guard active. Additionally handles the
* dance of try/catching the invocation and checking checkForInterrupt with the guard inactive
* (to allow a higher level timeout to override a lower level one, or for top level interruption
@@ -193,86 +257,9 @@ public:
}
/**
- * Returns Status::OK() unless this operation is in a killed state.
- */
- virtual Status checkForInterruptNoAssert() noexcept = 0;
-
- /**
- * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
- * deadline on this operation to expire. In the event of interruption or operation deadline
- * expiration, raises a AssertionException with an error code indicating the interruption type.
- */
- template <typename LockT>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m) {
- uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
- }
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
- * is interrupted or its deadline expires. Throws a DBException for interruption and
- * deadline expiration.
- */
- template <typename LockT, typename PredicateT>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
- while (!pred()) {
- waitForConditionOrInterrupt(cv, m);
- }
- }
-
- /**
- * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
- * a DBException to report interruption.
- */
- template <typename LockT>
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, LockT& m) noexcept {
- auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- return Status::OK();
- }
-
- /**
- * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay
- * status instead of throwing on interruption.
- */
- template <typename LockT, typename PredicateT>
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
- LockT& m,
- PredicateT pred) noexcept {
- while (!pred()) {
- auto status = waitForConditionOrInterruptNoAssert(cv, m);
-
- if (!status.isOK()) {
- return status;
- }
- }
-
- return Status::OK();
- }
-
- /**
- * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
- * for the operation to be interrupted, or for the operation's own deadline to expire.
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout.
- */
- template <typename LockT>
- stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- LockT& m,
- Date_t deadline) {
- return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
- }
-
- /**
* Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
* expires, or this operation is interrupted, or this operation's own deadline expires.
*
- *
* If the operation deadline expires or the operation is interrupted, throws a DBException. If
* the given "deadline" expires, returns the result of one final call to "pred". Otherwise,
* returns true, indicating that "pred" returned true before the deadline expired.
@@ -282,24 +269,28 @@ public:
LockT& m,
Date_t deadline,
PredicateT pred) {
+ auto waitUntil = [&](Date_t deadline) {
+ // Wrapping this in a lambda because it's a mouthful
+ return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+ };
+
while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+ if (waitUntil(deadline) == stdx::cv_status::timeout) {
return pred();
}
- }
+ };
+
return true;
}
/**
- * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative
- * amount of time to wait instead of an absolute time point.
+ * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+ * is interrupted or its deadline expires. Throws a DBException for interruption and
+ * deadline expiration.
*/
- template <typename LockT>
- stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
- LockT& m,
- Milliseconds ms) {
- return uassertStatusOK(
- waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms)));
+ template <typename LockT, typename PredicateT>
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
+ waitForConditionOrInterruptUntil(cv, m, Date_t::max(), std::move(pred));
}
/**
@@ -311,24 +302,12 @@ public:
LockT& m,
Milliseconds ms,
PredicateT pred) {
- const auto deadline = getExpirationDateForWaitForValue(ms);
- while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
- return pred();
- }
- }
- return true;
+ return waitForConditionOrInterruptUntil(
+ cv, m, getExpirationDateForWaitForValue(ms), std::move(pred));
}
/**
- * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
- * non-ok status indicates the error instead of a DBException.
- */
- virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
-
- /**
- * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then.
+ * Sleeps until "deadline"; throws an exception if the Interruptible is interrupted before then.
*/
void sleepUntil(Date_t deadline) {
auto m = MONGO_MAKE_LATCH();
@@ -338,7 +317,7 @@ public:
}
/**
- * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before
+ * Sleeps for "duration" ms; throws an exception if the Interruptible is interrupted before
* then.
*/
void sleepFor(Milliseconds duration) {
@@ -349,51 +328,11 @@ public:
}
protected:
- /**
- * Pushes an ignore interruption critical section into the interruptible. Until an associated
- * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to
- * explicit interruption or previously set deadlines.
- *
- * Note that new deadlines can be set after this is called, which will again introduce the
- * possibility of interruption.
- *
- * Returns state needed to pop interruption.
- */
- virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
-
- /**
- * Pops the ignored interruption critical section introduced by push.
- */
- virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
-
- /**
- * Pushes a subsidiary deadline into the interruptible. Until an associated
- * popArtificialDeadline is
- * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls
- * with the passed error code if the deadline has passed.
- *
- * Note that deadline's higher than the current value are constrained (such that the passed
- * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
- *
- * Returns state needed to pop the deadline.
- */
- virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
-
- /**
- * Pops the subsidiary deadline introduced by push.
- */
- virtual void popArtificialDeadline(DeadlineState) = 0;
-
- /**
- * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock
- */
- virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
-
class NotInterruptible;
};
/**
- * A not interruptible type which can be used as a lightweight default arg for interruptible taking
+ * A non-interruptible type which can be used as a lightweight default arg for Interruptible-taking
* functions.
*/
class Interruptible::NotInterruptible final : public Interruptible {
@@ -417,9 +356,9 @@ class Interruptible::NotInterruptible final : public Interruptible {
}
// It's invalid to call the deadline or ignore interruption guards on a possibly noop
- // interruptible.
+ // Interruptible.
//
- // The noop interruptible should only be invoked as a default arg at the bottom of the call
+ // The noop Interruptible should only be invoked as a default arg at the bottom of the call
// stack (with types that won't modify its invocation)
IgnoreInterruptsState pushIgnoreInterrupts() override {
MONGO_UNREACHABLE;
diff --git a/src/mongo/watchdog/watchdog.cpp b/src/mongo/watchdog/watchdog.cpp
index 360b98a0be9..accc1b3b06d 100644
--- a/src/mongo/watchdog/watchdog.cpp
+++ b/src/mongo/watchdog/watchdog.cpp
@@ -151,21 +151,22 @@ void WatchdogPeriodicThread::doLoop() {
// Check if the period is different?
// We are signalled on period changes at which point we may be done waiting or need to
// wait longer.
- while (startTime + _period > preciseClockSource->now() &&
- _state != State::kShutdownRequested) {
- auto s = opCtx->waitForConditionOrInterruptNoAssertUntil(
- _condvar, lock, startTime + _period);
-
- if (!s.isOK()) {
- // The only bad status is when we are in shutdown
- if (!opCtx->getServiceContext()->getKillAllOperations()) {
- severe() << "Watchdog was interrupted, shutting down, reason: "
- << s.getStatus();
- exitCleanly(ExitCode::EXIT_ABRUPT);
- }
-
- return;
+ try {
+ opCtx->waitForConditionOrInterruptUntil(_condvar, lock, startTime + _period, [&] {
+ return (startTime + _period) <= preciseClockSource->now() ||
+ _state == State::kShutdownRequested;
+ });
+ } catch (const DBException& e) {
+ // The only bad status is when we are in shutdown
+ if (!opCtx->getServiceContext()->getKillAllOperations()) {
+ severe() << "Watchdog was interrupted, shutting down, reason: " << e.toStatus();
+ exitCleanly(ExitCode::EXIT_ABRUPT);
}
+
+ // This interruption ends the WatchdogPeriodicThread. This means it is possible to
+ // killOp this operation and stop it for the lifetime of the process.
+ LOG(1) << "WatchdogPeriodicThread interrupted by: " << e;
+ return;
}
// Are we done running?