author     Ben Caimano <ben.caimano@mongodb.com>   2019-10-17 00:51:52 +0000
committer  evergreen <evergreen@mongodb.com>       2019-10-17 00:51:52 +0000
commit     27ed83ca30107c8e39417ba1dfed5ec0dd8b859d (patch)
tree       399e5b548782b6c84627389f94ddb61909e2746a /src
parent     b37eb88c8ae801751711dd54f2506d3561989db7 (diff)
download   mongo-27ed83ca30107c8e39417ba1dfed5ec0dd8b859d.tar.gz
SERVER-43987 Require predicates with OperationContext::waitForConditionOrInterrupt()
Diffstat (limited to 'src')
-rw-r--r--   src/mongo/db/commands/validate.cpp                        6
-rw-r--r--   src/mongo/db/concurrency/flow_control_ticketholder.cpp   42
-rw-r--r--   src/mongo/db/keys_collection_manager.cpp                 57
-rw-r--r--   src/mongo/db/operation_context.h                          6
-rw-r--r--   src/mongo/db/operation_context_test.cpp                  57
-rw-r--r--   src/mongo/db/repl/replication_coordinator_impl.cpp       42
-rw-r--r--   src/mongo/db/s/implicit_create_collection.cpp            12
-rw-r--r--   src/mongo/db/s/wait_for_majority_service.cpp             10
-rw-r--r--   src/mongo/db/s/wait_for_majority_service_test.cpp        15
-rw-r--r--   src/mongo/db/session_catalog_test.cpp                     6
-rw-r--r--   src/mongo/executor/thread_pool_task_executor.cpp         26
-rw-r--r--   src/mongo/s/client/shard_registry.cpp                    14
-rw-r--r--   src/mongo/util/future_test_utils.h                        2
-rw-r--r--   src/mongo/util/interruptible.h                          245
-rw-r--r--   src/mongo/watchdog/watchdog.cpp                          29
15 files changed, 264 insertions, 305 deletions
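
Every change below follows the same mechanical recipe: a hand-rolled wait loop around a predicate-less wait becomes a single call to a predicate-taking wait, which absorbs spurious wakeups internally. A minimal before/after sketch (illustrative only, not code from this commit; `_flag`, `_cv`, `_mutex`, and `lk` are hypothetical names):

    stdx::unique_lock<Latch> lk(_mutex);

    // Before: each caller looped around the predicate-less wait and had to
    // cope with spurious wakeups itself.
    while (!_flag) {
        opCtx->waitForConditionOrInterrupt(_cv, lk);
    }

    // After: the predicate is mandatory and the wait loops internally,
    // returning only once the predicate is true (or throwing on interrupt).
    opCtx->waitForConditionOrInterrupt(_cv, lk, [&] { return _flag; });
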
diff --git a/src/mongo/db/commands/validate.cpp b/src/mongo/db/commands/validate.cpp
index 35369e9ef7c..09594aa207e 100644
--- a/src/mongo/db/commands/validate.cpp
+++ b/src/mongo/db/commands/validate.cpp
@@ -150,9 +150,9 @@ public:
         {
             stdx::unique_lock<Latch> lock(_validationMutex);
             try {
-                while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) {
-                    opCtx->waitForConditionOrInterrupt(_validationNotifier, lock);
-                }
+                opCtx->waitForConditionOrInterrupt(_validationNotifier, lock, [&] {
+                    return _validationsInProgress.find(nss.ns()) == _validationsInProgress.end();
+                });
             } catch (AssertionException& e) {
                 CommandHelpers::appendCommandStatusNoThrow(
                     result,
diff --git a/src/mongo/db/concurrency/flow_control_ticketholder.cpp b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
index 6bb95797502..b8f6217665c 100644
--- a/src/mongo/db/concurrency/flow_control_ticketholder.cpp
+++ b/src/mongo/db/concurrency/flow_control_ticketholder.cpp
@@ -98,31 +98,25 @@ void FlowControlTicketholder::getTicket(OperationContext* opCtx,
         ++stats->acquireWaitCount;
     }
 
-    // Make sure operations already waiting on a Flow Control ticket during shut down do not
-    // hang if the ticket refresher thread has been shut down.
-    while (_tickets == 0 && !_inShutdown) {
-        stats->waiting = true;
-        const std::uint64_t startWaitTime = curTimeMicros64();
-
-        // This method will wait forever for a ticket. However, it will wake up every so often to
-        // update the time spent waiting on the ticket.
-        auto waitDeadline = Date_t::now() + Milliseconds(500);
-        StatusWith<stdx::cv_status> swCondStatus =
-            opCtx->waitForConditionOrInterruptNoAssertUntil(_cv, lk, waitDeadline);
-
-        auto waitTime = curTimeMicros64() - startWaitTime;
-        _totalTimeAcquiringMicros.fetchAndAddRelaxed(waitTime);
-        stats->timeAcquiringMicros += waitTime;
-
-        // If the operation context state interrupted this wait, the StatusWith result will contain
-        // the error. If the `waitDeadline` expired, the Status variable will be OK, and the
-        // `cv_status` value will be `cv_status::timeout`. In either case where Status::OK is
-        // returned, the loop must re-check the predicate. If the operation context is interrupted
-        // (and an error status is returned), the intended behavior is to bubble an exception up to
-        // the user.
-        uassertStatusOK(swCondStatus);
+    auto currentWaitTime = curTimeMicros64();
+    auto updateTotalTime = [&]() {
+        auto oldWaitTime = std::exchange(currentWaitTime, curTimeMicros64());
+        _totalTimeAcquiringMicros.fetchAndAddRelaxed(currentWaitTime - oldWaitTime);
+    };
+
+    stats->waiting = true;
+    ON_BLOCK_EXIT([&] {
+        // When this block exits, update the time one last time and note that getTicket() is no
+        // longer waiting.
+        updateTotalTime();
+        stats->waiting = false;
+    });
+
+    // getTicket() should block until there are tickets or the Ticketholder is in shutdown
+    while (!opCtx->waitForConditionOrInterruptFor(
+        _cv, lk, Milliseconds(500), [&] { return _tickets > 0 || _inShutdown; })) {
+        updateTotalTime();
     }
-    stats->waiting = false;
 
     if (_inShutdown) {
         return;
diff --git a/src/mongo/db/keys_collection_manager.cpp b/src/mongo/db/keys_collection_manager.cpp
index c96af99f13d..91e8bd8f4a4 100644
--- a/src/mongo/db/keys_collection_manager.cpp
+++ b/src/mongo/db/keys_collection_manager.cpp
@@ -200,12 +200,10 @@ void KeysCollectionManager::PeriodicRunner::refreshNow(OperationContext* opCtx)
                       "aborting keys cache refresh because node is shutting down");
         }
 
-        if (_refreshRequest) {
-            return _refreshRequest;
+        if (!_refreshRequest) {
+            _refreshRequest = std::make_shared<Notification<void>>();
         }
-        _refreshNeededCV.notify_all();
 
-        _refreshRequest = std::make_shared<Notification<void>>();
+        _refreshNeededCV.notify_all();
         return _refreshRequest;
     }();
 
@@ -223,8 +221,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
     ThreadClient tc(threadName, service);
 
     while (true) {
-        bool hasRefreshRequestInitially = false;
         unsigned errorCount = 0;
+
+        decltype(_refreshRequest) request;
         std::shared_ptr<RefreshFunc> doRefresh;
         {
             stdx::lock_guard<Latch> lock(_mutex);
@@ -235,7 +234,7 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
 
             invariant(_doRefresh.get() != nullptr);
             doRefresh = _doRefresh;
-            hasRefreshRequestInitially = _refreshRequest.get() != nullptr;
+            request = std::move(_refreshRequest);
         }
 
         Milliseconds nextWakeup = kRefreshIntervalIfErrored;
@@ -263,6 +262,11 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
                 nextWakeup = kMaxRefreshWaitTime;
             }
         }
+
+        // Notify all waiters that the refresh has finished and they can move on
+        if (request) {
+            request->set();
+        }
 
         maxKeyRefreshWaitTimeOverrideMS.execute([&](const BSONObj& data) {
@@ -270,15 +274,9 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
         });
 
         stdx::unique_lock<Latch> lock(_mutex);
-
         if (_refreshRequest) {
-            if (!hasRefreshRequestInitially) {
-                // A fresh request came in, fulfill the request before going to sleep.
-                continue;
-            }
-
-            _refreshRequest->set();
-            _refreshRequest.reset();
+            // A fresh request came in, fulfill the request before going to sleep.
+            continue;
         }
 
         if (_inShutdown) {
             break;
         }
 
@@ -289,24 +287,33 @@ void KeysCollectionManager::PeriodicRunner::_doPeriodicRefresh(ServiceContext* s
         auto opCtx = cc().makeOperationContext();
 
         MONGO_IDLE_THREAD_BLOCK;
-        auto sleepStatus = opCtx->waitForConditionOrInterruptNoAssertUntil(
-            _refreshNeededCV, lock, Date_t::now() + nextWakeup);
-
-        if (ErrorCodes::isShutdownError(sleepStatus.getStatus().code())) {
-            break;
+        try {
+            opCtx->waitForConditionOrInterruptFor(
+                _refreshNeededCV, lock, nextWakeup, [&]() -> bool {
+                    return _inShutdown || _refreshRequest;
+                });
+        } catch (const DBException& e) {
+            LOG(1) << "Unable to wait for refresh request due to: " << e;
+
+            if (ErrorCodes::isShutdownError(e)) {
+                return;
+            }
         }
     }
-
-    stdx::unique_lock<Latch> lock(_mutex);
-    if (_refreshRequest) {
-        _refreshRequest->set();
-        _refreshRequest.reset();
-    }
 }
 
 void KeysCollectionManager::PeriodicRunner::setFunc(RefreshFunc newRefreshStrategy) {
     stdx::lock_guard<Latch> lock(_mutex);
+    if (_inShutdown) {
+        uasserted(ErrorCodes::ShutdownInProgress,
+                  "aborting KeysCollectionManager::PeriodicRunner::setFunc because node is "
+                  "shutting down");
+    }
+
     _doRefresh = std::make_shared<RefreshFunc>(std::move(newRefreshStrategy));
+    if (!_refreshRequest) {
+        _refreshRequest = std::make_shared<Notification<void>>();
+    }
     _refreshNeededCV.notify_all();
 }
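
The flow control change above also shows the idiom for periodic bookkeeping: waitForConditionOrInterruptFor() returns false when the relative timeout lapses with the predicate still false, so looping on a false result wakes the thread every 500ms to update timing stats while still blocking indefinitely for a ticket. A sketch of that shape (hypothetical names, not commit code):

    // The loop body runs roughly every 500ms until the predicate holds;
    // interruption propagates out of the wait as a DBException.
    while (!opCtx->waitForConditionOrInterruptFor(
        _cv, lk, Milliseconds(500), [&] { return _ready || _inShutdown; })) {
        recordElapsedTime();  // hypothetical periodic side work
    }
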
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 43553efae9b..7ebef518aec 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -352,9 +352,6 @@ public:
      */
     Microseconds getRemainingMaxTimeMicros() const;
 
-    StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
-        stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
-
     bool isIgnoringInterrupts() const;
 
     /**
@@ -383,6 +380,9 @@ public:
     }
 
 private:
+    StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+        stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
+
     IgnoreInterruptsState pushIgnoreInterrupts() override {
         IgnoreInterruptsState iis{_ignoreInterrupts,
                                   {_deadline, _timeoutError, _hasArtificialDeadline}};
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index d805541218c..cd2c741f51f 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -257,7 +257,7 @@ public:
         auto m = MONGO_MAKE_LATCH();
         stdx::condition_variable cv;
         stdx::unique_lock<Latch> lk(m);
-        opCtx->waitForConditionOrInterrupt(cv, lk);
+        opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });
     }
 
     const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
@@ -337,7 +337,9 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) {
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT_EQ(ErrorCodes::ExceededTimeLimit, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+    ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+                       DBException,
+                       ErrorCodes::ExceededTimeLimit);
 }
 
 TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
@@ -346,10 +348,10 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT_EQ(
-        ErrorCodes::ExceededTimeLimit,
-        opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10})
-            .getStatus());
+    ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil(
+                           cv, lk, mockClock->now() + Seconds{10}, [] { return false; }),
+                       DBException,
+                       ErrorCodes::ExceededTimeLimit);
 }
 
 TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) {
@@ -601,7 +603,9 @@ TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT_EQ(ErrorCodes::Interrupted, opCtx->waitForConditionOrInterruptNoAssert(cv, lk));
+    ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }),
+                       DBException,
+                       ErrorCodes::Interrupted);
 }
 
 TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
@@ -609,9 +613,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT(stdx::cv_status::timeout ==
-           unittest::assertGet(
-               opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+    ASSERT_FALSE(
+        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
 }
 
 TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
@@ -620,9 +623,8 @@ TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT(stdx::cv_status::timeout ==
-           unittest::assertGet(
-               opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+    ASSERT_FALSE(
+        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
 }
 
 TEST_F(OperationDeadlineTests, WaitForDurationExpired) {
@@ -640,8 +642,10 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati
     auto m = MONGO_MAKE_LATCH();
     stdx::condition_variable cv;
     stdx::unique_lock<Latch> lk(m);
-    ASSERT(ErrorCodes::ExceededTimeLimit ==
-           opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()));
+    ASSERT_THROWS_CODE(
+        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }),
+        DBException,
+        ErrorCodes::ExceededTimeLimit);
 }
 
 class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
@@ -937,17 +941,17 @@ TEST_F(ThreadedOperationDeadlineTests, SleepForWithExpiredForDoesNotBlock) {
     ASSERT_FALSE(waiterResult.get());
 }
 
-TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
-    // `waitForConditionOrInterruptNoAssertUntil` can have three outcomes:
+TEST(OperationContextTest, TestWaitForConditionOrInterruptUntilAPI) {
+    // `waitForConditionOrInterruptUntil` can have three outcomes:
     //
     // 1) The condition is satisfied before any timeouts.
     // 2) The explicit `deadline` function argument is triggered.
     // 3) The operation context implicitly times out, or is interrupted from a killOp command or
    //    shutdown, etc.
     //
-    // Case (1) must return a Status::OK with a value of `cv_status::no_timeout`. Case (2) must also
-    // return a Status::OK with a value of `cv_status::timeout`. Case (3) must return an error
-    // status. The error status returned is otherwise configurable.
+    // Case (1) must return true.
+    // Case (2) must return false.
+    // Case (3) must throw a DBException.
     //
     // Case (1) is the hardest to test. The condition variable must be notified by a second thread
     // when the client is waiting on it. Case (1) is also the least in need of having the API
@@ -962,17 +966,16 @@ TEST(OperationContextTest, TestWaitForConditionOrInterruptNoAssertUntilAPI) {
 
     // Case (2). Expect a Status::OK with a cv_status::timeout.
     Date_t deadline = Date_t::now() + Milliseconds(500);
-    StatusWith<stdx::cv_status> ret =
-        opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
-    ASSERT_OK(ret.getStatus());
-    ASSERT(ret.getValue() == stdx::cv_status::timeout);
+    ASSERT_EQ(opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+              false);
 
     // Case (3). Expect an error of `MaxTimeMSExpired`.
     opCtx->setDeadlineByDate(Date_t::now(), ErrorCodes::MaxTimeMSExpired);
     deadline = Date_t::now() + Seconds(500);
-    ret = opCtx->waitForConditionOrInterruptNoAssertUntil(cv, lk, deadline);
-    ASSERT_FALSE(ret.isOK());
-    ASSERT_EQUALS(ErrorCodes::MaxTimeMSExpired, ret.getStatus().code());
+    ASSERT_THROWS_CODE(
+        opCtx->waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }),
+        DBException,
+        ErrorCodes::MaxTimeMSExpired);
 }
 
 }  // namespace
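
The rewritten test spells out the new contract of waitForConditionOrInterruptUntil(): true when the predicate is satisfied, false when the explicit deadline lapses first, and a thrown DBException when the operation itself is interrupted or exceeds its own deadline. In sketch form (hypothetical names, illustrative only):

    try {
        bool satisfied = opCtx->waitForConditionOrInterruptUntil(
            cv, lk, deadline, [&] { return ready; });
        // satisfied == true:  predicate returned true before any timeout
        // satisfied == false: `deadline` passed with the predicate still false
    } catch (const DBException& ex) {
        // the operation was killed or its own MaxTimeMS expired
    }
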
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index d67ade26d70..7103ecf2864 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1382,27 +1382,36 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
             return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
         }
 
-        // If we are doing a majority committed read we only need to wait for a new snapshot.
         if (isMajorityCommittedRead) {
+            // If we are doing a majority committed read we only need to wait for a new snapshot to
+            // update getCurrentOpTime() past targetOpTime. This block should only run once and
+            // return without further looping.
+
             LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
 
-            auto waitStatus =
-                opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
-            if (!waitStatus.isOK()) {
-                return waitStatus.withContext(
+            try {
+                opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+                    return _inShutdown || (targetOpTime <= getCurrentOpTime());
+                });
+            } catch (const DBException& e) {
+                return e.toStatus().withContext(
                     str::stream() << "Error waiting for snapshot not less than "
                                   << targetOpTime.toString() << ", current relevant optime is "
                                   << getCurrentOpTime().toString() << ".");
             }
 
-            if (!_currentCommittedSnapshot) {
-                // It is possible for the thread to be awoken due to a spurious wakeup, meaning
-                // the condition variable was never set.
-                LOG(3) << "waitUntilOpTime: awoken but current committed snapshot is null."
-                       << " Continuing to wait for new snapshot.";
-            } else {
+            if (_inShutdown) {
+                return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+            }
+
+            if (_currentCommittedSnapshot) {
+                // It seems that targetOpTime can sometimes be default OpTime{}. When there is no
+                // _currentCommittedSnapshot, _getCurrentCommittedSnapshotOpTime_inlock() and thus
+                // getCurrentOpTime() also return default OpTime{}. Hence this branch that only runs
+                // if _currentCommittedSnapshot actually exists.
                 LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
             }
 
-            continue;
+            return Status::OK();
         }
 
         // We just need to wait for the opTime to catch up to what we need (not majority RC).
@@ -4003,10 +4012,11 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* op
     uassert(ErrorCodes::NotYetInitialized,
             "Cannot use snapshots until replica set is finished initializing.",
             _rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating);
-    while (!_currentCommittedSnapshot ||
-           _currentCommittedSnapshot->opTime.getTimestamp() < untilSnapshot) {
-        opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
-    }
+
+    opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock, [&] {
+        return _currentCommittedSnapshot &&
+            _currentCommittedSnapshot->opTime.getTimestamp() >= untilSnapshot;
+    });
 }
 
 size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
diff --git a/src/mongo/db/s/implicit_create_collection.cpp b/src/mongo/db/s/implicit_create_collection.cpp
index 0154d3cff40..dfc8cd97cb2 100644
--- a/src/mongo/db/s/implicit_create_collection.cpp
+++ b/src/mongo/db/s/implicit_create_collection.cpp
@@ -72,16 +72,14 @@ public:
     Status onCannotImplicitlyCreateCollection(OperationContext* opCtx) noexcept {
         invariant(!opCtx->lockState()->isLocked());
 
-        {
+        try {
             stdx::unique_lock<Latch> lg(_mutex);
-            while (_isInProgress) {
-                auto status = opCtx->waitForConditionOrInterruptNoAssert(_cvIsInProgress, lg);
-                if (!status.isOK()) {
-                    return status;
-                }
-            }
+
+            opCtx->waitForConditionOrInterrupt(_cvIsInProgress, lg, [&] { return !_isInProgress; });
 
             _isInProgress = true;
+        } catch (const DBException& e) {
+            return e.toStatus();
         }
 
         ON_BLOCK_EXIT([&] {
diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp
index f41ed83c630..1864335150a 100644
--- a/src/mongo/db/s/wait_for_majority_service.cpp
+++ b/src/mongo/db/s/wait_for_majority_service.cpp
@@ -27,6 +27,8 @@
  *    it in the license file.
  */
 
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/s/wait_for_majority_service.h"
@@ -38,6 +40,7 @@
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
 
 namespace mongo {
 
@@ -177,8 +180,11 @@ void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* servic
             _queuedOpTimes.erase(lowestOpTimeIter);
         }
 
-        if (_queuedOpTimes.empty() && !_inShutDown) {
-            _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore();
+        try {
+            _opCtx->waitForConditionOrInterrupt(
+                _hasNewOpTimeCV, lk, [&] { return !_queuedOpTimes.empty() || _inShutDown; });
+        } catch (const DBException& e) {
+            LOG(1) << "Unable to wait for new op time due to: " << e;
         }
 
         _opCtx = nullptr;
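
Callers that must keep returning Status rather than throwing, like the two sharding services above, all bridge the new throwing API the same way. A minimal sketch of the pattern (hypothetical names, not commit code):

    Status waitForDone(OperationContext* opCtx) try {
        stdx::unique_lock<Latch> lk(_mutex);
        opCtx->waitForConditionOrInterrupt(_cv, lk, [&] { return _done; });
        return Status::OK();
    } catch (const DBException& e) {
        return e.toStatus();  // interruption surfaces as a Status again
    }
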
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
index ca89ac04c8b..1d7c975d385 100644
--- a/src/mongo/db/s/wait_for_majority_service_test.cpp
+++ b/src/mongo/db/s/wait_for_majority_service_test.cpp
@@ -79,14 +79,13 @@ public:
         _waitForMajorityCallCount++;
         _callCountChangedCV.notify_one();
 
-        while (!_isTestReady) {
-            auto status = opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk);
-            if (!status.isOK()) {
-                _isTestReady = false;
-                _finishWaitingOneOpTimeCV.notify_one();
-
-                return status;
-            }
+        try {
+            opCtx->waitForConditionOrInterrupt(_isTestReadyCV, lk, [&] { return _isTestReady; });
+        } catch (const DBException& e) {
+            _isTestReady = false;
+            _finishWaitingOneOpTimeCV.notify_one();
+
+            return e.toStatus();
         }
 
         _lastOpTimeWaited = opTime;
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 58e03aa20b4..cd086a9e8b0 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -603,8 +603,10 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) {
             auto m = MONGO_MAKE_LATCH();
             stdx::condition_variable cond;
             stdx::unique_lock<Latch> lock(m);
-            ASSERT_EQ(ErrorCodes::InternalError,
-                      _opCtx->waitForConditionOrInterruptNoAssert(cond, lock));
+            ASSERT_THROWS_CODE(
+                _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }),
+                DBException,
+                ErrorCodes::InternalError);
         }
     }
     normalCheckOutFinish.get();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 8573e31d4a1..b7ec2eafb27 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -306,18 +306,18 @@ StatusWith<stdx::cv_status> ThreadPoolTaskExecutor::waitForEvent(OperationContex
     auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
     stdx::unique_lock<Latch> lk(_mutex);
 
-    // std::condition_variable::wait() can wake up spuriously, so we have to loop until the event
-    // is signalled or we time out.
-    while (!eventState->isSignaledFlag) {
-        auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(
-            eventState->isSignaledCondition, lk, deadline);
-
-        if (!status.isOK() || stdx::cv_status::timeout == status) {
-            return status;
+    try {
+        if (opCtx->waitForConditionOrInterruptUntil(
+                eventState->isSignaledCondition, lk, deadline, [&] {
+                    return eventState->isSignaledFlag;
+                })) {
+            return stdx::cv_status::no_timeout;
         }
-    }
 
-    return stdx::cv_status::no_timeout;
+        return stdx::cv_status::timeout;
+    } catch (const DBException& e) {
+        return e.toStatus();
+    }
 }
 
 void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
@@ -531,9 +531,9 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible*
     if (!cbState->finishedCondition) {
         cbState->finishedCondition.emplace();
     }
-    while (!cbState->isFinished.load()) {
-        interruptible->waitForConditionOrInterrupt(*cbState->finishedCondition, lk);
-    }
+
+    interruptible->waitForConditionOrInterrupt(
+        *cbState->finishedCondition, lk, [&] { return cbState->isFinished.load(); });
 }
 
 void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
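
waitForEvent() above keeps its legacy StatusWith<stdx::cv_status> signature by translating the new bool result back into cv_status values. The adaptation, sketched with hypothetical names:

    StatusWith<stdx::cv_status> waitUntilSignaled(OperationContext* opCtx, Date_t deadline) try {
        stdx::unique_lock<Latch> lk(_mutex);
        return opCtx->waitForConditionOrInterruptUntil(
                   _cv, lk, deadline, [&] { return _signaled; })
            ? stdx::cv_status::no_timeout   // predicate satisfied
            : stdx::cv_status::timeout;     // explicit deadline lapsed
    } catch (const DBException& e) {
        return e.toStatus();  // interruption becomes a non-OK StatusWith
    }
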
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 95b3a726eff..16b1f8bf5f4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -294,13 +294,13 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
         // There is also an issue if multiple threads are allowed to call getAllShards()
         // simultaneously because there is no good way to determine which of the threads has the
         // more recent version of the data.
-        do {
-            auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(_inReloadCV, reloadLock);
-            if (!waitStatus.isOK()) {
-                LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(waitStatus);
-                return false;
-            }
-        } while (_reloadState == ReloadState::Reloading);
+        try {
+            opCtx->waitForConditionOrInterrupt(
+                _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; });
+        } catch (const DBException& e) {
+            LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(e.toStatus());
+            return false;
+        }
 
         if (_reloadState == ReloadState::Idle) {
             return false;
diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h
index 8de926e4f12..551e95b7b64 100644
--- a/src/mongo/util/future_test_utils.h
+++ b/src/mongo/util/future_test_utils.h
@@ -101,7 +101,7 @@ class DummyInterruptable final : public Interruptible {
         MONGO_UNREACHABLE;
     }
     Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override {
-        MONGO_UNREACHABLE;
+        return Date_t::now() + waitFor;
     }
 };
diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h
index 6e182d6bbd7..d72be7b77d0 100644
--- a/src/mongo/util/interruptible.h
+++ b/src/mongo/util/interruptible.h
@@ -41,21 +41,90 @@ namespace mongo {
  * A type which can be used to wait on condition variables with a level triggered one-way interrupt.
  * I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to
  * waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX.
+ *
+ * This class should never be derived from directly. Instead, please derive from Interruptible.
  */
-class Interruptible {
+class InterruptibleBase {
+public:
+    virtual ~InterruptibleBase() = default;
+
+    /**
+     * Returns the deadline for this InterruptibleBase, or Date_t::max() if there is no deadline.
+     */
+    virtual Date_t getDeadline() const = 0;
+
+    /**
+     * Returns Status::OK() unless this operation is in a killed state.
+     */
+    virtual Status checkForInterruptNoAssert() noexcept = 0;
+
 protected:
+    /**
+     * Same as Interruptible::waitForConditionOrInterruptUntil(), except returns
+     * StatusWith<stdx::cv_status> and a non-ok status indicates the error instead of a DBException.
+     */
+    virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+        stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
+
     struct DeadlineState {
         Date_t deadline;
         ErrorCodes::Error error;
         bool hasArtificialDeadline;
     };
 
+    /**
+     * Pushes a subsidiary deadline into the InterruptibleBase. Until an associated
+     * popArtificialDeadline() is invoked, the InterruptibleBase will fail checkForInterrupt() and
+     * waitForConditionOrInterrupt() calls with the passed error code if the deadline has passed.
+     *
+     * Note that deadline's higher than the current value are constrained (such that the passed
+     * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
+     *
+     * Returns state needed to pop the deadline.
+     */
+    virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+
+    /**
+     * Pops the subsidiary deadline introduced by push.
+     */
+    virtual void popArtificialDeadline(DeadlineState) = 0;
+
+    /**
+     * Returns the equivalent of Date_t::now() + waitFor for the InterruptibleBase's clock
+     */
+    virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+
     struct IgnoreInterruptsState {
         bool ignoreInterrupts;
         DeadlineState deadline;
     };
 
     /**
+     * Pushes an ignore interruption critical section into the InterruptibleBase.
+     * Until an associated popIgnoreInterrupts() is invoked, the InterruptibleBase should ignore
+     * interruptions related to explicit interruption or previously set deadlines.
+     *
+     * Note that new deadlines can be set after this is called, which will again introduce the
+     * possibility of interruption.
+     *
+     * Returns state needed to pop interruption.
+     */
+    virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+
+    /**
+     * Pops the ignored interruption critical section introduced by push.
+     */
+    virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+};
+
+/**
+ * An derived class of InterruptibleBase which provides a variety of helper functions
+ *
+ * Please derive from this class instead of InterruptibleBase.
+ */
+class Interruptible : public InterruptibleBase {
+private:
+    /**
      * A deadline guard provides a subsidiary deadline to the parent.
      */
     class DeadlineGuard {
@@ -134,7 +203,7 @@ protected:
 public:
     /**
      * Returns a statically allocated instance that cannot be interrupted. Useful as a default
-     * argument to interruptible taking methods.
+     * argument to Interruptible-taking methods.
     */
     static Interruptible* notInterruptible();
@@ -163,11 +232,6 @@ public:
     }
 
     /**
-     * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline.
-     */
-    virtual Date_t getDeadline() const = 0;
-
-    /**
      * Invokes the passed callback with an interruption guard active. Additionally handles the
      * dance of try/catching the invocation and checking checkForInterrupt with the guard inactive
     * (to allow a higher level timeout to override a lower level one, or for top level interruption
@@ -193,86 +257,9 @@ public:
     }
 
     /**
-     * Returns Status::OK() unless this operation is in a killed state.
-     */
-    virtual Status checkForInterruptNoAssert() noexcept = 0;
-
-    /**
-     * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
-     * deadline on this operation to expire. In the event of interruption or operation deadline
-     * expiration, raises a AssertionException with an error code indicating the interruption type.
-     */
-    template <typename LockT>
-    void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m) {
-        uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
-    }
-
-    /**
-     * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
-     * is interrupted or its deadline expires. Throws a DBException for interruption and
-     * deadline expiration.
-     */
-    template <typename LockT, typename PredicateT>
-    void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
-        while (!pred()) {
-            waitForConditionOrInterrupt(cv, m);
-        }
-    }
-
-    /**
-     * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
-     * a DBException to report interruption.
-     */
-    template <typename LockT>
-    Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, LockT& m) noexcept {
-        auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
-        if (!status.isOK()) {
-            return status.getStatus();
-        }
-
-        invariant(status.getValue() == stdx::cv_status::no_timeout);
-        return Status::OK();
-    }
-
-    /**
-     * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay
-     * status instead of throwing on interruption.
-     */
-    template <typename LockT, typename PredicateT>
-    Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
-                                               LockT& m,
-                                               PredicateT pred) noexcept {
-        while (!pred()) {
-            auto status = waitForConditionOrInterruptNoAssert(cv, m);
-
-            if (!status.isOK()) {
-                return status;
-            }
-        }
-
-        return Status::OK();
-    }
-
-    /**
-     * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
-     * for the operation to be interrupted, or for the operation's own deadline to expire.
-     *
-     * If the operation deadline expires or the operation is interrupted, throws a DBException. If
-     * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
-     * cv_status::no_timeout.
-     */
-    template <typename LockT>
-    stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
-                                                     LockT& m,
-                                                     Date_t deadline) {
-        return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
-    }
-
-    /**
      * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
      * expires, or this operation is interrupted, or this operation's own deadline expires.
      *
-     *
      * If the operation deadline expires or the operation is interrupted, throws a DBException. If
      * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
      * cv_status::no_timeout indicating that "pred" finally returned true.
@@ -282,24 +269,28 @@ public:
                                           LockT& m,
                                           Date_t deadline,
                                           PredicateT pred) {
+        auto waitUntil = [&](Date_t deadline) {
+            // Wrapping this in a lambda because it's a mouthful
+            return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+        };
+
         while (!pred()) {
-            if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+            if (waitUntil(deadline) == stdx::cv_status::timeout) {
                 return pred();
             }
-        }
+        };
+
         return true;
     }
 
     /**
-     * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative
-     * amount of time to wait instead of an absolute time point.
+     * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+     * is interrupted or its deadline expires. Throws a DBException for interruption and
+     * deadline expiration.
      */
-    template <typename LockT>
-    stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
-                                                   LockT& m,
-                                                   Milliseconds ms) {
-        return uassertStatusOK(
-            waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms)));
+    template <typename LockT, typename PredicateT>
+    void waitForConditionOrInterrupt(stdx::condition_variable& cv, LockT& m, PredicateT pred) {
+        waitForConditionOrInterruptUntil(cv, m, Date_t::max(), std::move(pred));
     }
 
     /**
@@ -311,24 +302,12 @@ public:
                                            LockT& m,
                                            Milliseconds ms,
                                            PredicateT pred) {
-        const auto deadline = getExpirationDateForWaitForValue(ms);
-        while (!pred()) {
-            if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
-                return pred();
-            }
-        }
-        return true;
+        return waitForConditionOrInterruptUntil(
+            cv, m, getExpirationDateForWaitForValue(ms), std::move(pred));
     }
 
     /**
-     * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
-     * non-ok status indicates the error instead of a DBException.
-     */
-    virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
-        stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept = 0;
-
-    /**
-     * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then.
+     * Sleeps until "deadline"; throws an exception if the Interruptible is interrupted before then.
     */
     void sleepUntil(Date_t deadline) {
         auto m = MONGO_MAKE_LATCH();
@@ -338,7 +317,7 @@ public:
     }
 
     /**
-     * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before
+     * Sleeps for "duration" ms; throws an exception if the Interruptible is interrupted before
      * then.
      */
     void sleepFor(Milliseconds duration) {
@@ -349,51 +328,11 @@ public:
     }
 
 protected:
-    /**
-     * Pushes an ignore interruption critical section into the interruptible. Until an associated
-     * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to
-     * explicit interruption or previously set deadlines.
-     *
-     * Note that new deadlines can be set after this is called, which will again introduce the
-     * possibility of interruption.
-     *
-     * Returns state needed to pop interruption.
-     */
-    virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
-
-    /**
-     * Pops the ignored interruption critical section introduced by push.
-     */
-    virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
-
-    /**
-     * Pushes a subsidiary deadline into the interruptible. Until an associated
-     * popArtificialDeadline is
-     * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls
-     * with the passed error code if the deadline has passed.
-     *
-     * Note that deadline's higher than the current value are constrained (such that the passed
-     * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
-     *
-     * Returns state needed to pop the deadline.
-     */
-    virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
-
-    /**
-     * Pops the subsidiary deadline introduced by push.
-     */
-    virtual void popArtificialDeadline(DeadlineState) = 0;
-
-    /**
-     * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock
-     */
-    virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
-
     class NotInterruptible;
 };
 
 /**
- * A not interruptible type which can be used as a lightweight default arg for interruptible taking
+ * A non-interruptible type which can be used as a lightweight default arg for Interruptible-taking
  * functions.
  */
 class Interruptible::NotInterruptible final : public Interruptible {
@@ -417,9 +356,9 @@ class Interruptible::NotInterruptible final : public Interruptible {
     }
 
     // It's invalid to call the deadline or ignore interruption guards on a possibly noop
-    // interruptible.
+    // Interruptible.
     //
-    // The noop interruptible should only be invoked as a default arg at the bottom of the call
+    // The noop Interruptible should only be invoked as a default arg at the bottom of the call
     // stack (with types that won't modify it's invocation)
     IgnoreInterruptsState pushIgnoreInterrupts() override {
         MONGO_UNREACHABLE;
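
After the interruptible.h refactor, the predicate-taking waitForConditionOrInterruptUntil() is the only wait implemented against the virtual NoAssert primitive; the remaining public waits are thin delegating wrappers, as the diff above shows. Roughly (an illustrative condensation, not literal source):

    // waitForConditionOrInterrupt(cv, m, pred)
    //     delegates to waitForConditionOrInterruptUntil(cv, m, Date_t::max(), pred)
    // waitForConditionOrInterruptFor(cv, m, ms, pred)
    //     delegates to waitForConditionOrInterruptUntil(
    //         cv, m, getExpirationDateForWaitForValue(ms), pred)
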
diff --git a/src/mongo/watchdog/watchdog.cpp b/src/mongo/watchdog/watchdog.cpp
index 360b98a0be9..accc1b3b06d 100644
--- a/src/mongo/watchdog/watchdog.cpp
+++ b/src/mongo/watchdog/watchdog.cpp
@@ -151,21 +151,22 @@ void WatchdogPeriodicThread::doLoop() {
             // Check if the period is different?
             // We are signalled on period changes at which point we may be done waiting or need to
             // wait longer.
-            while (startTime + _period > preciseClockSource->now() &&
-                   _state != State::kShutdownRequested) {
-                auto s = opCtx->waitForConditionOrInterruptNoAssertUntil(
-                    _condvar, lock, startTime + _period);
-
-                if (!s.isOK()) {
-                    // The only bad status is when we are in shutdown
-                    if (!opCtx->getServiceContext()->getKillAllOperations()) {
-                        severe() << "Watchdog was interrupted, shutting down, reason: "
-                                 << s.getStatus();
-                        exitCleanly(ExitCode::EXIT_ABRUPT);
-                    }
-
-                    return;
+            try {
+                opCtx->waitForConditionOrInterruptUntil(_condvar, lock, startTime + _period, [&] {
+                    return (startTime + _period) <= preciseClockSource->now() ||
+                        _state == State::kShutdownRequested;
+                });
+            } catch (const DBException& e) {
+                // The only bad status is when we are in shutdown
+                if (!opCtx->getServiceContext()->getKillAllOperations()) {
+                    severe() << "Watchdog was interrupted, shutting down, reason: " << e.toStatus();
+                    exitCleanly(ExitCode::EXIT_ABRUPT);
                 }
+
+                // This interruption ends the WatchdogPeriodicThread. This means it is possible to
+                // killOp this operation and stop it for the lifetime of the process.
+                LOG(1) << "WatchdogPeriodicThread interrupted by: " << e;
+                return;
             }
 
             // Are we done running?
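
The watchdog change shows one more predicate flavor: a deadline-bounded wait whose predicate also consults the clock, so a period change (signalled on _condvar) can end the wait early while a plain timeout simply falls through to the next iteration of doLoop(). Sketched with hypothetical names (illustrative only):

    // Wait out the rest of the period, but let a shutdown request or an
    // already-elapsed period end the wait immediately; spurious wakeups
    // cannot, because the predicate re-checks both conditions.
    opCtx->waitForConditionOrInterruptUntil(_condvar, lock, startTime + _period, [&] {
        return startTime + _period <= clockSource->now() ||
            _state == State::kShutdownRequested;
    });
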