summary refs log tree commit diff
diff options
context:
space:
mode:
author Ben Caimano <ben.caimano@mongodb.com> 2019-11-12 20:37:46 +0000
committer evergreen <evergreen@mongodb.com> 2019-11-12 20:37:46 +0000
commit 5ecb2429a23af0ebf4af60d21e3b670a3045f563 (patch)
tree fe4bb88350ca2ec2d69fac05e2cf608d8abf1595
parent b26181808380c2a448687d331dab7d19e9f9f2e4 (diff)
download mongo-5ecb2429a23af0ebf4af60d21e3b670a3045f563.tar.gz
SERVER-43987 Require predicates with OperationContext::waitForConditionOrInterrupt()
This commit also incorporates parts of SERVER-44086 for backporting to v4.2.
-rw-r--r-- jstests/core/currentop_waiting_for_latch.js 2
-rw-r--r-- src/mongo/db/commands/validate.cpp 6
-rw-r--r-- src/mongo/db/concurrency/flow_control_ticketholder.cpp 42
-rw-r--r-- src/mongo/db/keys_collection_manager.cpp 57
-rw-r--r-- src/mongo/db/operation_context.h 6
-rw-r--r-- src/mongo/db/operation_context_test.cpp 57
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 82
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h 11
-rw-r--r-- src/mongo/db/s/implicit_create_collection.cpp 12
-rw-r--r-- src/mongo/db/s/wait_for_majority_service.cpp 10
-rw-r--r-- src/mongo/db/s/wait_for_majority_service_test.cpp 15
-rw-r--r-- src/mongo/db/session_catalog_test.cpp 6
-rw-r--r-- src/mongo/executor/thread_pool_task_executor.cpp 26
-rw-r--r-- src/mongo/s/client/shard_registry.cpp 14
-rw-r--r-- src/mongo/util/clock_source.cpp 4
-rw-r--r-- src/mongo/util/diagnostic_info.cpp 92
-rw-r--r-- src/mongo/util/future_test_utils.h 2
-rw-r--r-- src/mongo/util/interruptible.h 381
-rw-r--r-- src/mongo/watchdog/watchdog.cpp 29
19 files changed, 537 insertions, 317 deletions
diff --git a/jstests/core/currentop_waiting_for_latch.js b/jstests/core/currentop_waiting_for_latch.js
index aebfaddfcdb..c584e0d98bf 100644
--- a/jstests/core/currentop_waiting_for_latch.js
+++ b/jstests/core/currentop_waiting_for_latch.js
@@ -28,7 +28,7 @@ const getCurrentOp = function() {
const blockedOpClients = {
"DiagnosticCaptureTestLatch": {"seen": false},
- // "DiagnosticCaptureTestInterruptible": {"seen": false},
+ "DiagnosticCaptureTestInterruptible": {"seen": false},
};
const getClientName = function() {
diff --git a/src/mongo/db/commands/validate.cpp b/src/mongo/db/commands/validate.cpp
index 171163a8b2a..ff22256383c 100644
--- a/src/mongo/db/commands/validate.cpp
+++ b/src/mongo/db/commands/validate.cpp
@@ -154,9 +154,9 @@ public:
{
stdx::unique_lock<Latch> lock(_validationMutex);
try {
- while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) {
- opCtx->waitForConditionOrInterrupt(_validationNotifier, lock);
- }
+ opCtx->waitForConditionOrInterrupt(_validationNotifier, lock, [&] {
+ return _validationsInProgress.find(nss.ns()) == _validationsInProgress.end();
+ });
} catch (AssertionException& e) {
CommandHelpers::appendCommandStatusNoThrow(
result,
diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.cpp b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
index 6bb95797502..b8f6217665c 100644
--- a/src/mongo/db/concurrency/flow_control_ticketholder.cpp
+++ b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
@@ -98,31 +98,25 @@ void FlowControlTicketholder::getTicket(OperationContext* opCtx,
++stats->acquireWaitCount;
}
- // Make sure operations already waiting on a Flow Control ticket during shut down do not
- // hang if the ticket refresher thread has been shut down.
- while (_tickets == 0 && !_inShutdown) {
- stats->waiting = true;
- const std::uint64_t startWaitTime = curTimeMicros64();
-
- // This method will wait forever for a ticket. However, it will wake up every so often to
- // update the time spent waiting on the ticket.
- auto waitDeadline = Date_t::now() + Milliseconds(500);
- StatusWith<stdx::cv_status> swCondStatus =
- opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, waitDeadline);
-
- auto waitTime = curTimeMicros64() - startWaitTime;
- _totalTimeAcquiringMicros.fetchAndAddRelaxed(waitTime);
- stats->timeAcquiringMicros += waitTime;
-
- // If the operation context state interrupted this wait, the StatusWith result will contain
- // the error. If the `waitDeadline` expired, the Status variable will be OK, and the
- // `cv_status` value will be `cv_status::timeout`. In either case where Status::OK is
- // returned, the loop must re-check the predicate. If the operation context is interrupted
- // (and an error status is returned), the intended behavior is to bubble an exception up to
- // the user.
- uassertStatusOK(swCondStatus);
+ auto currentWaitTime = curTimeMicros64();
+ auto updateTotalTime = [&]() {
+ auto oldWaitTime = std::exchange(currentWaitTime, curTimeMicros64());
+ _totalTimeAcquiringMicros.fetchAndAddRelaxed(currentWaitTime - oldWaitTime);
+ };
+
+ stats->waiting = true;
+ ON_BLOCK_EXIT([&] {
+ // When this block exits, update the time one last time and note that getTicket() is no
+ // longer waiting.
+ updateTotalTime();
+ stats->waiting = false;
+ });
+
+ // getTicket() should block until there are tickets or the Ticketholder is in shutdown
+ while (!opCtx->waitForConditionOrInterruptFor(
+ _cv, lk, Milliseconds(500), [&] { return _tickets > 0 || _inShutdown; })) {
+ updateTotalTime();
}
- stats->waiting = false;
if (_inShutdown) {
return;
diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp
index 92834348382..77037f80c84 100644
--- a/src/mongo/db/keys_collection_manager.cpp
+++ b/src/mongo/db/keys_collection_manager.cpp
@@ -199,12 +199,10 @@ void KeysCollectionManager::PeriodicRunner::refreshNow(OperationContext* opCtx)
"aborting keys cache refresh because node is shutting down");
}
- if (_refreshRequest) {
- return _refreshRequest;
+ if (!_refreshRequest) {
+ _refreshRequest = std::make_shared<Notification<void>>();
}
-
_refreshNeededCV.notify_all();
- _refreshRequest = std::make_shared<Notification<void>>();
return _refreshRequest;
}();
@@ -222,8 +220,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
ThreadClient tc(threadName, service);
while (true) {
- bool hasRefreshRequestInitially = false;
unsigned errorCount = 0;
+
+ decltype(_refreshRequest) request;
std::shared_ptr<RefreshFunc> doRefresh;
{
stdx::lock_guard<Latch> lock(_mutex);
@@ -234,7 +233,7 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
invariant(_doRefresh.get() != nullptr);
doRefresh = _doRefresh;
- hasRefreshRequestInitially = _refreshRequest.get() != nullptr;
+ request = std::move(_refreshRequest);
}
Milliseconds nextWakeup = kRefreshIntervalIfErrored;
@@ -262,6 +261,11 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
nextWakeup = kMaxRefreshWaitTime;
}
}
+
+ // Notify all waiters that the refresh has finished and they can move on
+ if (request) {
+ request->set();
+ }
}
MONGO_FAIL_POINT_BLOCK(maxKeyRefreshWaitTimeOverrideMS, data) {
@@ -273,15 +277,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
}
stdx::unique_lock<Latch> lock(_mutex);
-
if (_refreshRequest) {
- if (!hasRefreshRequestInitially) {
- // A fresh request came in, fulfill the request before going to sleep.
- continue;
- }
-
- _refreshRequest->set();
- _refreshRequest.reset();
+ // A fresh request came in, fulfill the request before going to sleep.
+ continue;
}
if (_inShutdown) {
@@ -292,24 +290,33 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
auto opCtx = cc().makeOperationContext();
MONGO_IDLE_THREAD_BLOCK;
- auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil(
- _refreshNeededCV, lock, Date_t::now() + nextWakeup);
-
- if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) {
- break;
+ try {
+ opCtx->waitForConditionOrInterruptFor(
+ _refreshNeededCV, lock, nextWakeup, [&]() -> bool {
+ return _inShutdown || _refreshRequest;
+ });
+ } catch (const DBException& e) {
+ LOG(1) << "Unable to wait for refresh request due to: " << e;
+
+ if (ErrorCodes::isShutdownError(e.code())) {
+ return;
+ }
}
}
-
- stdx::unique_lock<Latch> lock(_mutex);
- if (_refreshRequest) {
- _refreshRequest->set();
- _refreshRequest.reset();
- }
}
void KeysCollectionManager::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) {
stdx::lock_guard<Latch> lock(_mutex);
+ if (_inShutdown) {
+ uasserted(ErrorCodes::ShutdownInProgress,
+ "aborting KeysCollectionManager::PeriodicRunner::setFunc because node is "
+ "shutting down");
+ }
+
_doRefresh = std::make_shared<RefreshFunc>(std::move(newRefreshStrategy));
+ if (!_refreshRequest) {
+ _refreshRequest = std::make_shared<Notification<void>>();
+ }
_refreshNeededCV.notify_all();
}
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 20a367a2092..ed5d6c88abd 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -352,9 +352,6 @@ public:
*/
Microseconds getRemainingMaxTimeMicros() const;
- StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
-
bool isIgnoringInterrupts() const;
/**
@@ -374,6 +371,9 @@ public:
}
private:
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
+
IgnoreInterruptsState pushIgnoreInterrupts() override {
IgnoreInterruptsState iis{_ignoreInterrupts,
{_deadline, _timeoutError, _hasArtificialDeadline}};
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index 00fa8734534..cd01b72dbb2 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -257,7 +257,7 @@ public:
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- opCtx->waitForConditionOrInterrupt(cv, lk);
+ opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });
}
const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
@@ -337,7 +337,9 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(ErrorCodes::ExceededTimeLimit, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
@@ -346,10 +348,10 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(
- ErrorCodes::ExceededTimeLimit,
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10})
- .getStatus());
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil(
+ cv, lk, mockClock->now() + Seconds{10}, [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) {
@@ -601,7 +603,9 @@ TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT_EQ(ErrorCodes::Interrupted, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+ ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+ DBException,
+ ErrorCodes::Interrupted);
}
TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
@@ -609,9 +613,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(stdx::cv_status::timeout ==
- unittest::assertGet(
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+ ASSERT_FALSE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
}
TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
@@ -620,9 +623,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(stdx::cv_status::timeout ==
- unittest::assertGet(
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+ ASSERT_FALSE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
}
TEST_F(OperationDeadlineTests, WaitForDurationExpired) {
@@ -640,8 +642,10 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cv;
stdx::unique_lock<Latch> lk(m);
- ASSERT(ErrorCodes::ExceededTimeLimit ==
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()));
+ ASSERT_THROWS_CODE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
}
class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
@@ -937,17 +941,17 @@ TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) {
ASSERT_FALSE(waiterResult.get());
}
-TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
- // `waitForConditionOrInterruptNoAssertUntil` can have three outcomes:
+TEST(OperationContextTest, TestWaitForConditionOrInterruptUntilAPI) {
+ // `waitForConditionOrInterruptUntil` can have three outcomes:
//
// 1) The condition is satisfied before any timeouts.
// 2) The explicit `deadline` function argument is triggered.
// 3) The operation context implicitly times out, or is interrupted from a killOp command or
// shutdown, etc.
//
- // Case (1) must return a Status::OK with a value of `cv_status::no_timeout`. Case (2) must also
- // return a Status::OK with a value of `cv_status::timeout`. Case (3) must return an error
- // status. The error status returned is otherwise configurable.
+ // Case (1) must return true.
+ // Case (2) must return false.
+ // Case (3) must throw a DBException.
//
// Case (1) is the hardest to test. The condition variable must be notified by a second thread
// when the client is waiting on it. Case (1) is also the least in need of having the API
@@ -962,17 +966,16 @@ TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
// Case (2). Expect the wait to time out and return false.
Date_t deadline = Date_t::now() + Milliseconds(500);
- StatusWith<stdx::cv_status> ret =
- opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
- ASSERT_OK(ret.getStatus());
- ASSERT(ret.getValue() == stdx::cv_status::timeout);
+ ASSERT_EQ(opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+ false);
// Case (3). Expect an error of `MaxTimeMSExpired`.
opCtx->setDeadlineByDate(Date_t::now(), ErrorCodes::MaxTimeMSExpired);
deadline = Date_t::now() + Seconds(500);
- ret = opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
- ASSERT_FALSE(ret.isOK());
- ASSERT_EQUALS(ErrorCodes::MaxTimeMSExpired, ret.getStatus().code());
+ ASSERT_THROWS_CODE(
+ opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+ DBException,
+ ErrorCodes::MaxTimeMSExpired);
}
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 060e425238b..378e1e3ba47 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -202,6 +202,8 @@ ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
invariant(condVar);
+
+ ++notifyCount;
condVar->notify_all();
}
@@ -1418,27 +1420,36 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- // If we are doing a majority committed read we only need to wait for a new snapshot.
if (isMajorityCommittedRead) {
+ // If we are doing a majority committed read we only need to wait for a new snapshot to
+ // update getCurrentOpTime() past targetOpTime. This block should only run once and
+ // return without further looping.
+
LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
- auto waitStatus =
- opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
- if (!waitStatus.isOK()) {
- return waitStatus.withContext(
+ try {
+ opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+ return _inShutdown || (targetOpTime <= getCurrentOpTime());
+ });
+ } catch (const DBException& e) {
+ return e.toStatus().withContext(
str::stream() << "Error waiting for snapshot not less than "
<< targetOpTime.toString() << ", current relevant optime is "
<< getCurrentOpTime().toString() << ".");
}
- if (!_currentCommittedSnapshot) {
- // It is possible for the thread to be awoken due to a spurious wakeup, meaning
- // the condition variable was never set.
- LOG(3) << "waitUntilOpTime: awoken but current committed snapshot is null."
- << " Continuing to wait for new snapshot.";
- } else {
+
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+
+ if (_currentCommittedSnapshot) {
+ // It seems that targetOpTime can sometimes be default OpTime{}. When there is no
+ // _currentCommittedSnapshot, _getCurrentCommittedSnapshotOpTime_inlock() and thus
+ // getCurrentOpTime() also return default OpTime{}. Hence this branch that only runs
+ // if _currentCommittedSnapshot actually exists.
LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
}
- continue;
+ return Status::OK();
}
// We just need to wait for the opTime to catch up to what we need (not majority RC).
@@ -1449,20 +1460,14 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
<< waiter << " until " << opCtx->getDeadline();
- auto waitStatus = Status::OK();
- if (deadline) {
- auto waitUntilStatus =
- opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, lock, *deadline);
- waitStatus = waitUntilStatus.getStatus();
- } else {
- waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
- }
-
- if (!waitStatus.isOK()) {
- return waitStatus.withContext(str::stream()
- << "Error waiting for optime " << targetOpTime.toString()
- << ", current relevant optime is "
- << getCurrentOpTime().toString() << ".");
+ try {
+ opCtx->waitForConditionOrInterruptUntil(
+ condVar, lock, deadline.value_or(Date_t::max()), waiter.makePredicate());
+ } catch (const DBException& e) {
+ return e.toStatus().withContext(str::stream() << "Error waiting for optime "
+ << targetOpTime.toString()
+ << ", current relevant optime is "
+ << getCurrentOpTime().toString() << ".");
}
// If deadline is set no need to wait until the targetTime time is reached.
@@ -1799,13 +1804,13 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
}
- auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- if (status.getValue() == stdx::cv_status::timeout) {
- return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
+ try {
+ if (!opCtx->waitForConditionOrInterruptUntil(
+ condVar, *lock, wTimeoutDate, waiter.makePredicate())) {
+ return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
+ }
+ } catch (const DBException& e) {
+ return e.toStatus();
}
stepdownStatus = checkForStepDown();
@@ -2103,7 +2108,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
// tryToStartStepDown again will cause tryToStartStepDown to return ExceededTimeLimit
// with the proper error message.
opCtx->waitForConditionOrInterruptUntil(
- condVar, lk, std::min(stepDownUntil, waitUntil));
+ condVar, lk, std::min(stepDownUntil, waitUntil), waiter.makePredicate());
}
}
@@ -3965,10 +3970,11 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* op
uassert(ErrorCodes::NotYetInitialized,
"Cannot use snapshots until replica set is finished initializing.",
_rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating);
- while (!_currentCommittedSnapshot ||
- _currentCommittedSnapshot->opTime.getTimestamp() < untilSnapshot) {
- opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
- }
+
+ opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+ return _currentCommittedSnapshot &&
+ _currentCommittedSnapshot->opTime.getTimestamp() >= untilSnapshot;
+ });
}
size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 07bdf226a80..c42b47cf73a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -607,6 +607,17 @@ private:
return false;
}
+ // This predicate is an extremely simple way to never miss a notification. When
+ // ThreadWaiter::notify_inlock() is called, notifyCount is incremented. As should be obvious,
+ // this predicate should only be called "inlock" as well. The ThreadWaiter itself goes away
+ // with SERVER-43135, so this function and notifyCount as a concept are a v4.2-only measure.
+ auto makePredicate() {
+ return [this, startingNotifyCount = notifyCount]() -> bool {
+ return notifyCount != startingNotifyCount;
+ };
+ }
+
+ uint64_t notifyCount = 0;
stdx::condition_variable* condVar = nullptr;
};
diff --git a/src/mongo/db/s/implicit_create_collection.cpp b/src/mongo/db/s/implicit_create_collection.cpp
index a0a3d6068f9..63dfa503123 100644
--- a/src/mongo/db/s/implicit_create_collection.cpp
+++ b/src/mongo/db/s/implicit_create_collection.cpp
@@ -72,16 +72,14 @@ public:
Status onCannotImplicitlyCreateCollection(OperationContext* opCtx) noexcept {
invariant(!opCtx->lockState()->isLocked());
- {
+ try {
stdx::unique_lock<Latch> lg(_mutex);
- while (_isInProgress) {
- auto status = opCtx->waitForConditionOrInterruptNoAssert(_cvIsInProgress, lg);
- if (!status.isOK()) {
- return status;
- }
- }
+
+ opCtx->waitForConditionOrInterrupt(_cvIsInProgress, lg, [&] { return !_isInProgress; });
_isInProgress = true;
+ } catch (const DBException& e) {
+ return e.toStatus();
}
ON_BLOCK_EXIT([&] {
diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp
index f41ed83c630..1864335150a 100644
--- a/src/mongo/db/s/wait_for_majority_service.cpp
+++ b/src/mongo/db/s/wait_for_majority_service.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/db/s/wait_for_majority_service.h"
@@ -38,6 +40,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -177,8 +180,11 @@ void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* servic
_queuedOpTimes.erase(lowestOpTimeIter);
}
- if (_queuedOpTimes.empty() && !_inShutDown) {
- _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore();
+ try {
+ _opCtx->waitForConditionOrInterrupt(
+ _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+ } catch (const DBException& e) {
+ LOG(1) << "Unable to wait for new op time due to: " << e;
}
_opCtx = nullptr;
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
index ca89ac04c8b..1d7c975d385 100644
--- a/src/mongo/db/s/wait_for_majority_service_test.cpp
+++ b/src/mongo/db/s/wait_for_majority_service_test.cpp
@@ -79,14 +79,13 @@ public:
_waitForMajorityCallCount++;
_callCountChangedCV.notify_one();
- while (!_isTestReady) {
- auto status = opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk);
- if (!status.isOK()) {
- _isTestReady = false;
- _finishWaitingOneOpTimeCV.notify_one();
-
- return status;
- }
+ try {
+ opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; });
+ } catch (const DBException& e) {
+ _isTestReady = false;
+ _finishWaitingOneOpTimeCV.notify_one();
+
+ return e.toStatus();
}
_lastOpTimeWaited = opTime;
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 151924e2709..631b342f073 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -602,8 +602,10 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) {
auto m = MONGO_MAKE_LATCH();
stdx::condition_variable cond;
stdx::unique_lock<Latch> lock(m);
- ASSERT_EQ(ErrorCodes::InternalError,
- _opCtx->waitForConditionOrInterruptNoAssert(cond, lock));
+ ASSERT_THROWS_CODE(
+ _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }),
+ DBException,
+ ErrorCodes::InternalError);
}
}
normalCheckOutFinish.get();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index f07e1d476c5..f8096eecb04 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -306,18 +306,18 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
stdx::unique_lock<Latch> lk(_mutex);
- // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
- // is signalled or we time out.
- while (!eventState->isSignaledFlag) {
- auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(
- eventState->isSignaledCondition, lk, deadline);
-
- if (!status.isOK() || stdx::cv_status::timeout == status) {
- return status;
+ try {
+ if (opCtx->waitForConditionOrInterruptUntil(
+ eventState->isSignaledCondition, lk, deadline, [&] {
+ return eventState->isSignaledFlag;
+ })) {
+ return stdx::cv_status::no_timeout;
}
- }
- return stdx::cv_status::no_timeout;
+ return stdx::cv_status::timeout;
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
}
void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
@@ -531,9 +531,9 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible*
if (!cbState->finishedCondition) {
cbState->finishedCondition.emplace();
}
- while (!cbState->isFinished.load()) {
- interruptible->waitForConditionOrInterrupt(*cbState->finishedCondition, lk);
- }
+
+ interruptible->waitForConditionOrInterrupt(
+ *cbState->finishedCondition, lk, [&] { return cbState->isFinished.load(); });
}
void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 38c4bc6976d..f8e547592dd 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -291,13 +291,13 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
// There is also an issue if multiple threads are allowed to call getAllShards()
// simultaneously because there is no good way to determine which of the threads has the
// more recent version of the data.
- do {
- auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(_inReloadCV, reloadLock);
- if (!waitStatus.isOK()) {
- LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(waitStatus);
- return false;
- }
- } while (_reloadState == ReloadState::Reloading);
+ try {
+ opCtx->waitForConditionOrInterrupt(
+ _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; });
+ } catch (const DBException& e) {
+ LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(e.toStatus());
+ return false;
+ }
if (_reloadState == ReloadState::Idle) {
return false;
diff --git a/src/mongo/util/clock_source.cpp b/src/mongo/util/clock_source.cpp
index a3fb3aabeb1..1031b8ed621 100644
--- a/src/mongo/util/clock_source.cpp
+++ b/src/mongo/util/clock_source.cpp
@@ -29,8 +29,10 @@
#include "mongo/platform/basic.h"
-#include "mongo/stdx/thread.h"
#include "mongo/util/clock_source.h"
+
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/thread.h"
#include "mongo/util/waitable.h"
namespace mongo {
diff --git a/src/mongo/util/diagnostic_info.cpp b/src/mongo/util/diagnostic_info.cpp
index 9a6710f8a10..566040236e5 100644
--- a/src/mongo/util/diagnostic_info.cpp
+++ b/src/mongo/util/diagnostic_info.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/client.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/interruptible.h"
#include "mongo/util/log.h"
@@ -74,6 +75,16 @@ private:
Mutex mutex = MONGO_MAKE_LATCH(kBlockedOpMutexName);
};
LatchState _latchState;
+
+ struct InterruptibleState {
+ bool isWaiting = false;
+ boost::optional<stdx::thread> thread{boost::none};
+
+ stdx::condition_variable cv;
+ Mutex mutex = MONGO_MAKE_LATCH(kBlockedOpInterruptibleName);
+ bool isDone = false;
+ };
+ InterruptibleState _interruptibleState;
} gBlockedOp;
// This function causes us to make an additional thread with a self-contended lock so that
@@ -83,6 +94,7 @@ void BlockedOp::start(ServiceContext* serviceContext) {
stdx::unique_lock<stdx::mutex> lk(_m);
invariant(!_latchState.thread);
+ invariant(!_interruptibleState.thread);
_latchState.mutex.lock();
_latchState.thread = stdx::thread([this, serviceContext]() mutable {
@@ -95,7 +107,21 @@ void BlockedOp::start(ServiceContext* serviceContext) {
log() << "Joining currentOpSpawnsThreadWaitingForLatch thread";
});
- _cv.wait(lk, [this] { return _latchState.isContended; });
+ _interruptibleState.thread = stdx::thread([this, serviceContext]() mutable {
+ ThreadClient tc("DiagnosticCaptureTestInterruptible", serviceContext);
+ auto opCtx = tc->makeOperationContext();
+
+ log() << "Entered currentOpSpawnsThreadWaitingForLatch thread for interruptibles";
+ stdx::unique_lock lk(_interruptibleState.mutex);
+ opCtx->waitForConditionOrInterrupt(
+ _interruptibleState.cv, lk, [&] { return _interruptibleState.isDone; });
+ _interruptibleState.isDone = false;
+
+ log() << "Joining currentOpSpawnsThreadWaitingForLatch thread for interruptibles";
+ });
+
+
+ _cv.wait(lk, [this] { return _latchState.isContended && _interruptibleState.isWaiting; });
log() << "Started threads for currentOpSpawnsThreadWaitingForLatch";
}
@@ -103,18 +129,29 @@ void BlockedOp::start(ServiceContext* serviceContext) {
// remaining
void BlockedOp::join() {
decltype(_latchState.thread) latchThread;
+ decltype(_interruptibleState.thread) interruptibleThread;
{
stdx::lock_guard<stdx::mutex> lk(_m);
invariant(_latchState.thread);
+ invariant(_interruptibleState.thread);
_latchState.mutex.unlock();
_latchState.isContended = false;
+ {
+ stdx::lock_guard lk(_interruptibleState.mutex);
+ _interruptibleState.isDone = true;
+ _interruptibleState.cv.notify_one();
+ }
+ _interruptibleState.isWaiting = false;
+
std::swap(_latchState.thread, latchThread);
+ std::swap(_interruptibleState.thread, interruptibleThread);
}
latchThread->join();
+ interruptibleThread->join();
}
void BlockedOp::setIsContended(bool value) {
@@ -124,6 +161,13 @@ void BlockedOp::setIsContended(bool value) {
_cv.notify_one();
}
+void BlockedOp::setIsWaiting(bool value) {
+ log() << "Setting isWaiting to " << (value ? "true" : "false");
+ stdx::lock_guard lk(_m);
+ _interruptibleState.isWaiting = value;
+ _cv.notify_one();
+}
+
struct DiagnosticInfoHandle {
stdx::mutex mutex; // NOLINT
std::forward_list<DiagnosticInfo> list;
@@ -171,6 +215,52 @@ MONGO_INITIALIZER(LockListener)(InitializerContext* context) {
return Status::OK();
}
+MONGO_INITIALIZER(InterruptibleWaitListener)(InitializerContext* context) {
+ class WaitListener : public Interruptible::WaitListener {
+ using WakeReason = Interruptible::WakeReason;
+ using WakeSpeed = Interruptible::WakeSpeed;
+
+ void addInfo(const StringData& name) {
+ if (auto client = Client::getCurrent()) {
+ auto& handle = getDiagnosticInfoHandle(client);
+ stdx::lock_guard<stdx::mutex> lk(handle.mutex);
+ handle.list.emplace_front(DiagnosticInfo::capture(name));
+
+ if (currentOpSpawnsThreadWaitingForLatch.shouldFail() &&
+ (name == kBlockedOpInterruptibleName)) {
+ gBlockedOp.setIsWaiting(true);
+ }
+ }
+ }
+
+ void removeInfo(const StringData& name) {
+ if (auto client = Client::getCurrent()) {
+ auto& handle = getDiagnosticInfoHandle(client);
+ stdx::lock_guard<stdx::mutex> lk(handle.mutex);
+
+ invariant(!handle.list.empty());
+ handle.list.pop_front();
+ }
+ }
+
+ void onLongSleep(const StringData& name) override {
+ addInfo(name);
+ }
+
+ void onWake(const StringData& name, WakeReason, WakeSpeed speed) override {
+ if (speed == WakeSpeed::kSlow) {
+ removeInfo(name);
+ }
+ }
+ };
+
+ // Intentionally leaked, people can use in detached threads
+ static auto& listener = *new WaitListener();
+ Interruptible::addWaitListener(&listener);
+
+ return Status::OK();
+}
+
} // namespace
bool operator==(const DiagnosticInfo& info1, const DiagnosticInfo& info2) {
diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h
index 98ce1984660..925b5c4758f 100644
--- a/src/mongo/util/future_test_utils.h
+++ b/src/mongo/util/future_test_utils.h
@@ -101,7 +101,7 @@ class DummyInterruptable final : public Interruptible {
MONGO_UNREACHABLE;
}
Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override {
- MONGO_UNREACHABLE;
+ return Date_t::now() + waitFor;
}
};
diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h
index 6e182d6bbd7..3a036f55201 100644
--- a/src/mongo/util/interruptible.h
+++ b/src/mongo/util/interruptible.h
@@ -29,9 +29,13 @@
#pragma once
+#include <vector>
+
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/util/concepts.h"
#include "mongo/util/lockable_adapter.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
#include "mongo/util/waitable.h"
@@ -41,21 +45,90 @@ namespace mongo {
* A type which can be used to wait on condition variables with a level triggered one-way interrupt.
* I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to
* waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX.
+ *
+ * This class should never be derived from directly. Instead, please derive from Interruptible.
*/
-class Interruptible {
+class InterruptibleBase {
+public:
+ virtual ~InterruptibleBase() = default;
+
+ /**
+ * Returns the deadline for this InterruptibleBase, or Date_t::max() if there is no deadline.
+ */
+ virtual Date_t getDeadline() const = 0;
+
+ /**
+ * Returns Status::OK() unless this operation is in a killed state.
+ */
+ virtual Status checkForInterruptNoAssert() noexcept = 0;
+
protected:
+ /**
+ * Same as Interruptible::waitForConditionOrInterruptUntil(), except returns
+ * StatusWith<stdx::cv_status> and a non-ok status indicates the error instead of a DBException.
+ */
+ virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
+
struct DeadlineState {
Date_t deadline;
ErrorCodes::Error error;
bool hasArtificialDeadline;
};
+ /**
+ * Pushes a subsidiary deadline into the InterruptibleBase. Until an associated
+ * popArtificialDeadline() is invoked, the InterruptibleBase will fail checkForInterrupt() and
+ * waitForConditionOrInterrupt() calls with the passed error code if the deadline has passed.
+ *
+ * Note that deadlines higher than the current value are constrained (such that the passed
+ * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
+ *
+ * Returns state needed to pop the deadline.
+ */
+ virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+
+ /**
+ * Pops the subsidiary deadline introduced by push.
+ */
+ virtual void popArtificialDeadline(DeadlineState) = 0;
+
+ /**
+ * Returns the equivalent of Date_t::now() + waitFor for the InterruptibleBase's clock
+ */
+ virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+
struct IgnoreInterruptsState {
bool ignoreInterrupts;
DeadlineState deadline;
};
/**
+ * Pushes an ignore interruption critical section into the InterruptibleBase.
+ * Until an associated popIgnoreInterrupts() is invoked, the InterruptibleBase should ignore
+ * interruptions related to explicit interruption or previously set deadlines.
+ *
+ * Note that new deadlines can be set after this is called, which will again introduce the
+ * possibility of interruption.
+ *
+ * Returns state needed to pop interruption.
+ */
+ virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+
+ /**
+ * Pops the ignored interruption critical section introduced by push.
+ */
+ virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+};
+
+/**
+ * A derived class of InterruptibleBase which provides a variety of helper functions
+ *
+ * Please derive from this class instead of InterruptibleBase.
+ */
+class Interruptible : public InterruptibleBase {
+private:
+ /**
* A deadline guard provides a subsidiary deadline to the parent.
*/
class DeadlineGuard {
@@ -132,9 +205,39 @@ protected:
}
public:
+ class WaitListener;
+
+ /**
+ * Enum to convey why an Interruptible woke up
+ */
+ enum class WakeReason {
+ kPredicate,
+ kTimeout,
+ kInterrupt,
+ };
+
+ static constexpr auto kFastWakeTimeout = Milliseconds(100);
+
+ /**
+ * Enum to convey if an Interruptible woke up before or after kFastWakeTimeout
+ */
+ enum class WakeSpeed {
+ kFast,
+ kSlow,
+ };
+
+ /**
+ * This function adds a WaitListener subclass to the triggers for certain actions.
+ *
+ * WaitListeners can only be added and not removed. If you wish to deactivate a WaitListener
+ * subclass, please provide the switch on that subclass to noop its functions. It is only safe
+ * to add a WaitListener during a MONGO_INITIALIZER.
+ */
+ static void addWaitListener(WaitListener* listnener);
+
/**
* Returns a statically allocated instance that cannot be interrupted. Useful as a default
- * argument to interruptible taking methods.
+ * argument to Interruptible-taking methods.
*/
static Interruptible* notInterruptible();
@@ -163,11 +266,6 @@ public:
}
/**
- * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline.
- */
- virtual Date_t getDeadline() const = 0;
-
- /**
* Invokes the passed callback with an interruption guard active. Additionally handles the
* dance of try/catching the invocation and checking checkForInterrupt with the guard inactive
* (to allow a higher level timeout to override a lower level one, or for top level interruption
@@ -193,86 +291,26 @@ public:
}
/**
- * Returns Status::OK() unless this operation is in a killed state.
- */
- virtual Status checkForInterruptNoAssert() noexcept = 0;
-
- /**
- * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
- * deadline on this operation to expire. In the event of interruption or operation deadline
- * expiration, raises a AssertionException with an error code indicating the interruption type.
- */
- template <typename LockT>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m) {
- uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
- }
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
- * is interrupted or its deadline expires. Throws a DBException for interruption and
- * deadline expiration.
- */
- template <typename LockT, typename PredicateT>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
- while (!pred()) {
- waitForConditionOrInterrupt(cv, m);
- }
- }
-
- /**
- * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
- * a DBException to report interruption.
+ * Get the name for a Latch
*/
- template <typename LockT>
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, LockT& m) noexcept {
- auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- return Status::OK();
+ TEMPLATE(typename LatchT)
+ REQUIRES(std::is_base_of_v<Latch, LatchT>) //
+ static StringData getLatchName(const stdx::unique_lock<LatchT>& lk) {
+ return lk.mutex()->getName();
}
/**
- * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay
- * status instead of throwing on interruption.
- */
- template <typename LockT, typename PredicateT>
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
- LockT& m,
- PredicateT pred) noexcept {
- while (!pred()) {
- auto status = waitForConditionOrInterruptNoAssert(cv, m);
-
- if (!status.isOK()) {
- return status;
- }
- }
-
- return Status::OK();
- }
-
- /**
- * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
- * for the operation to be interrupted, or for the operation's own deadline to expire.
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout.
+ * Get a placeholder name for an arbitrary type
*/
template <typename LockT>
- stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- LockT& m,
- Date_t deadline) {
- return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+ static constexpr StringData getLatchName(const LockT&) {
+ return "AnonymousLockable"_sd;
}
/**
* Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
* expires, or this operation is interrupted, or this operation's own deadline expires.
*
- *
* If the operation deadline expires or the operation is interrupted, throws a DBException. If
* the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
* cv_status::no_timeout indicating that "pred" finally returned true.
@@ -280,26 +318,83 @@ public:
template <typename LockT, typename PredicateT>
bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
LockT& m,
- Date_t deadline,
+ Date_t finalDeadline,
PredicateT pred) {
- while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
- return pred();
+ auto latchName = getLatchName(m);
+
+ auto waitUntil = [&](Date_t deadline, WakeSpeed speed) -> boost::optional<WakeReason> {
+ // If the result of waitForConditionOrInterruptNoAssertUntil() is non-spurious, return
+ // a WakeReason. Otherwise, return boost::none
+
+ auto swResult = waitForConditionOrInterruptNoAssertUntil(cv, m, deadline);
+ if (!swResult.isOK()) {
+ _onWake(latchName, WakeReason::kInterrupt, speed);
+ uassertStatusOK(std::move(swResult));
+ }
+
+ if (pred()) {
+ _onWake(latchName, WakeReason::kPredicate, speed);
+ return WakeReason::kPredicate;
}
+
+ if (swResult.getValue() == stdx::cv_status::timeout) {
+ _onWake(latchName, WakeReason::kTimeout, speed);
+ return WakeReason::kTimeout;
+ }
+
+ return boost::none;
+ };
+
+ auto waitUntilNonSpurious = [&](Date_t deadline, WakeSpeed speed) -> WakeReason {
+ // Check waitUntil() in a loop until it says it has a genuine WakeReason
+
+ if (pred()) {
+ // Check for the predicate first, just in case
+ _onWake(latchName, WakeReason::kPredicate, speed);
+ return WakeReason::kPredicate;
+ }
+
+ auto maybeWakeReason = waitUntil(deadline, speed);
+ while (!maybeWakeReason) {
+ maybeWakeReason = waitUntil(deadline, speed);
+ };
+
+ return *maybeWakeReason;
+ };
+
+ const auto traceDeadline = getExpirationDateForWaitForValue(kFastWakeTimeout);
+ const auto firstDeadline = std::min(traceDeadline, finalDeadline);
+
+ // Wait for the first deadline
+ if (auto wakeReason = waitUntilNonSpurious(firstDeadline, WakeSpeed::kFast);
+ wakeReason == WakeReason::kPredicate) {
+ // If our first wait fulfilled our predicate then return true
+ return true;
+ } else if (firstDeadline == finalDeadline) {
+ // If we didn't fulfill our predicate but finalDeadline was less than traceDeadline,
+ // then the wait should return false
+ return false;
+ }
+
+ _onLongSleep(latchName);
+
+ // Wait for the final deadline
+ if (auto wakeReason = waitUntilNonSpurious(finalDeadline, WakeSpeed::kSlow);
+ wakeReason == WakeReason::kPredicate) {
+ return true;
+ } else {
+ return false;
}
- return true;
}
/**
- * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative
- * amount of time to wait instead of an absolute time point.
+ * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+ * is interrupted or its deadline expires. Throws a DBException for interruption and
+ * deadline expiration.
*/
- template <typename LockT>
- stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
- LockT& m,
- Milliseconds ms) {
- return uassertStatusOK(
- waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms)));
+ template <typename LockT, typename PredicateT>
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
+ waitForConditionOrInterruptUntil(cv, m, Date_t::max(), std::move(pred));
}
/**
@@ -311,24 +406,12 @@ public:
LockT& m,
Milliseconds ms,
PredicateT pred) {
- const auto deadline = getExpirationDateForWaitForValue(ms);
- while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
- return pred();
- }
- }
- return true;
+ return waitForConditionOrInterruptUntil(
+ cv, m, getExpirationDateForWaitForValue(ms), std::move(pred));
}
/**
- * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
- * non-ok status indicates the error instead of a DBException.
- */
- virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
-
- /**
- * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then.
+ * Sleeps until "deadline"; throws an exception if the Interruptible is interrupted before then.
*/
void sleepUntil(Date_t deadline) {
auto m = MONGO_MAKE_LATCH();
@@ -338,7 +421,7 @@ public:
}
/**
- * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before
+ * Sleeps for "duration" ms; throws an exception if the Interruptible is interrupted before
* then.
*/
void sleepFor(Milliseconds duration) {
@@ -349,51 +432,69 @@ public:
}
protected:
- /**
- * Pushes an ignore interruption critical section into the interruptible. Until an associated
- * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to
- * explicit interruption or previously set deadlines.
- *
- * Note that new deadlines can be set after this is called, which will again introduce the
- * possibility of interruption.
- *
- * Returns state needed to pop interruption.
- */
- virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+ class NotInterruptible;
- /**
- * Pops the ignored interruption critical section introduced by push.
- */
- virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+ void _onLongSleep(const StringData& name);
+ void _onWake(const StringData& name, WakeReason reason, WakeSpeed speed);
+ static auto& _getListenerState() {
+ struct State {
+ std::vector<WaitListener*> list;
+ };
+
+ // Note that state should no longer be mutated after init-time (ala MONGO_INITIALIZERS). If
+ // this changes, then this state needs to be synchronized.
+ static State state;
+ return state;
+ }
+};
+
+/**
+ * A set of event handlers for an Interruptible type
+ */
+class Interruptible::WaitListener {
+public:
/**
- * Pushes a subsidiary deadline into the interruptible. Until an associated
- * popArtificialDeadline is
- * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls
- * with the passed error code if the deadline has passed.
+ * Action to do when a wait does not resolve quickly
*
- * Note that deadline's higher than the current value are constrained (such that the passed
- * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
- *
- * Returns state needed to pop the deadline.
+ * Any implementation of this function must be safe to invoke when an Interruptible-associated
+ * latch is held. As this is hard to reason about, avoid external latches whenever possible.
*/
- virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+ virtual void onLongSleep(const StringData& name) = 0;
/**
- * Pops the subsidiary deadline introduced by push.
+ * Action to do when a wait resolves after a sleep
+ *
+ * Any implementation of this function must be safe to invoke when an Interruptible-associated
+ * latch is held. As this is hard to reason about, avoid external latches whenever possible.
*/
- virtual void popArtificialDeadline(DeadlineState) = 0;
+ virtual void onWake(const StringData& name, WakeReason reason, WakeSpeed speed) = 0;
+};
- /**
- * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock
- */
- virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+inline void Interruptible::addWaitListener(WaitListener* listener) {
+ auto& state = _getListenerState();
- class NotInterruptible;
-};
+ state.list.push_back(listener);
+}
+
+inline void Interruptible::_onLongSleep(const StringData& name) {
+ auto& state = _getListenerState();
+
+ for (auto listener : state.list) {
+ listener->onLongSleep(name);
+ }
+}
+
+inline void Interruptible::_onWake(const StringData& name, WakeReason reason, WakeSpeed speed) {
+ auto& state = _getListenerState();
+
+ for (auto listener : state.list) {
+ listener->onWake(name, reason, speed);
+ }
+}
/**
- * A not interruptible type which can be used as a lightweight default arg for interruptible taking
+ * A non-interruptible type which can be used as a lightweight default arg for Interruptible-taking
* functions.
*/
class Interruptible::NotInterruptible final : public Interruptible {
@@ -417,9 +518,9 @@ class Interruptible::NotInterruptible final : public Interruptible {
}
// It's invalid to call the deadline or ignore interruption guards on a possibly noop
- // interruptible.
+ // Interruptible.
//
- // The noop interruptible should only be invoked as a default arg at the bottom of the call
+ // The noop Interruptible should only be invoked as a default arg at the bottom of the call
// stack (with types that won't modify it's invocation)
IgnoreInterruptsState pushIgnoreInterrupts() override {
MONGO_UNREACHABLE;
diff --git a/src/mongo/watchdog/watchdog.cpp b/src/mongo/watchdog/watchdog.cpp
index 3dff62a1086..cc2cb5942a5 100644
--- a/src/mongo/watchdog/watchdog.cpp
+++ b/src/mongo/watchdog/watchdog.cpp
@@ -151,21 +151,22 @@ void WatchdogPeriodicThread::doLoop() {
// Check if the period is different?
// We are signalled on period changes at which point we may be done waiting or need to
// wait longer.
- while (startTime + _period > preciseClockSource->now() &&
- _state != State::kShutdownRequested) {
- auto s = opCtx->waitForConditionOrInterruptNoAssertUntil(
- _condvar, lock, startTime + _period);
-
- if (!s.isOK()) {
- // The only bad status is when we are in shutdown
- if (!opCtx->getServiceContext()->getKillAllOperations()) {
- severe() << "Watchdog was interrupted, shutting down, reason: "
- << s.getStatus();
- exitCleanly(ExitCode::EXIT_ABRUPT);
- }
-
- return;
+ try {
+ opCtx->waitForConditionOrInterruptUntil(_condvar, lock, startTime + _period, [&] {
+ return (startTime + _period) <= preciseClockSource->now() ||
+ _state == State::kShutdownRequested;
+ });
+ } catch (const DBException& e) {
+ // The only bad status is when we are in shutdown
+ if (!opCtx->getServiceContext()->getKillAllOperations()) {
+ severe() << "Watchdog was interrupted, shutting down, reason: " << e.toStatus();
+ exitCleanly(ExitCode::EXIT_ABRUPT);
}
+
+ // This interruption ends the WatchdogPeriodicThread. This means it is possible to
+ // killOp this operation and stop it for the lifetime of the process.
+ LOG(1) << "WatchdogPeriodicThread interrupted by: " << e;
+ return;
}
// Are we done running?