diff options
author | Ben Caimano <ben.caimano@mongodb.com> | 2019-11-12 20:37:46 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-12 20:37:46 +0000 |
commit | 5ecb2429a23af0ebf4af60d21e3b670a3045f563 (patch) | |
tree | fe4bb88350ca2ec2d69fac05e2cf608d8abf1595 | |
parent | b26181808380c2a448687d331dab7d19e9f9f2e4 (diff) | |
download | mongo-5ecb2429a23af0ebf4af60d21e3b670a3045f563.tar.gz |
SERVER-43987 Require predicates with OperationContext::waitForConditionOrInterrupt()
This commit also incoroporates parts of SERVER-44086 for backporting to
v4.2.
19 files changed, 537 insertions, 317 deletions
diff --git a/jstests/core/currentop_waiting_for_latch.js b/jstests/core/currentop_waiting_for_latch.js index aebfaddfcdb..c584e0d98bf 100644 --- a/jstests/core/currentop_waiting_for_latch.js +++ b/jstests/core/currentop_waiting_for_latch.js @@ -28,7 +28,7 @@ const getCurrentOp = function() { const blockedOpClients = { "DiagnosticCaptureTestLatch": {"seen": false}, - // "DiagnosticCaptureTestInterruptible": {"seen": false}, + "DiagnosticCaptureTestInterruptible": {"seen": false}, }; const getClientName = function() { diff --git a/src/mongo/db/commands/validate.cpp b/src/mongo/db/commands/validate.cpp index 171163a8b2a..ff22256383c 100644 --- a/src/mongo/db/commands/validate.cpp +++ b/src/mongo/db/commands/validate.cpp @@ -154,9 +154,9 @@ public: { stdx::unique_lock<Latch> lock(_validationMutex); try { - while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) { - opCtx->waitForConditionOrInterrupt(_validationNotifier, lock); - } + opCtx->waitForConditionOrInterrupt(_validationNotifier, lock, [&] { + return _validationsInProgress.find(nss.ns()) == _validationsInProgress.end(); + }); } catch (AssertionException& e) { CommandHelpers::appendCommandStatusNoThrow( result, diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.cpp b/src/mongo/db/concurrency/flow_control_ticketholder.cpp index 6bb95797502..b8f6217665c 100644 --- a/src/mongo/db/concurrency/flow_control_ticketholder.cpp +++ b/src/mongo/db/concurrency/flow_control_ticketholder.cpp @@ -98,31 +98,25 @@ void FlowControlTicketholder::getTicket(OperationContext* opCtx, ++stats->acquireWaitCount; } - // Make sure operations already waiting on a Flow Control ticket during shut down do not - // hang if the ticket refresher thread has been shut down. - while (_tickets == 0 && !_inShutdown) { - stats->waiting = true; - const std::uint64_t startWaitTime = curTimeMicros64(); - - // This method will wait forever for a ticket. However, it will wake up every so often to - // update the time spent waiting on the ticket. - auto waitDeadline = Date_t::now() + Milliseconds(500); - StatusWith<stdx::cv_status> swCondStatus = - opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, waitDeadline); - - auto waitTime = curTimeMicros64() - startWaitTime; - _totalTimeAcquiringMicros.fetchAndAddRelaxed(waitTime); - stats->timeAcquiringMicros += waitTime; - - // If the operation context state interrupted this wait, the StatusWith result will contain - // the error. If the `waitDeadline` expired, the Status variable will be OK, and the - // `cv_status` value will be `cv_status::timeout`. In either case where Status::OK is - // returned, the loop must re-check the predicate. If the operation context is interrupted - // (and an error status is returned), the intended behavior is to bubble an exception up to - // the user. - uassertStatusOK(swCondStatus); + auto currentWaitTime = curTimeMicros64(); + auto updateTotalTime = [&]() { + auto oldWaitTime = std::exchange(currentWaitTime, curTimeMicros64()); + _totalTimeAcquiringMicros.fetchAndAddRelaxed(currentWaitTime - oldWaitTime); + }; + + stats->waiting = true; + ON_BLOCK_EXIT([&] { + // When this block exits, update the time one last time and note that getTicket() is no + // longer waiting. + updateTotalTime(); + stats->waiting = false; + }); + + // getTicket() should block until there are tickets or the Ticketholder is in shutdown + while (!opCtx->waitForConditionOrInterruptFor( + _cv, lk, Milliseconds(500), [&] { return _tickets > 0 || _inShutdown; })) { + updateTotalTime(); } - stats->waiting = false; if (_inShutdown) { return; diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp index 92834348382..77037f80c84 100644 --- a/src/mongo/db/keys_collection_manager.cpp +++ b/src/mongo/db/keys_collection_manager.cpp @@ -199,12 +199,10 @@ void KeysCollectionManager::PeriodicRunner::refreshNow(OperationContext* opCtx) "aborting keys cache refresh because node is shutting down"); } - if (_refreshRequest) { - return _refreshRequest; + if (!_refreshRequest) { + _refreshRequest = std::make_shared<Notification<void>>(); } - _refreshNeededCV.notify_all(); - _refreshRequest = std::make_shared<Notification<void>>(); return _refreshRequest; }(); @@ -222,8 +220,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s ThreadClient tc(threadName, service); while (true) { - bool hasRefreshRequestInitially = false; unsigned errorCount = 0; + + decltype(_refreshRequest) request; std::shared_ptr<RefreshFunc> doRefresh; { stdx::lock_guard<Latch> lock(_mutex); @@ -234,7 +233,7 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s invariant(_doRefresh.get() != nullptr); doRefresh = _doRefresh; - hasRefreshRequestInitially = _refreshRequest.get() != nullptr; + request = std::move(_refreshRequest); } Milliseconds nextWakeup = kRefreshIntervalIfErrored; @@ -262,6 +261,11 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s nextWakeup = kMaxRefreshWaitTime; } } + + // Notify all waiters that the refresh has finished and they can move on + if (request) { + request->set(); + } } MONGO_FAIL_POINT_BLOCK(maxKeyRefreshWaitTimeOverrideMS, data) { @@ -273,15 +277,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s } stdx::unique_lock<Latch> lock(_mutex); - if (_refreshRequest) { - if (!hasRefreshRequestInitially) { - // A fresh request came in, fulfill the request before going to sleep. - continue; - } - - _refreshRequest->set(); - _refreshRequest.reset(); + // A fresh request came in, fulfill the request before going to sleep. + continue; } if (_inShutdown) { @@ -292,24 +290,33 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s auto opCtx = cc().makeOperationContext(); MONGO_IDLE_THREAD_BLOCK; - auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil( - _refreshNeededCV, lock, Date_t::now() + nextWakeup); - - if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) { - break; + try { + opCtx->waitForConditionOrInterruptFor( + _refreshNeededCV, lock, nextWakeup, [&]() -> bool { + return _inShutdown || _refreshRequest; + }); + } catch (const DBException& e) { + LOG(1) << "Unable to wait for refresh request due to: " << e; + + if (ErrorCodes::isShutdownError(e.code())) { + return; + } } } - - stdx::unique_lock<Latch> lock(_mutex); - if (_refreshRequest) { - _refreshRequest->set(); - _refreshRequest.reset(); - } } void KeysCollectionManager::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) { stdx::lock_guard<Latch> lock(_mutex); + if (_inShutdown) { + uasserted(ErrorCodes::ShutdownInProgress, + "aborting KeysCollectionManager::PeriodicRunner::setFunc because node is " + "shutting down"); + } + _doRefresh = std::make_shared<RefreshFunc>(std::move(newRefreshStrategy)); + if (!_refreshRequest) { + _refreshRequest = std::make_shared<Notification<void>>(); + } _refreshNeededCV.notify_all(); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 20a367a2092..ed5d6c88abd 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -352,9 +352,6 @@ public: */ Microseconds getRemainingMaxTimeMicros() const; - StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( - stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override; - bool isIgnoringInterrupts() const; /** @@ -374,6 +371,9 @@ public: } private: + StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override; + IgnoreInterruptsState pushIgnoreInterrupts() override { IgnoreInterruptsState iis{_ignoreInterrupts, {_deadline, _timeoutError, _hasArtificialDeadline}}; diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp index 00fa8734534..cd01b72dbb2 100644 --- a/src/mongo/db/operation_context_test.cpp +++ b/src/mongo/db/operation_context_test.cpp @@ -257,7 +257,7 @@ public: auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - opCtx->waitForConditionOrInterrupt(cv, lk); + opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }); } const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>(); @@ -337,7 +337,9 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT_EQ(ErrorCodes::ExceededTimeLimit, opCtx->waitForConditionOrInterruptNoAssert(cv, lk)); + ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }), + DBException, + ErrorCodes::ExceededTimeLimit); } TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) { @@ -346,10 +348,10 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT_EQ( - ErrorCodes::ExceededTimeLimit, - opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10}) - .getStatus()); + ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil( + cv, lk, mockClock->now() + Seconds{10}, [] { return false; }), + DBException, + ErrorCodes::ExceededTimeLimit); } TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) { @@ -601,7 +603,9 @@ TEST_F(OperationDeadlineTests, WaitForKilledOpCV) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT_EQ(ErrorCodes::Interrupted, opCtx->waitForConditionOrInterruptNoAssert(cv, lk)); + ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }), + DBException, + ErrorCodes::Interrupted); } TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) { @@ -609,9 +613,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT(stdx::cv_status::timeout == - unittest::assertGet( - opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()))); + ASSERT_FALSE( + opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; })); } TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) { @@ -620,9 +623,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT(stdx::cv_status::timeout == - unittest::assertGet( - opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()))); + ASSERT_FALSE( + opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; })); } TEST_F(OperationDeadlineTests, WaitForDurationExpired) { @@ -640,8 +642,10 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cv; stdx::unique_lock<Latch> lk(m); - ASSERT(ErrorCodes::ExceededTimeLimit == - opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())); + ASSERT_THROWS_CODE( + opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }), + DBException, + ErrorCodes::ExceededTimeLimit); } class ThreadedOperationDeadlineTests : public OperationDeadlineTests { @@ -937,17 +941,17 @@ TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) { ASSERT_FALSE(waiterResult.get()); } -TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) { - // `waitForConditionOrInterruptNoAssertUntil` can have three outcomes: +TEST(OperationContextTest, TestWaitForConditionOrInterruptUntilAPI) { + // `waitForConditionOrInterruptUntil` can have three outcomes: // // 1) The condition is satisfied before any timeouts. // 2) The explicit `deadline` function argument is triggered. // 3) The operation context implicitly times out, or is interrupted from a killOp command or // shutdown, etc. // - // Case (1) must return a Status::OK with a value of `cv_status::no_timeout`. Case (2) must also - // return a Status::OK with a value of `cv_status::timeout`. Case (3) must return an error - // status. The error status returned is otherwise configurable. + // Case (1) must return true. + // Case (2) must return false. + // Case (3) must throw a DBException. // // Case (1) is the hardest to test. The condition variable must be notified by a second thread // when the client is waiting on it. Case (1) is also the least in need of having the API @@ -962,17 +966,16 @@ TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) { // Case (2). Expect a Status::OK with a cv_status::timeout. Date_t deadline = Date_t::now() + Milliseconds(500); - StatusWith<stdx::cv_status> ret = - opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline); - ASSERT_OK(ret.getStatus()); - ASSERT(ret.getValue() == stdx::cv_status::timeout); + ASSERT_EQ(opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }), + false); // Case (3). Expect an error of `MaxTimeMSExpired`. opCtx->setDeadlineByDate(Date_t::now(), ErrorCodes::MaxTimeMSExpired); deadline = Date_t::now() + Seconds(500); - ret = opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline); - ASSERT_FALSE(ret.isOK()); - ASSERT_EQUALS(ErrorCodes::MaxTimeMSExpired, ret.getStatus().code()); + ASSERT_THROWS_CODE( + opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }), + DBException, + ErrorCodes::MaxTimeMSExpired); } } // namespace diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 060e425238b..378e1e3ba47 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -202,6 +202,8 @@ ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { invariant(condVar); + + ++notifyCount; condVar->notify_all(); } @@ -1418,27 +1420,36 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - // If we are doing a majority committed read we only need to wait for a new snapshot. if (isMajorityCommittedRead) { + // If we are doing a majority committed read we only need to wait for a new snapshot to + // update getCurrentOpTime() past targetOpTime. This block should only run once and + // return without further looping. + LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline(); - auto waitStatus = - opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock); - if (!waitStatus.isOK()) { - return waitStatus.withContext( + try { + opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] { + return _inShutdown || (targetOpTime <= getCurrentOpTime()); + }); + } catch (const DBException& e) { + return e.toStatus().withContext( str::stream() << "Error waiting for snapshot not less than " << targetOpTime.toString() << ", current relevant optime is " << getCurrentOpTime().toString() << "."); } - if (!_currentCommittedSnapshot) { - // It is possible for the thread to be awoken due to a spurious wakeup, meaning - // the condition variable was never set. - LOG(3) << "waitUntilOpTime: awoken but current committed snapshot is null." - << " Continuing to wait for new snapshot."; - } else { + + if (_inShutdown) { + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; + } + + if (_currentCommittedSnapshot) { + // It seems that targetOpTime can sometimes be default OpTime{}. When there is no + // _currentCommittedSnapshot, _getCurrentCommittedSnapshotOpTime_inlock() and thus + // getCurrentOpTime() also return default OpTime{}. Hence this branch that only runs + // if _currentCommittedSnapshot actually exists. LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString(); } - continue; + return Status::OK(); } // We just need to wait for the opTime to catch up to what we need (not majority RC). @@ -1449,20 +1460,14 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx, LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " << waiter << " until " << opCtx->getDeadline(); - auto waitStatus = Status::OK(); - if (deadline) { - auto waitUntilStatus = - opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, lock, *deadline); - waitStatus = waitUntilStatus.getStatus(); - } else { - waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); - } - - if (!waitStatus.isOK()) { - return waitStatus.withContext(str::stream() - << "Error waiting for optime " << targetOpTime.toString() - << ", current relevant optime is " - << getCurrentOpTime().toString() << "."); + try { + opCtx->waitForConditionOrInterruptUntil( + condVar, lock, deadline.value_or(Date_t::max()), waiter.makePredicate()); + } catch (const DBException& e) { + return e.toStatus().withContext(str::stream() << "Error waiting for optime " + << targetOpTime.toString() + << ", current relevant optime is " + << getCurrentOpTime().toString() << "."); } // If deadline is set no need to wait until the targetTime time is reached. @@ -1799,13 +1804,13 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"}; } - auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate); - if (!status.isOK()) { - return status.getStatus(); - } - - if (status.getValue() == stdx::cv_status::timeout) { - return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; + try { + if (!opCtx->waitForConditionOrInterruptUntil( + condVar, *lock, wTimeoutDate, waiter.makePredicate())) { + return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; + } + } catch (const DBException& e) { + return e.toStatus(); } stepdownStatus = checkForStepDown(); @@ -2103,7 +2108,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, // tryToStartStepDown again will cause tryToStartStepDown to return ExceededTimeLimit // with the proper error message. opCtx->waitForConditionOrInterruptUntil( - condVar, lk, std::min(stepDownUntil, waitUntil)); + condVar, lk, std::min(stepDownUntil, waitUntil), waiter.makePredicate()); } } @@ -3965,10 +3970,11 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* op uassert(ErrorCodes::NotYetInitialized, "Cannot use snapshots until replica set is finished initializing.", _rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating); - while (!_currentCommittedSnapshot || - _currentCommittedSnapshot->opTime.getTimestamp() < untilSnapshot) { - opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock); - } + + opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] { + return _currentCommittedSnapshot && + _currentCommittedSnapshot->opTime.getTimestamp() >= untilSnapshot; + }); } size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 07bdf226a80..c42b47cf73a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -607,6 +607,17 @@ private: return false; } + // This predicate is an extremely simple way to never miss a notification. When the + // ThreadWater::notify_inlock() is called, notifyCount is incremented. As should be obvious, + // this predicate should only be called "inlock" as well. The ThreadWaiter itself goes away + // with SERVER-43135 so this function and notifyCount as a concept is a v4.2 only measure. + auto makePredicate() { + return [this, startingNotifyCount = notifyCount]() -> bool { + return notifyCount != startingNotifyCount; + }; + } + + uint64_t notifyCount = 0; stdx::condition_variable* condVar = nullptr; }; diff --git a/src/mongo/db/s/implicit_create_collection.cpp b/src/mongo/db/s/implicit_create_collection.cpp index a0a3d6068f9..63dfa503123 100644 --- a/src/mongo/db/s/implicit_create_collection.cpp +++ b/src/mongo/db/s/implicit_create_collection.cpp @@ -72,16 +72,14 @@ public: Status onCannotImplicitlyCreateCollection(OperationContext* opCtx) noexcept { invariant(!opCtx->lockState()->isLocked()); - { + try { stdx::unique_lock<Latch> lg(_mutex); - while (_isInProgress) { - auto status = opCtx->waitForConditionOrInterruptNoAssert(_cvIsInProgress, lg); - if (!status.isOK()) { - return status; - } - } + + opCtx->waitForConditionOrInterrupt(_cvIsInProgress, lg, [&] { return !_isInProgress; }); _isInProgress = true; + } catch (const DBException& e) { + return e.toStatus(); } ON_BLOCK_EXIT([&] { diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp index f41ed83c630..1864335150a 100644 --- a/src/mongo/db/s/wait_for_majority_service.cpp +++ b/src/mongo/db/s/wait_for_majority_service.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + #include "mongo/platform/basic.h" #include "mongo/db/s/wait_for_majority_service.h" @@ -38,6 +40,7 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/log.h" namespace mongo { @@ -177,8 +180,11 @@ void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* servic _queuedOpTimes.erase(lowestOpTimeIter); } - if (_queuedOpTimes.empty() && !_inShutDown) { - _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore(); + try { + _opCtx->waitForConditionOrInterrupt( + _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; }); + } catch (const DBException& e) { + LOG(1) << "Unable to wait for new op time due to: " << e; } _opCtx = nullptr; diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp index ca89ac04c8b..1d7c975d385 100644 --- a/src/mongo/db/s/wait_for_majority_service_test.cpp +++ b/src/mongo/db/s/wait_for_majority_service_test.cpp @@ -79,14 +79,13 @@ public: _waitForMajorityCallCount++; _callCountChangedCV.notify_one(); - while (!_isTestReady) { - auto status = opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk); - if (!status.isOK()) { - _isTestReady = false; - _finishWaitingOneOpTimeCV.notify_one(); - - return status; - } + try { + opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; }); + } catch (const DBException& e) { + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + + return e.toStatus(); } _lastOpTimeWaited = opTime; diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index 151924e2709..631b342f073 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -602,8 +602,10 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) { auto m = MONGO_MAKE_LATCH(); stdx::condition_variable cond; stdx::unique_lock<Latch> lock(m); - ASSERT_EQ(ErrorCodes::InternalError, - _opCtx->waitForConditionOrInterruptNoAssert(cond, lock)); + ASSERT_THROWS_CODE( + _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }), + DBException, + ErrorCodes::InternalError); } } normalCheckOutFinish.get(); diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index f07e1d476c5..f8096eecb04 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -306,18 +306,18 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); stdx::unique_lock<Latch> lk(_mutex); - // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event - // is signalled or we time out. - while (!eventState->isSignaledFlag) { - auto status = opCtx->waitForConditionOrInterruptNoAssertUntil( - eventState->isSignaledCondition, lk, deadline); - - if (!status.isOK() || stdx::cv_status::timeout == status) { - return status; + try { + if (opCtx->waitForConditionOrInterruptUntil( + eventState->isSignaledCondition, lk, deadline, [&] { + return eventState->isSignaledFlag; + })) { + return stdx::cv_status::no_timeout; } - } - return stdx::cv_status::no_timeout; + return stdx::cv_status::timeout; + } catch (const DBException& e) { + return e.toStatus(); + } } void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { @@ -531,9 +531,9 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible* if (!cbState->finishedCondition) { cbState->finishedCondition.emplace(); } - while (!cbState->isFinished.load()) { - interruptible->waitForConditionOrInterrupt(*cbState->finishedCondition, lk); - } + + interruptible->waitForConditionOrInterrupt( + *cbState->finishedCondition, lk, [&] { return cbState->isFinished.load(); }); } void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 38c4bc6976d..f8e547592dd 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -291,13 +291,13 @@ bool ShardRegistry::reload(OperationContext* opCtx) { // There is also an issue if multiple threads are allowed to call getAllShards() // simultaneously because there is no good way to determine which of the threads has the // more recent version of the data. - do { - auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(_inReloadCV, reloadLock); - if (!waitStatus.isOK()) { - LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(waitStatus); - return false; - } - } while (_reloadState == ReloadState::Reloading); + try { + opCtx->waitForConditionOrInterrupt( + _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; }); + } catch (const DBException& e) { + LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(e.toStatus()); + return false; + } if (_reloadState == ReloadState::Idle) { return false; diff --git a/src/mongo/util/clock_source.cpp b/src/mongo/util/clock_source.cpp index a3fb3aabeb1..1031b8ed621 100644 --- a/src/mongo/util/clock_source.cpp +++ b/src/mongo/util/clock_source.cpp @@ -29,8 +29,10 @@ #include "mongo/platform/basic.h" -#include "mongo/stdx/thread.h" #include "mongo/util/clock_source.h" + +#include "mongo/platform/mutex.h" +#include "mongo/stdx/thread.h" #include "mongo/util/waitable.h" namespace mongo { diff --git a/src/mongo/util/diagnostic_info.cpp b/src/mongo/util/diagnostic_info.cpp index 9a6710f8a10..566040236e5 100644 --- a/src/mongo/util/diagnostic_info.cpp +++ b/src/mongo/util/diagnostic_info.cpp @@ -42,6 +42,7 @@ #include "mongo/db/client.h" #include "mongo/platform/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/fail_point.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/interruptible.h" #include "mongo/util/log.h" @@ -74,6 +75,16 @@ private: Mutex mutex = MONGO_MAKE_LATCH(kBlockedOpMutexName); }; LatchState _latchState; + + struct InterruptibleState { + bool isWaiting = false; + boost::optional<stdx::thread> thread{boost::none}; + + stdx::condition_variable cv; + Mutex mutex = MONGO_MAKE_LATCH(kBlockedOpInterruptibleName); + bool isDone = false; + }; + InterruptibleState _interruptibleState; } gBlockedOp; // This function causes us to make an additional thread with a self-contended lock so that @@ -83,6 +94,7 @@ void BlockedOp::start(ServiceContext* serviceContext) { stdx::unique_lock<stdx::mutex> lk(_m); invariant(!_latchState.thread); + invariant(!_interruptibleState.thread); _latchState.mutex.lock(); _latchState.thread = stdx::thread([this, serviceContext]() mutable { @@ -95,7 +107,21 @@ void BlockedOp::start(ServiceContext* serviceContext) { log() << "Joining currentOpSpawnsThreadWaitingForLatch thread"; }); - _cv.wait(lk, [this] { return _latchState.isContended; }); + _interruptibleState.thread = stdx::thread([this, serviceContext]() mutable { + ThreadClient tc("DiagnosticCaptureTestInterruptible", serviceContext); + auto opCtx = tc->makeOperationContext(); + + log() << "Entered currentOpSpawnsThreadWaitingForLatch thread for interruptibles"; + stdx::unique_lock lk(_interruptibleState.mutex); + opCtx->waitForConditionOrInterrupt( + _interruptibleState.cv, lk, [&] { return _interruptibleState.isDone; }); + _interruptibleState.isDone = false; + + log() << "Joining currentOpSpawnsThreadWaitingForLatch thread for interruptibles"; + }); + + + _cv.wait(lk, [this] { return _latchState.isContended && _interruptibleState.isWaiting; }); log() << "Started threads for currentOpSpawnsThreadWaitingForLatch"; } @@ -103,18 +129,29 @@ void BlockedOp::start(ServiceContext* serviceContext) { // remaining void BlockedOp::join() { decltype(_latchState.thread) latchThread; + decltype(_interruptibleState.thread) interruptibleThread; { stdx::lock_guard<stdx::mutex> lk(_m); invariant(_latchState.thread); + invariant(_interruptibleState.thread); _latchState.mutex.unlock(); _latchState.isContended = false; + { + stdx::lock_guard lk(_interruptibleState.mutex); + _interruptibleState.isDone = true; + _interruptibleState.cv.notify_one(); + } + _interruptibleState.isWaiting = false; + std::swap(_latchState.thread, latchThread); + std::swap(_interruptibleState.thread, interruptibleThread); } latchThread->join(); + interruptibleThread->join(); } void BlockedOp::setIsContended(bool value) { @@ -124,6 +161,13 @@ void BlockedOp::setIsContended(bool value) { _cv.notify_one(); } +void BlockedOp::setIsWaiting(bool value) { + log() << "Setting isWaiting to " << (value ? "true" : "false"); + stdx::lock_guard lk(_m); + _interruptibleState.isWaiting = value; + _cv.notify_one(); +} + struct DiagnosticInfoHandle { stdx::mutex mutex; // NOLINT std::forward_list<DiagnosticInfo> list; @@ -171,6 +215,52 @@ MONGO_INITIALIZER(LockListener)(InitializerContext* context) { return Status::OK(); } +MONGO_INITIALIZER(InterruptibleWaitListener)(InitializerContext* context) { + class WaitListener : public Interruptible::WaitListener { + using WakeReason = Interruptible::WakeReason; + using WakeSpeed = Interruptible::WakeSpeed; + + void addInfo(const StringData& name) { + if (auto client = Client::getCurrent()) { + auto& handle = getDiagnosticInfoHandle(client); + stdx::lock_guard<stdx::mutex> lk(handle.mutex); + handle.list.emplace_front(DiagnosticInfo::capture(name)); + + if (currentOpSpawnsThreadWaitingForLatch.shouldFail() && + (name == kBlockedOpInterruptibleName)) { + gBlockedOp.setIsWaiting(true); + } + } + } + + void removeInfo(const StringData& name) { + if (auto client = Client::getCurrent()) { + auto& handle = getDiagnosticInfoHandle(client); + stdx::lock_guard<stdx::mutex> lk(handle.mutex); + + invariant(!handle.list.empty()); + handle.list.pop_front(); + } + } + + void onLongSleep(const StringData& name) override { + addInfo(name); + } + + void onWake(const StringData& name, WakeReason, WakeSpeed speed) override { + if (speed == WakeSpeed::kSlow) { + removeInfo(name); + } + } + }; + + // Intentionally leaked, people can use in detached threads + static auto& listener = *new WaitListener(); + Interruptible::addWaitListener(&listener); + + return Status::OK(); +} + } // namespace bool operator==(const DiagnosticInfo& info1, const DiagnosticInfo& info2) { diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h index 98ce1984660..925b5c4758f 100644 --- a/src/mongo/util/future_test_utils.h +++ b/src/mongo/util/future_test_utils.h @@ -101,7 +101,7 @@ class DummyInterruptable final : public Interruptible { MONGO_UNREACHABLE; } Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override { - MONGO_UNREACHABLE; + return Date_t::now() + waitFor; } }; diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h index 6e182d6bbd7..3a036f55201 100644 --- a/src/mongo/util/interruptible.h +++ b/src/mongo/util/interruptible.h @@ -29,9 +29,13 @@ #pragma once +#include <vector> + +#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/util/concepts.h" #include "mongo/util/lockable_adapter.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" #include "mongo/util/waitable.h" @@ -41,21 +45,90 @@ namespace mongo { * A type which can be used to wait on condition variables with a level triggered one-way interrupt. * I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to * waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX. + * + * This class should never be derived from directly. Instead, please derive from Interruptible. */ -class Interruptible { +class InterruptibleBase { +public: + virtual ~InterruptibleBase() = default; + + /** + * Returns the deadline for this InterruptibleBase, or Date_t::max() if there is no deadline. + */ + virtual Date_t getDeadline() const = 0; + + /** + * Returns Status::OK() unless this operation is in a killed state. + */ + virtual Status checkForInterruptNoAssert() noexcept = 0; + protected: + /** + * Same as Interruptible::waitForConditionOrInterruptUntil(), except returns + * StatusWith<stdx::cv_status> and a non-ok status indicates the error instead of a DBException. + */ + virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0; + struct DeadlineState { Date_t deadline; ErrorCodes::Error error; bool hasArtificialDeadline; }; + /** + * Pushes a subsidiary deadline into the InterruptibleBase. Until an associated + * popArtificialDeadline() is invoked, the InterruptibleBase will fail checkForInterrupt() and + * waitForConditionOrInterrupt() calls with the passed error code if the deadline has passed. + * + * Note that deadline's higher than the current value are constrained (such that the passed + * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed). + * + * Returns state needed to pop the deadline. + */ + virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0; + + /** + * Pops the subsidiary deadline introduced by push. + */ + virtual void popArtificialDeadline(DeadlineState) = 0; + + /** + * Returns the equivalent of Date_t::now() + waitFor for the InterruptibleBase's clock + */ + virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0; + struct IgnoreInterruptsState { bool ignoreInterrupts; DeadlineState deadline; }; /** + * Pushes an ignore interruption critical section into the InterruptibleBase. + * Until an associated popIgnoreInterrupts() is invoked, the InterruptibleBase should ignore + * interruptions related to explicit interruption or previously set deadlines. + * + * Note that new deadlines can be set after this is called, which will again introduce the + * possibility of interruption. + * + * Returns state needed to pop interruption. + */ + virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0; + + /** + * Pops the ignored interruption critical section introduced by push. + */ + virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0; +}; + +/** + * An derived class of InterruptibleBase which provides a variety of helper functions + * + * Please derive from this class instead of InterruptibleBase. + */ +class Interruptible : public InterruptibleBase { +private: + /** * A deadline guard provides a subsidiary deadline to the parent. */ class DeadlineGuard { @@ -132,9 +205,39 @@ protected: } public: + class WaitListener; + + /** + * Enum to convey why an Interruptible woke up + */ + enum class WakeReason { + kPredicate, + kTimeout, + kInterrupt, + }; + + static constexpr auto kFastWakeTimeout = Milliseconds(100); + + /** + * Enum to convey if an Interruptible woke up before or after kFastWakeTimeout + */ + enum class WakeSpeed { + kFast, + kSlow, + }; + + /** + * This function adds a WaitListener subclass to the triggers for certain actions. + * + * WaitListeners can only be added and not removed. If you wish to deactivate a WaitListeners + * subclass, please provide the switch on that subclass to noop its functions. It is only safe + * to add a WaitListener during a MONGO_INITIALIZER. + */ + static void addWaitListener(WaitListener* listnener); + /** * Returns a statically allocated instance that cannot be interrupted. Useful as a default - * argument to interruptible taking methods. + * argument to Interruptible-taking methods. */ static Interruptible* notInterruptible(); @@ -163,11 +266,6 @@ public: } /** - * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline. - */ - virtual Date_t getDeadline() const = 0; - - /** * Invokes the passed callback with an interruption guard active. Additionally handles the * dance of try/catching the invocation and checking checkForInterrupt with the guard inactive * (to allow a higher level timeout to override a lower level one, or for top level interruption @@ -193,86 +291,26 @@ public: } /** - * Returns Status::OK() unless this operation is in a killed state. - */ - virtual Status checkForInterruptNoAssert() noexcept = 0; - - /** - * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the - * deadline on this operation to expire. In the event of interruption or operation deadline - * expiration, raises a AssertionException with an error code indicating the interruption type. - */ - template <typename LockT> - void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m) { - uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m)); - } - - /** - * Waits on condition "cv" for "pred" until "pred" returns true, or this operation - * is interrupted or its deadline expires. Throws a DBException for interruption and - * deadline expiration. - */ - template <typename LockT, typename PredicateT> - void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) { - while (!pred()) { - waitForConditionOrInterrupt(cv, m); - } - } - - /** - * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing - * a DBException to report interruption. + * Get the name for a Latch */ - template <typename LockT> - Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, LockT& m) noexcept { - auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max()); - if (!status.isOK()) { - return status.getStatus(); - } - - invariant(status.getValue() == stdx::cv_status::no_timeout); - return Status::OK(); + TEMPLATE(typename LatchT) + REQUIRES(std::is_base_of_v<Latch, LatchT>) // + static StringData getLatchName(const stdx::unique_lock<LatchT>& lk) { + return lk.mutex()->getName(); } /** - * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay - * status instead of throwing on interruption. - */ - template <typename LockT, typename PredicateT> - Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, - LockT& m, - PredicateT pred) noexcept { - while (!pred()) { - auto status = waitForConditionOrInterruptNoAssert(cv, m); - - if (!status.isOK()) { - return status; - } - } - - return Status::OK(); - } - - /** - * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or - * for the operation to be interrupted, or for the operation's own deadline to expire. - * - * If the operation deadline expires or the operation is interrupted, throws a DBException. If - * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns - * cv_status::no_timeout. + * Get a placeholder name for an arbitrary type */ template <typename LockT> - stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv, - LockT& m, - Date_t deadline) { - return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline)); + static constexpr StringData getLatchName(const LockT&) { + return "AnonymousLockable"_sd; } /** * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline" * expires, or this operation is interrupted, or this operation's own deadline expires. * - * * If the operation deadline expires or the operation is interrupted, throws a DBException. If * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns * cv_status::no_timeout indicating that "pred" finally returned true. @@ -280,26 +318,83 @@ public: template <typename LockT, typename PredicateT> bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv, LockT& m, - Date_t deadline, + Date_t finalDeadline, PredicateT pred) { - while (!pred()) { - if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { - return pred(); + auto latchName = getLatchName(m); + + auto waitUntil = [&](Date_t deadline, WakeSpeed speed) -> boost::optional<WakeReason> { + // If the result of waitForConditionOrInterruptNoAssertUntil() is non-spurious, return + // a WakeReason. Otherwise, return boost::none + + auto swResult = waitForConditionOrInterruptNoAssertUntil(cv, m, deadline); + if (!swResult.isOK()) { + _onWake(latchName, WakeReason::kInterrupt, speed); + uassertStatusOK(std::move(swResult)); + } + + if (pred()) { + _onWake(latchName, WakeReason::kPredicate, speed); + return WakeReason::kPredicate; } + + if (swResult.getValue() == stdx::cv_status::timeout) { + _onWake(latchName, WakeReason::kTimeout, speed); + return WakeReason::kTimeout; + } + + return boost::none; + }; + + auto waitUntilNonSpurious = [&](Date_t deadline, WakeSpeed speed) -> WakeReason { + // Check waitUntil() in a loop until it says it has a genuine WakeReason + + if (pred()) { + // Check for the predicate first, just in case + _onWake(latchName, WakeReason::kPredicate, speed); + return WakeReason::kPredicate; + } + + auto maybeWakeReason = waitUntil(deadline, speed); + while (!maybeWakeReason) { + maybeWakeReason = waitUntil(deadline, speed); + }; + + return *maybeWakeReason; + }; + + const auto traceDeadline = getExpirationDateForWaitForValue(kFastWakeTimeout); + const auto firstDeadline = std::min(traceDeadline, finalDeadline); + + // Wait for the first deadline + if (auto wakeReason = waitUntilNonSpurious(firstDeadline, WakeSpeed::kFast); + wakeReason == WakeReason::kPredicate) { + // If our first wait fulfilled our predicate then return true + return true; + } else if (firstDeadline == finalDeadline) { + // If we didn't fulfill our predicate but finalDeadline was less than traceDeadline, + // then the wait should return false + return false; + } + + _onLongSleep(latchName); + + // Wait for the final deadline + if (auto wakeReason = waitUntilNonSpurious(finalDeadline, WakeSpeed::kSlow); + wakeReason == WakeReason::kPredicate) { + return true; + } else { + return false; } - return true; } /** - * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative - * amount of time to wait instead of an absolute time point. + * Waits on condition "cv" for "pred" until "pred" returns true, or this operation + * is interrupted or its deadline expires. Throws a DBException for interruption and + * deadline expiration. */ - template <typename LockT> - stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv, - LockT& m, - Milliseconds ms) { - return uassertStatusOK( - waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms))); + template <typename LockT, typename PredicateT> + void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) { + waitForConditionOrInterruptUntil(cv, m, Date_t::max(), std::move(pred)); } /** @@ -311,24 +406,12 @@ public: LockT& m, Milliseconds ms, PredicateT pred) { - const auto deadline = getExpirationDateForWaitForValue(ms); - while (!pred()) { - if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { - return pred(); - } - } - return true; + return waitForConditionOrInterruptUntil( + cv, m, getExpirationDateForWaitForValue(ms), std::move(pred)); } /** - * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and - * non-ok status indicates the error instead of a DBException. - */ - virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( - stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0; - - /** - * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then. + * Sleeps until "deadline"; throws an exception if the Interruptible is interrupted before then. */ void sleepUntil(Date_t deadline) { auto m = MONGO_MAKE_LATCH(); @@ -338,7 +421,7 @@ public: } /** - * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before + * Sleeps for "duration" ms; throws an exception if the Interruptible is interrupted before * then. */ void sleepFor(Milliseconds duration) { @@ -349,51 +432,69 @@ public: } protected: - /** - * Pushes an ignore interruption critical section into the interruptible. Until an associated - * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to - * explicit interruption or previously set deadlines. - * - * Note that new deadlines can be set after this is called, which will again introduce the - * possibility of interruption. - * - * Returns state needed to pop interruption. - */ - virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0; + class NotInterruptible; - /** - * Pops the ignored interruption critical section introduced by push. - */ - virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0; + void _onLongSleep(const StringData& name); + void _onWake(const StringData& name, WakeReason reason, WakeSpeed speed); + static auto& _getListenerState() { + struct State { + std::vector<WaitListener*> list; + }; + + // Note that state should no longer be mutated after init-time (ala MONGO_INITIALIZERS). If + // this changes, than this state needs to be synchronized. + static State state; + return state; + } +}; + +/** + * A set of event handles for an Interruptible type + */ +class Interruptible::WaitListener { +public: /** - * Pushes a subsidiary deadline into the interruptible. Until an associated - * popArtificialDeadline is - * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls - * with the passed error code if the deadline has passed. + * Action to do when a wait does not resolve quickly * - * Note that deadline's higher than the current value are constrained (such that the passed - * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed). - * - * Returns state needed to pop the deadline. + * Any implementation of this function must be safe to invoke when an Interruptible-associated + * latch is held. As this is hard to reason about, avoid external latches whenever possible. */ - virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0; + virtual void onLongSleep(const StringData& name) = 0; /** - * Pops the subsidiary deadline introduced by push. + * Action to do when a wait resolves after a sleep + * + * Any implementation of this function must be safe to invoke when an Interruptible-associated + * latch is held. As this is hard to reason about, avoid external latches whenever possible. */ - virtual void popArtificialDeadline(DeadlineState) = 0; + virtual void onWake(const StringData& name, WakeReason reason, WakeSpeed speed) = 0; +}; - /** - * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock - */ - virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0; +inline void Interruptible::addWaitListener(WaitListener* listener) { + auto& state = _getListenerState(); - class NotInterruptible; -}; + state.list.push_back(listener); +} + +inline void Interruptible::_onLongSleep(const StringData& name) { + auto& state = _getListenerState(); + + for (auto listener : state.list) { + listener->onLongSleep(name); + } +} + +inline void Interruptible::_onWake(const StringData& name, WakeReason reason, WakeSpeed speed) { + auto& state = _getListenerState(); + + for (auto listener : state.list) { + listener->onWake(name, reason, speed); + } +} /** - * A not interruptible type which can be used as a lightweight default arg for interruptible taking + * A non-interruptible type which can be used as a lightweight default arg for Interruptible-taking * functions. */ class Interruptible::NotInterruptible final : public Interruptible { @@ -417,9 +518,9 @@ class Interruptible::NotInterruptible final : public Interruptible { } // It's invalid to call the deadline or ignore interruption guards on a possibly noop - // interruptible. + // Interruptible. // - // The noop interruptible should only be invoked as a default arg at the bottom of the call + // The noop Interruptible should only be invoked as a default arg at the bottom of the call // stack (with types that won't modify it's invocation) IgnoreInterruptsState pushIgnoreInterrupts() override { MONGO_UNREACHABLE; diff --git a/src/mongo/watchdog/watchdog.cpp b/src/mongo/watchdog/watchdog.cpp index 3dff62a1086..cc2cb5942a5 100644 --- a/src/mongo/watchdog/watchdog.cpp +++ b/src/mongo/watchdog/watchdog.cpp @@ -151,21 +151,22 @@ void WatchdogPeriodicThread::doLoop() { // Check if the period is different? // We are signalled on period changes at which point we may be done waiting or need to // wait longer. - while (startTime + _period > preciseClockSource->now() && - _state != State::kShutdownRequested) { - auto s = opCtx->waitForConditionOrInterruptNoAssertUntil( - _condvar, lock, startTime + _period); - - if (!s.isOK()) { - // The only bad status is when we are in shutdown - if (!opCtx->getServiceContext()->getKillAllOperations()) { - severe() << "Watchdog was interrupted, shutting down, reason: " - << s.getStatus(); - exitCleanly(ExitCode::EXIT_ABRUPT); - } - - return; + try { + opCtx->waitForConditionOrInterruptUntil(_condvar, lock, startTime + _period, [&] { + return (startTime + _period) <= preciseClockSource->now() || + _state == State::kShutdownRequested; + }); + } catch (const DBException& e) { + // The only bad status is when we are in shutdown + if (!opCtx->getServiceContext()->getKillAllOperations()) { + severe() << "Watchdog was interrupted, shutting down, reason: " << e.toStatus(); + exitCleanly(ExitCode::EXIT_ABRUPT); } + + // This interruption ends the WatchdogPeriodicThread. This means it is possible to + // killOp this operation and stop it for the lifetime of the process. + LOG(1) << "WatchdogPeriodicThread interrupted by: " << e; + return; } // Are we done running? |