diff options
-rw-r--r-- | src/mongo/client/fetcher.cpp | 18 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 13 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp | 45 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 17 |
4 files changed, 83 insertions, 10 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 4223dbd85ab..45eb03d3366 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -196,7 +196,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, } Fetcher::~Fetcher() { - DESTRUCTOR_GUARD(shutdown(); join();); + DESTRUCTOR_GUARD(shutdown(); _join();); } HostAndPort Fetcher::getSource() const { @@ -294,9 +294,19 @@ void Fetcher::shutdown() { } } -void Fetcher::join() { - stdx::unique_lock<Latch> lk(_mutex); - _condition.wait(lk, [this]() { return !_isActive_inlock(); }); +Status Fetcher::join(Interruptible* interruptible) { + try { + stdx::unique_lock<Latch> lk(_mutex); + interruptible->waitForConditionOrInterrupt( + _condition, lk, [this]() { return !_isActive_inlock(); }); + return Status::OK(); + } catch (const DBException&) { + return exceptionToStatus(); + } +} + +void Fetcher::_join() { + invariantStatusOK(join(Interruptible::notInterruptible())); } Fetcher::State Fetcher::getState_forTest() const { diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 82ab4f4e83f..90ba61b00fc 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -185,10 +185,13 @@ public: void shutdown(); /** - * Waits for remote command requests to complete. + * Waits for remote command requests to complete subject to the Interruptible being interrupted. * Returns immediately if fetcher is not active. + * + * Returns an OK Status if the wait completed successfully without interruption. + * Returns a non-OK Status if the Interruptible had been interrupted. */ - void join(); + Status join(Interruptible* interruptible); // State transitions: // PreStart --> Running --> ShuttingDown --> Complete @@ -243,6 +246,12 @@ private: bool _isShuttingDown() const; bool _isShuttingDown_inlock() const; + /** + * Waits for remote command requests to complete. + * Returns immediately if fetcher is not active. + */ + void _join(); + // Not owned by us. executor::TaskExecutor* _executor; diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 2a6f3c26024..e013691b77d 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -36,6 +36,7 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata.h" +#include "mongo/util/future_test_utils.h" #include "mongo/unittest/unittest.h" @@ -409,7 +410,7 @@ TEST_F(FetcherTest, CancelWithoutSchedule) { TEST_F(FetcherTest, WaitWithoutSchedule) { ASSERT_FALSE(fetcher->isActive()); - fetcher->join(); + ASSERT_OK(fetcher->join(Interruptible::notInterruptible())); ASSERT_FALSE(fetcher->isActive()); } @@ -445,6 +446,46 @@ TEST_F(FetcherTest, ScheduleAndCancel) { ASSERT_EQUALS(Fetcher::State::kComplete, fetcher->getState_forTest()); } + +TEST_F(FetcherTest, ScheduleAndCancelDueToJoinInterruption) { + ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest()); + + ASSERT_OK(fetcher->schedule()); + ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest()); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + assertRemoteCommandNameEquals("find", net->scheduleSuccessfulResponse(BSON("ok" << 1))); + } + + ASSERT_TRUE(fetcher->isActive()); + ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest()); + + DummyInterruptible interruptible; + ASSERT_EQ(fetcher->join(&interruptible), ErrorCodes::Interrupted); + + // To make this test deterministic, we need the Fetcher to already be shut down so it doesn't + // attempt to process the scheduled response. Normally Fetcher::join() would be solely + // responsible for calling Fetcher::shutdown(). + fetcher->shutdown(); + + // The finishProcessingNetworkResponseThread is needed to prevent the main test thread from + // blocking on the NetworkInterfaceMock. + stdx::thread finishProcessingNetworkResponseThread([&]() { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + getNet()->runReadyNetworkOperations(); + }); + + // We destroy the Fetcher before shutting down the task executor to reflect what would + // ordinarily happen after Fetcher::join() returns an error Status from the Interruptible being + // interrupted. + fetcher.reset(); + finishProcessingNetworkResponseThread.join(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); +} + TEST_F(FetcherTest, ScheduleButShutdown) { ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest()); @@ -464,7 +505,7 @@ TEST_F(FetcherTest, ScheduleButShutdown) { getExecutor().shutdown(); - fetcher->join(); + ASSERT_OK(fetcher->join(Interruptible::notInterruptible())); ASSERT_FALSE(fetcher->isActive()); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 78ca5ddd6e8..b678c276510 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -328,7 +328,17 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand( return scheduleStatus; } - fetcher.join(); + Status joinStatus = fetcher.join(opCtx); + if (!joinStatus.isOK()) { + if (ErrorCodes::isExceededTimeLimitError(joinStatus.code())) { + LOGV2(6195000, + "Operation timed out {error}", + "Operation timed out", + "error"_attr = joinStatus); + } + + return joinStatus; + } updateReplSetMonitor(host.getValue(), status); @@ -513,7 +523,10 @@ Status ShardRemote::runAggregation( return scheduleStatus; } - fetcher.join(); + Status joinStatus = fetcher.join(opCtx); + if (!joinStatus.isOK()) { + return joinStatus; + } updateReplSetMonitor(host, status); |