summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2018-09-26 15:00:10 -0400
committerSpencer T Brody <spencer@mongodb.com>2018-09-28 13:14:15 -0400
commit56aa77807e5ff288635b69c40bf4d201e715051d (patch)
tree4daa30697715b7c6f1c1c7cfb3e4ad084f36d202 /src
parenta73b2fc77d72a06380826259620f105e179a4c5c (diff)
downloadmongo-56aa77807e5ff288635b69c40bf4d201e715051d.tar.gz
SERVER-37329 Make ShardRemote interruptable while waiting for a response
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/operation_context.cpp2
-rw-r--r--src/mongo/db/s/config/configsvr_shard_collection_command.cpp17
-rw-r--r--src/mongo/executor/task_executor.h5
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp4
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h3
-rw-r--r--src/mongo/s/client/shard_remote.cpp13
-rw-r--r--src/mongo/s/sharding_task_executor.cpp4
-rw-r--r--src/mongo/s/sharding_task_executor.h3
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp4
-rw-r--r--src/mongo/unittest/task_executor_proxy.h3
10 files changed, 39 insertions, 19 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 873972a8c30..bcac35e0a00 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -86,7 +86,7 @@ OperationContext::OperationContext(Client* client, unsigned int opId)
void OperationContext::setDeadlineAndMaxTime(Date_t when,
Microseconds maxTime,
ErrorCodes::Error timeoutError) {
- invariant(!getClient()->isInDirectClient());
+ invariant(!getClient()->isInDirectClient() || _hasArtificialDeadline);
invariant(ErrorCodes::isExceededTimeLimitError(timeoutError));
uassert(40120,
"Illegal attempt to change operation deadline",
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 788c1fe1f38..3b9161182be 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -767,13 +767,16 @@ public:
shardsvrShardCollectionRequest.setGetUUIDfromPrimaryShard(
request.getGetUUIDfromPrimaryShard());
- auto cmdResponse = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields(
- cmdObj, shardsvrShardCollectionRequest.toBSON())),
- Shard::RetryPolicy::kIdempotent));
+ auto cmdResponse = opCtx->runWithoutInterruption([&] {
+ return uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields(
+ cmdObj, shardsvrShardCollectionRequest.toBSON())),
+ Shard::RetryPolicy::kIdempotent));
+ });
+
if (cmdResponse.commandStatus != ErrorCodes::CommandNotFound) {
uassertStatusOK(cmdResponse.commandStatus);
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index d23069ba3ac..404065f3d27 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -257,8 +257,11 @@ public:
* callback ran.
*
* NOTE: Do not call from a callback running in the executor.
+ *
+ * Prefer the version that takes an OperationContext* to this version.
*/
- virtual void wait(const CallbackHandle& cbHandle) = 0;
+ virtual void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) = 0;
/**
* Appends information about the underlying network interface's connections to the given
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 37181bca6a9..c6015df0150 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -473,7 +473,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
}
}
-void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) {
+void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible* interruptible) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
if (cbState->isFinished.load()) {
@@ -484,7 +484,7 @@ void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) {
cbState->finishedCondition.emplace();
}
while (!cbState->isFinished.load()) {
- cbState->finishedCondition->wait(lk);
+ interruptible->waitForConditionOrInterrupt(*cbState->finishedCondition, lk);
}
}
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 92b809bc0db..7beea67f044 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -85,7 +85,8 @@ public:
const RemoteCommandCallbackFn& cb,
const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
- void wait(const CallbackHandle& cbHandle) override;
+ void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 59692fc28c9..84a089711c6 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -210,7 +210,18 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* op
// Block until the command is carried out
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- executor->wait(asyncHandle.handle);
+ try {
+ executor->wait(asyncHandle.handle, opCtx);
+ } catch (const DBException& e) {
+ // If waiting for the response is interrupted, then we still have a callback out and
+ // registered with the TaskExecutor to run when the response finally does come back.
+ // Since the callback references local state, it would be invalid for the callback to run
+ // after leaving the scope of this method. Therefore we cancel the callback and wait
+ // uninterruptably for the callback to be run.
+ executor->cancel(asyncHandle.handle);
+ executor->wait(asyncHandle.handle);
+ return e.toStatus();
+ }
const auto& host = asyncHandle.hostTargetted;
updateReplSetMonitor(host, response.status);
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index 7a25805d05b..5ae93c3e33e 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -211,8 +211,8 @@ void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) {
_executor->cancel(cbHandle);
}
-void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) {
- _executor->wait(cbHandle);
+void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle, Interruptible* interruptible) {
+ _executor->wait(cbHandle, interruptible);
}
void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 92a78169549..25cb48cc4e6 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -72,7 +72,8 @@ public:
const RemoteCommandCallbackFn& cb,
const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
- void wait(const CallbackHandle& cbHandle) override;
+ void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index b58d75a394e..72f6c052231 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -109,8 +109,8 @@ void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) {
_executor->cancel(cbHandle);
}
-void TaskExecutorProxy::wait(const CallbackHandle& cbHandle) {
- _executor->wait(cbHandle);
+void TaskExecutorProxy::wait(const CallbackHandle& cbHandle, Interruptible* interruptible) {
+ _executor->wait(cbHandle, interruptible);
}
void TaskExecutorProxy::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index 3c2bb935692..1ad2d26d4f5 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -70,7 +70,8 @@ public:
const RemoteCommandCallbackFn& cb,
const transport::BatonHandle& baton = nullptr) override;
virtual void cancel(const CallbackHandle& cbHandle) override;
- virtual void wait(const CallbackHandle& cbHandle) override;
+ virtual void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) override;
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
private: