diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-10-08 18:24:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-09 00:20:21 +0000 |
commit | d5e397c7f2a332b8149c245c5ade96c1ba986125 (patch) | |
tree | 67d671768dfe12b3cca3f7f6a9311049f872d17d | |
parent | e83d35051f18330d5bab6da729b2629b38a729ab (diff) | |
download | mongo-d5e397c7f2a332b8149c245c5ade96c1ba986125.tar.gz |
SERVER-50658 Add support for CancelationTokens to TaskExecutor::scheduleRemoteCommand
-rw-r--r-- | src/mongo/executor/scoped_task_executor.cpp | 17 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 90 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 31 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 113 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test_fixture.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test_fixture.h | 2 |
11 files changed, 241 insertions, 34 deletions
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index b0f471cbd65..fe1d178e111 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -47,9 +47,7 @@ static const inline auto kDefaultShutdownStatus = * Implements the wrapping indirection needed to satisfy the ScopedTaskExecutor contract. Note * that at least shutdown() must be called on this type before destruction. */ -class ScopedTaskExecutor::Impl : public std::enable_shared_from_this<ScopedTaskExecutor::Impl>, - public TaskExecutor { - +class ScopedTaskExecutor::Impl : public TaskExecutor { public: Impl(std::shared_ptr<TaskExecutor> executor, Status shutdownStatus) : _executor(std::move(executor)), _shutdownStatus(std::move(shutdownStatus)) {} @@ -184,6 +182,17 @@ public: private: /** + * Helper function to get a shared_ptr<ScopedTaskExecutor::Impl> to this object, akin to + * shared_from_this(). TaskExecutor (the parent class of ScopedTaskExecutor::Impl) inherits from + * std::enable_shared_from_this, so shared_from_this() returns a std::shared_ptr<TaskExecutor>, + * which means we need to cast it to use it as a pointer to the subclass + * ScopedTaskExecutor::Impl. + */ + std::shared_ptr<ScopedTaskExecutor::Impl> shared_self() { + return std::static_pointer_cast<ScopedTaskExecutor::Impl>(shared_from_this()); + } + + /** * Wraps a scheduling call, along with its callback, so that: * * 1. If the callback is run, it is invoked with a not-okay argument if this task executor or @@ -250,7 +259,7 @@ private: // State 2 - Indeterminate state. We don't know yet if the task will get scheduled. auto swCbHandle = std::forward<ScheduleCall>(schedule)( - [id, work = std::forward<Work>(work), self = shared_from_this()](const auto& cargs) { + [id, work = std::forward<Work>(work), self = shared_self()](const auto& cargs) { using ArgsT = std::decay_t<decltype(cargs)>; stdx::unique_lock<Latch> lk(self->_mutex); diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index f784515c9cc..8a4b30f08e6 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -34,6 +34,71 @@ namespace mongo { namespace executor { +namespace { + +Status wrapCallbackHandleWithCancelToken( + const std::shared_ptr<TaskExecutor>& executor, + const StatusWith<TaskExecutor::CallbackHandle>& swCallbackHandle, + const CancelationToken& token) { + if (!swCallbackHandle.isOK()) { + return swCallbackHandle.getStatus(); + } + + token.onCancel() + .unsafeToInlineFuture() + .then([executor, callbackHandle = std::move(swCallbackHandle.getValue())]() mutable { + executor->cancel(callbackHandle); + }) + .getAsync([](auto) {}); + return Status::OK(); +} + +/** + * Takes a schedule(Exhaust)RemoteCommand(OnAny)-style function and wraps it to return a future and + * be cancelable with CancelationTokens. + */ +template <typename Request, typename Response, typename ScheduleFn> +ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture( + const std::shared_ptr<TaskExecutor>& executor, + ScheduleFn&& schedule, + const Request& request, + const CancelationToken& token, + const BatonHandle& baton) { + if (token.isCanceled()) { + return ExecutorFuture<Response>(executor, TaskExecutor::kCallbackCanceledErrorStatus); + } + + auto [promise, future] = makePromiseFuture<Response>(); + // This has to be made shared because otherwise we'd have to move the promise into this + // callback, and would be unable to use it in the case where scheduling the request fails below. + auto sharedPromise = std::make_shared<Promise<Response>>(std::move(promise)); + auto signalPromiseOnCompletion = [sharedPromise](const auto& args) mutable { + auto status = args.response.status; + if (status.isOK()) { + sharedPromise->emplaceValue(std::move(args.response)); + } else { + // Only set an error on failures to send the request (including if the request was + // canceled). Errors from the remote host will be contained in the response. + sharedPromise->setError(status); + } + }; + + auto scheduleStatus = wrapCallbackHandleWithCancelToken( + executor, + std::forward<ScheduleFn>(schedule)(request, std::move(signalPromiseOnCompletion), baton), + token); + + if (!scheduleStatus.isOK()) { + // If scheduleStatus is not okay, then the callback signalPromiseOnCompletion should never + // run, meaning that it will be okay to set the promise here. + sharedPromise->setError(scheduleStatus); + } + + return std::move(future).thenRunOn(executor); +} + +} // namespace + TaskExecutor::TaskExecutor() = default; TaskExecutor::~TaskExecutor() = default; @@ -123,6 +188,30 @@ StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleRemoteCommand( baton); } +ExecutorFuture<TaskExecutor::ResponseStatus> TaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const CancelationToken& token, const BatonHandle& baton) { + return wrapScheduleCallWithCancelTokenAndFuture<decltype(request), + TaskExecutor::ResponseStatus>( + shared_from_this(), + [this](const auto&... args) { return scheduleRemoteCommand(args...); }, + request, + token, + baton); +} + +ExecutorFuture<TaskExecutor::ResponseOnAnyStatus> TaskExecutor::scheduleRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const CancelationToken& token, + const BatonHandle& baton) { + return wrapScheduleCallWithCancelTokenAndFuture<decltype(request), + TaskExecutor::ResponseOnAnyStatus>( + shared_from_this(), + [this](const auto&... args) { return scheduleRemoteCommandOnAny(args...); }, + request, + token, + baton); +} + StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleExhaustRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, @@ -133,6 +222,5 @@ StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleExhaustRemoteComm }, baton); } - } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index c3a46671672..97eeee7d43a 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -40,6 +40,7 @@ #include "mongo/executor/remote_command_response.h" #include "mongo/stdx/condition_variable.h" #include "mongo/transport/baton.h" +#include "mongo/util/cancelation.h" #include "mongo/util/future.h" #include "mongo/util/out_of_line_executor.h" #include "mongo/util/time_support.h" @@ -71,11 +72,18 @@ struct ConnectionPoolStats; * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. */ -class TaskExecutor : public OutOfLineExecutor { +class TaskExecutor : public OutOfLineExecutor, public std::enable_shared_from_this<TaskExecutor> { TaskExecutor(const TaskExecutor&) = delete; TaskExecutor& operator=(const TaskExecutor&) = delete; public: + /** + * Error status that should be used by implementations of TaskExecutor when + * a callback is canceled. + */ + static const inline Status kCallbackCanceledErrorStatus{ErrorCodes::CallbackCanceled, + "Callback canceled"}; + struct CallbackArgs; struct RemoteCommandCallbackArgs; struct RemoteCommandOnAnyCallbackArgs; @@ -275,11 +283,32 @@ public: const RemoteCommandCallbackFn& cb, const BatonHandle& baton = nullptr); + /** + * Schedules the given request to be sent and returns a future containing the response. The + * resulting future will be set with an error only if there is a failure to send the request. + * Errors from the remote node will be contained in the ResponseStatus object. + * + * The input CancelationToken may be used to cancel sending the request. There is no guarantee + * that this will succeed in canceling the request and the resulting ExecutorFuture may contain + * either success or error. If cancelation is successful, the resulting ExecutorFuture will be + * set with an error. + */ + ExecutorFuture<TaskExecutor::ResponseStatus> scheduleRemoteCommand( + const RemoteCommandRequest& request, + const CancelationToken& token, + const BatonHandle& baton = nullptr); + virtual StatusWith<CallbackHandle> scheduleRemoteCommandOnAny( const RemoteCommandRequestOnAny& request, const RemoteCommandOnAnyCallbackFn& cb, const BatonHandle& baton = nullptr) = 0; + ExecutorFuture<TaskExecutor::ResponseOnAnyStatus> scheduleRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const CancelationToken& token, + const BatonHandle& baton = nullptr); + + /** * Schedules "cb" to be run by the executor on each reply recevied from executing the exhaust * remote command described by "request". diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 2cede709253..22841eaaf89 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -44,6 +44,7 @@ #include "mongo/stdx/thread.h" #include "mongo/stdx/unordered_map.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/cancelation.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/str.h" @@ -52,7 +53,7 @@ namespace executor { namespace { using ExecutorFactory = - std::function<std::unique_ptr<TaskExecutor>(std::unique_ptr<NetworkInterfaceMock>)>; + std::function<std::shared_ptr<TaskExecutor>(std::unique_ptr<NetworkInterfaceMock>)>; class CommonTaskExecutorTestFixture : public TaskExecutorTest { public: @@ -60,7 +61,7 @@ public: : _makeExecutor(std::move(makeExecutor)) {} private: - std::unique_ptr<TaskExecutor> makeTaskExecutor( + std::shared_ptr<TaskExecutor> makeTaskExecutor( std::unique_ptr<NetworkInterfaceMock> net) override { return _makeExecutor(std::move(net)); } @@ -160,6 +161,12 @@ auto makeSetStatusOnRemoteCommandCompletionClosure(const RemoteCommandRequest* e }; } +static inline const RemoteCommandRequest kDummyRequest{HostAndPort("localhost", 27017), + "mydb", + BSON("whatsUp" + << "doc"), + nullptr}; + COMMON_EXECUTOR_TEST(RunOne) { TaskExecutor& executor = getExecutor(); Status status = getDetectableErrorStatus(); @@ -416,13 +423,8 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { TaskExecutor& executor = getExecutor(); launchExecutorThread(); Status status1 = getDetectableErrorStatus(); - const RemoteCommandRequest request(HostAndPort("localhost", 27017), - "mydb", - BSON("whatsUp" - << "doc"), - nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( - request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status1))); + kDummyRequest, makeSetStatusOnRemoteCommandCompletionClosure(&kDummyRequest, &status1))); net->enterNetwork(); ASSERT(net->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); @@ -439,13 +441,8 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { COMMON_EXECUTOR_TEST(ScheduleAndCancelRemoteCommand) { TaskExecutor& executor = getExecutor(); Status status1 = getDetectableErrorStatus(); - const RemoteCommandRequest request(HostAndPort("localhost", 27017), - "mydb", - BSON("whatsUp" - << "doc"), - nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( - request, makeSetStatusOnRemoteCommandCompletionClosure(&request, &status1))); + kDummyRequest, makeSetStatusOnRemoteCommandCompletionClosure(&kDummyRequest, &status1))); executor.cancel(cbHandle); launchExecutorThread(); getNet()->enterNetwork(); @@ -457,6 +454,94 @@ COMMON_EXECUTOR_TEST(ScheduleAndCancelRemoteCommand) { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1); } +COMMON_EXECUTOR_TEST( + ScheduleRemoteCommandWithCancelationTokenSuccessfullyCancelsRequestIfCanceledAfterFunctionCallButBeforeProcessing) { + TaskExecutor& executor = getExecutor(); + CancelationSource cancelSource; + auto responseFuture = executor.scheduleRemoteCommand(kDummyRequest, cancelSource.token()); + + cancelSource.cancel(); + + launchExecutorThread(); + getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + // Wait for cancelation to happen and expect error status on future. + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, responseFuture.getNoThrow()); + + shutdownExecutorThread(); + joinExecutorThread(); +} + +COMMON_EXECUTOR_TEST( + ScheduleRemoteCommandWithCancelationTokenSuccessfullyCancelsRequestIfCanceledBeforeFunctionCallAndBeforeProcessing) { + TaskExecutor& executor = getExecutor(); + + CancelationSource cancelSource; + // Cancel before calling scheduleRemoteCommand. + cancelSource.cancel(); + auto responseFuture = executor.scheduleRemoteCommand(kDummyRequest, cancelSource.token()); + + // The result should be immediately available. + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, responseFuture.getNoThrow()); +} + +COMMON_EXECUTOR_TEST( + ScheduleRemoteCommandWithCancelationTokenDoesNotCancelRequestIfCanceledAfterProcessing) { + TaskExecutor& executor = getExecutor(); + CancelationSource cancelSource; + auto responseFuture = executor.scheduleRemoteCommand(kDummyRequest, cancelSource.token()); + + launchExecutorThread(); + getNet()->enterNetwork(); + // Respond to the request. + getNet()->scheduleSuccessfulResponse(BSONObj{}); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + // Response should be ready and okay. + ASSERT_OK(responseFuture.getNoThrow()); + + // Cancel after the response has already been processed. This shouldn't do anything or cause an + // error. + cancelSource.cancel(); + + shutdownExecutorThread(); + joinExecutorThread(); +} + +COMMON_EXECUTOR_TEST( + ScheduleRemoteCommandWithCancelationTokenReturnsShutdownInProgressIfExecutorAlreadyShutdownAndCancelNotCalled) { + TaskExecutor& executor = getExecutor(); + + launchExecutorThread(); + shutdownExecutorThread(); + joinExecutorThread(); + + auto responseFuture = + executor.scheduleRemoteCommand(kDummyRequest, CancelationToken::uncancelable()); + ASSERT_EQ(responseFuture.getNoThrow().getStatus().code(), ErrorCodes::ShutdownInProgress); +} + +COMMON_EXECUTOR_TEST( + ScheduleRemoteCommandWithCancelationTokenReturnsShutdownInProgressIfExecutorAlreadyShutdownAndCancelCalled) { + TaskExecutor& executor = getExecutor(); + + CancelationSource cancelSource; + auto responseFuture = executor.scheduleRemoteCommand(kDummyRequest, cancelSource.token()); + + launchExecutorThread(); + shutdownExecutorThread(); + joinExecutorThread(); + + // Should already be ready. Returns CallbackCanceled and not ShutdownInProgress as an + // implementation detail. + ASSERT_EQ(responseFuture.getNoThrow().getStatus().code(), ErrorCodes::CallbackCanceled); + + // Shouldn't do anything or cause an error. + cancelSource.cancel(); +} COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) { NetworkInterfaceMock* net = getNet(); diff --git a/src/mongo/executor/task_executor_test_common.h b/src/mongo/executor/task_executor_test_common.h index 69e5a37a0e9..77558c09565 100644 --- a/src/mongo/executor/task_executor_test_common.h +++ b/src/mongo/executor/task_executor_test_common.h @@ -51,7 +51,7 @@ class TaskExecutor; */ void addTestsForExecutor( const std::string& suiteName, - std::function<std::unique_ptr<TaskExecutor>(std::unique_ptr<NetworkInterfaceMock>)> + std::function<std::shared_ptr<TaskExecutor>(std::unique_ptr<NetworkInterfaceMock>)> makeExecutor); } // namespace executor diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp index 085c9aef560..7f52fc06c19 100644 --- a/src/mongo/executor/task_executor_test_fixture.cpp +++ b/src/mongo/executor/task_executor_test_fixture.cpp @@ -68,7 +68,7 @@ void TaskExecutorTest::setUp() { } void TaskExecutorTest::tearDown() { - _executor.reset(nullptr); + _executor.reset(); _net = nullptr; } diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h index f8815b45360..fe7ed6554ad 100644 --- a/src/mongo/executor/task_executor_test_fixture.h +++ b/src/mongo/executor/task_executor_test_fixture.h @@ -90,13 +90,13 @@ private: */ void _doTest() override; - virtual std::unique_ptr<TaskExecutor> makeTaskExecutor( + virtual std::shared_ptr<TaskExecutor> makeTaskExecutor( std::unique_ptr<NetworkInterfaceMock> net) = 0; virtual void postExecutorThreadLaunch(); NetworkInterfaceMock* _net; - std::unique_ptr<TaskExecutor> _executor; + std::shared_ptr<TaskExecutor> _executor; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 92ea98ab094..3c9e4c5db7b 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -633,9 +633,7 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA setCallbackForHandle(&cbHandle, cbStateArg); CallbackArgs args(this, std::move(cbHandle), - cbStateArg->canceled.load() - ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) - : Status::OK()); + cbStateArg->canceled.load() ? kCallbackCanceledErrorStatus : Status::OK()); invariant(!cbStateArg->isFinished.load()); { // After running callback function, clear 'cbStateArg->callback' to release any resources @@ -809,9 +807,7 @@ void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> c setCallbackForHandle(&cbHandle, cbState); CallbackArgs args(this, std::move(cbHandle), - cbState->canceled.load() - ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) - : Status::OK()); + cbState->canceled.load() ? kCallbackCanceledErrorStatus : Status::OK()); if (!cbState->isFinished.load()) { TaskExecutor::CallbackFn callback = [](const CallbackArgs&) {}; diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index c436417be3d..556d9ee95f0 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -51,7 +51,7 @@ namespace { MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock> net) { - return makeThreadPoolTestExecutor(std::move(net)); + return makeSharedThreadPoolTestExecutor(std::move(net)); }); return Status::OK(); } diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp index 4498692f0a5..1f19aeeefe6 100644 --- a/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.cpp @@ -61,10 +61,10 @@ ThreadPoolMock::Options ThreadPoolExecutorTest::makeThreadPoolMockOptions() cons return _options; } -std::unique_ptr<TaskExecutor> ThreadPoolExecutorTest::makeTaskExecutor( +std::shared_ptr<TaskExecutor> ThreadPoolExecutorTest::makeTaskExecutor( std::unique_ptr<NetworkInterfaceMock> net) { auto options = makeThreadPoolMockOptions(); - return makeThreadPoolTestExecutor(std::move(net), std::move(options)); + return makeSharedThreadPoolTestExecutor(std::move(net), std::move(options)); } } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.h b/src/mongo/executor/thread_pool_task_executor_test_fixture.h index 07d593b244f..47862bde8ae 100644 --- a/src/mongo/executor/thread_pool_task_executor_test_fixture.h +++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.h @@ -69,7 +69,7 @@ public: private: virtual ThreadPoolMock::Options makeThreadPoolMockOptions() const; - std::unique_ptr<TaskExecutor> makeTaskExecutor( + std::shared_ptr<TaskExecutor> makeTaskExecutor( std::unique_ptr<NetworkInterfaceMock> net) override; // Returned by makeThreadPoolMockOptions(). |