summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/fetcher.cpp18
-rw-r--r--src/mongo/client/fetcher.h13
-rw-r--r--src/mongo/client/fetcher_test.cpp45
-rw-r--r--src/mongo/s/client/shard_remote.cpp17
4 files changed, 83 insertions, 10 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index 4223dbd85ab..45eb03d3366 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -196,7 +196,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
}
Fetcher::~Fetcher() {
- DESTRUCTOR_GUARD(shutdown(); join(););
+ DESTRUCTOR_GUARD(shutdown(); _join(););
}
HostAndPort Fetcher::getSource() const {
@@ -294,9 +294,19 @@ void Fetcher::shutdown() {
}
}
-void Fetcher::join() {
- stdx::unique_lock<Latch> lk(_mutex);
- _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+Status Fetcher::join(Interruptible* interruptible) {
+ try {
+ stdx::unique_lock<Latch> lk(_mutex);
+ interruptible->waitForConditionOrInterrupt(
+ _condition, lk, [this]() { return !_isActive_inlock(); });
+ return Status::OK();
+ } catch (const DBException&) {
+ return exceptionToStatus();
+ }
+}
+
+void Fetcher::_join() {
+ invariantStatusOK(join(Interruptible::notInterruptible()));
}
Fetcher::State Fetcher::getState_forTest() const {
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 82ab4f4e83f..90ba61b00fc 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -185,10 +185,13 @@ public:
void shutdown();
/**
- * Waits for remote command requests to complete.
+ * Waits for remote command requests to complete subject to the Interruptible being interrupted.
* Returns immediately if fetcher is not active.
+ *
+ * Returns an OK Status if the wait completed successfully without interruption.
+ * Returns a non-OK Status if the Interruptible had been interrupted.
*/
- void join();
+ Status join(Interruptible* interruptible);
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
@@ -243,6 +246,12 @@ private:
bool _isShuttingDown() const;
bool _isShuttingDown_inlock() const;
+ /**
+ * Waits for remote command requests to complete.
+ * Returns immediately if fetcher is not active.
+ */
+ void _join();
+
// Not owned by us.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp
index 2a6f3c26024..e013691b77d 100644
--- a/src/mongo/client/fetcher_test.cpp
+++ b/src/mongo/client/fetcher_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata.h"
+#include "mongo/util/future_test_utils.h"
#include "mongo/unittest/unittest.h"
@@ -409,7 +410,7 @@ TEST_F(FetcherTest, CancelWithoutSchedule) {
TEST_F(FetcherTest, WaitWithoutSchedule) {
ASSERT_FALSE(fetcher->isActive());
- fetcher->join();
+ ASSERT_OK(fetcher->join(Interruptible::notInterruptible()));
ASSERT_FALSE(fetcher->isActive());
}
@@ -445,6 +446,46 @@ TEST_F(FetcherTest, ScheduleAndCancel) {
ASSERT_EQUALS(Fetcher::State::kComplete, fetcher->getState_forTest());
}
+
+TEST_F(FetcherTest, ScheduleAndCancelDueToJoinInterruption) {
+ ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest());
+
+ ASSERT_OK(fetcher->schedule());
+ ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest());
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ assertRemoteCommandNameEquals("find", net->scheduleSuccessfulResponse(BSON("ok" << 1)));
+ }
+
+ ASSERT_TRUE(fetcher->isActive());
+ ASSERT_EQUALS(Fetcher::State::kRunning, fetcher->getState_forTest());
+
+ DummyInterruptible interruptible;
+ ASSERT_EQ(fetcher->join(&interruptible), ErrorCodes::Interrupted);
+
+ // To make this test deterministic, we need the Fetcher to already be shut down so it doesn't
+ // attempt to process the scheduled response. Normally Fetcher::join() would be solely
+ // responsible for calling Fetcher::shutdown().
+ fetcher->shutdown();
+
+ // The finishProcessingNetworkResponseThread is needed to prevent the main test thread from
+ // blocking on the NetworkInterfaceMock.
+ stdx::thread finishProcessingNetworkResponseThread([&]() {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ getNet()->runReadyNetworkOperations();
+ });
+
+ // We destroy the Fetcher before shutting down the task executor to reflect what would
+ // ordinarily happen after Fetcher::join() returns an error Status from the Interruptible being
+ // interrupted.
+ fetcher.reset();
+ finishProcessingNetworkResponseThread.join();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
+}
+
TEST_F(FetcherTest, ScheduleButShutdown) {
ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest());
@@ -464,7 +505,7 @@ TEST_F(FetcherTest, ScheduleButShutdown) {
getExecutor().shutdown();
- fetcher->join();
+ ASSERT_OK(fetcher->join(Interruptible::notInterruptible()));
ASSERT_FALSE(fetcher->isActive());
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 78ca5ddd6e8..b678c276510 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -328,7 +328,17 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
return scheduleStatus;
}
- fetcher.join();
+ Status joinStatus = fetcher.join(opCtx);
+ if (!joinStatus.isOK()) {
+ if (ErrorCodes::isExceededTimeLimitError(joinStatus.code())) {
+ LOGV2(6195000,
+ "Operation timed out {error}",
+ "Operation timed out",
+ "error"_attr = joinStatus);
+ }
+
+ return joinStatus;
+ }
updateReplSetMonitor(host.getValue(), status);
@@ -513,7 +523,10 @@ Status ShardRemote::runAggregation(
return scheduleStatus;
}
- fetcher.join();
+ Status joinStatus = fetcher.join(opCtx);
+ if (!joinStatus.isOK()) {
+ return joinStatus;
+ }
updateReplSetMonitor(host, status);