diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-12-10 05:21:03 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-10 06:39:52 +0000 |
commit | 0241ba4289618d77b2f3b9a3a3d07a6d08d2c432 (patch) | |
tree | 63a122a83224571e6c94bf2e72e26594d0c8a79f /src/mongo/client | |
parent | 0455be535f6d8149e86b828d9d75edb185c042e2 (diff) | |
download | mongo-0241ba4289618d77b2f3b9a3a3d07a6d08d2c432.tar.gz |
SERVER-61950 Make Fetcher::join() interruptible.
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 18 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 13 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp | 45 |
3 files changed, 68 insertions, 8 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 4223dbd85ab..45eb03d3366 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -196,7 +196,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, } Fetcher::~Fetcher() { - DESTRUCTOR_GUARD(shutdown(); join();); + DESTRUCTOR_GUARD(shutdown(); _join();); } HostAndPort Fetcher::getSource() const { @@ -294,9 +294,19 @@ void Fetcher::shutdown() { } } -void Fetcher::join() { - stdx::unique_lock<Latch> lk(_mutex); - _condition.wait(lk, [this]() { return !_isActive_inlock(); }); +Status Fetcher::join(Interruptible* interruptible) { + try { + stdx::unique_lock<Latch> lk(_mutex); + interruptible->waitForConditionOrInterrupt( + _condition, lk, [this]() { return !_isActive_inlock(); }); + return Status::OK(); + } catch (const DBException&) { + return exceptionToStatus(); + } +} + +void Fetcher::_join() { + invariantStatusOK(join(Interruptible::notInterruptible())); } Fetcher::State Fetcher::getState_forTest() const { diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 82ab4f4e83f..90ba61b00fc 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -185,10 +185,13 @@ public: void shutdown(); /** - * Waits for remote command requests to complete. + * Waits for remote command requests to complete subject to the Interruptible being interrupted. * Returns immediately if fetcher is not active. + * + * Returns an OK Status if the wait completed successfully without interruption. + * Returns a non-OK Status if the Interruptible had been interrupted. */ - void join(); + Status join(Interruptible* interruptible); // State transitions: // PreStart --> Running --> ShuttingDown --> Complete @@ -243,6 +246,12 @@ private: bool _isShuttingDown() const; bool _isShuttingDown_inlock() const; + /** + * Waits for remote command requests to complete. + * Returns immediately if fetcher is not active. + */ + void _join(); + // Not owned by us. executor::TaskExecutor* _executor; diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 2a6f3c26024..e013691b77d 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -36,6 +36,7 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata.h" +#include "mongo/util/future_test_utils.h" #include "mongo/unittest/unittest.h" @@ -409,7 +410,7 @@ TEST_F(FetcherTest, CancelWithoutSchedule) { TEST_F(FetcherTest, WaitWithoutSchedule) { ASSERT_FALSE(fetcher->isActive()); - fetcher->join(); + ASSERT_OK(fetcher->join(Interruptible::notInterruptible())); ASSERT_FALSE(fetcher->isActive()); } @@ -445,6 +446,46 @@ TEST_F(FetcherTest, ScheduleAndCancel) { ASSERT_EQUALS(Fetcher::State::kComplete, fetcher->getState_forTest()); } + +TEST_F(FetcherTest, ScheduleAndCancelDueToJoinInterruption) { + ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest()); + + ASSERT_OK(fetcher->schedule()); + ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest()); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + assertRemoteCommandNameEquals("find", net->scheduleSuccessfulResponse(BSON("ok" << 1))); + } + + ASSERT_TRUE(fetcher->isActive()); + ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest()); + + DummyInterruptible interruptible; + ASSERT_EQ(fetcher->join(&interruptible), ErrorCodes::Interrupted); + + // To make this test deterministic, we need the Fetcher to already be shut down so it doesn't + // attempt to process the scheduled response. Normally Fetcher::join() would be solely + // responsible for calling Fetcher::shutdown(). + fetcher->shutdown(); + + // The finishProcessingNetworkResponseThread is needed to prevent the main test thread from + // blocking on the NetworkInterfaceMock. + stdx::thread finishProcessingNetworkResponseThread([&]() { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + getNet()->runReadyNetworkOperations(); + }); + + // We destroy the Fetcher before shutting down the task executor to reflect what would + // ordinarily happen after Fetcher::join() returns an error Status from the Interruptible being + // interrupted. + fetcher.reset(); + finishProcessingNetworkResponseThread.join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); +} + TEST_F(FetcherTest, ScheduleButShutdown) { ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest()); @@ -464,7 +505,7 @@ TEST_F(FetcherTest, ScheduleButShutdown) { getExecutor().shutdown(); - fetcher->join(); + ASSERT_OK(fetcher->join(Interruptible::notInterruptible())); ASSERT_FALSE(fetcher->isActive()); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); |