diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 99 |
1 files changed, 64 insertions, 35 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index ad1bfd1496c..d2405050a12 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -26,8 +26,8 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ - #include "mongo/platform/basic.h" +#include "mongo/platform/mutex.h" #include "mongo/executor/task_executor.h" @@ -38,9 +38,46 @@ namespace { MONGO_FAIL_POINT_DEFINE(pauseScheduleCallWithCancelTokenUntilCanceled); +/** + * Provides exclusive access to an underlying Promise at set-time, guaranteeing that the Promise + * will be set at most one time globally. This prevents races between completion and cancellation, + * which normally would result in Promise throwing an invariant. + */ +template <typename T> +class ExclusivePromiseAccess { +public: + explicit ExclusivePromiseAccess(Promise<T>&& promise) : _promise(std::move(promise)) {} + + /** + * Sets an error on the Promise if no result has been set; otherwise, does nothing. + */ + template <typename Error> + void setError(Error&& e) { + if (!_completed.swap(true)) { + _promise.setError(std::forward<Error>(e)); + } + } + + /** + * Sets a valid result on the Promise if no result has been set; otherwise, does nothing. + */ + template <typename... Args> + void emplaceValue(Args&&... args) { + if (!_completed.swap(true)) { + _promise.emplaceValue(std::forward<Args>(args)...); + } + } + +private: + Promise<T> _promise; + AtomicWord<bool> _completed; +}; + +template <typename T> Status wrapCallbackHandleWithCancelToken( - const std::shared_ptr<TaskExecutor>& executor, - const StatusWith<TaskExecutor::CallbackHandle>& swCallbackHandle, + const std::shared_ptr<TaskExecutor> executor, + const StatusWith<TaskExecutor::CallbackHandle> swCallbackHandle, + std::shared_ptr<ExclusivePromiseAccess<T>> promise, const CancellationToken& token) { if (!swCallbackHandle.isOK()) { return swCallbackHandle.getStatus(); @@ -48,9 +85,11 @@ Status wrapCallbackHandleWithCancelToken( token.onCancel() .unsafeToInlineFuture() - .then([executor, callbackHandle = std::move(swCallbackHandle.getValue())]() mutable { - executor->cancel(callbackHandle); - }) + .then( + [executor, promise, callbackHandle = std::move(swCallbackHandle.getValue())]() mutable { + executor->cancel(callbackHandle); + promise->setError(TaskExecutor::kCallbackCanceledErrorStatus); + }) .getAsync([](auto) {}); return Status::OK(); } @@ -72,10 +111,13 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture( } auto [promise, future] = makePromiseFuture<Response>(); - // This has to be made shared because otherwise we'd have to move the promise into this + + // This has to be made shared because otherwise we'd have to move the access into this // callback, and would be unable to use it in the case where scheduling the request fails below. - auto sharedPromise = std::make_shared<Promise<Response>>(std::move(promise)); - auto signalPromiseOnCompletion = [sharedPromise, cb = std::move(cb)](const auto& args) mutable { + auto exclusivePromiseAccess = + std::make_shared<ExclusivePromiseAccess<Response>>(std::move(promise)); + auto signalPromiseOnCompletion = [exclusivePromiseAccess, + cb = std::move(cb)](const auto& args) mutable { // Upon completion, unconditionally run our callback. cb(args); auto status = args.response.status; @@ -84,11 +126,11 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture( // occured. if (!args.response.moreToCome) { if (status.isOK()) { - sharedPromise->emplaceValue(std::move(args.response)); + exclusivePromiseAccess->emplaceValue(std::move(args.response)); } else { // Only set an error on failures to send the request (including if the request was // canceled). Errors from the remote host will be contained in the response. - sharedPromise->setError(status); + exclusivePromiseAccess->setError(status); } } }; @@ -106,31 +148,16 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture( auto scheduleStatus = wrapCallbackHandleWithCancelToken( executor, std::forward<ScheduleFn>(schedule)(request, std::move(signalPromiseOnCompletion), baton), + exclusivePromiseAccess, token); if (!scheduleStatus.isOK()) { // If scheduleStatus is not okay, then the callback signalPromiseOnCompletion should never // run, meaning that it will be okay to set the promise here. - sharedPromise->setError(scheduleStatus); + exclusivePromiseAccess->setError(scheduleStatus); } - return std::move(future) - .template onError<ErrorCodes::BrokenPromise>( - [token](Status&& status) -> StatusWith<Response> { - // When an exhaust command is canceled, its callback is not run, so the - // SharedPromise captured in the callback will be destroyed and set a BrokenPromise - // error status. We check here if the token was canceled, and if so convert that to - // a CallbackCanceled error. N.B. this means that we will pave over truly broken - // promises with a callback canceled error if our cancellation token is canceled - // after the promise is broken; we are willing to accept this behavior, as fixing it - // requires extra synchronization that we'd like to avoid. - if (token.isCanceled()) { - return TaskExecutor::kCallbackCanceledErrorStatus; - } else { - return std::move(status); - } - }) - .thenRunOn(executor); + return std::move(future).thenRunOn(executor); } } // namespace @@ -164,28 +191,30 @@ ExecutorFuture<void> TaskExecutor::sleepUntil(Date_t when, const CancellationTok struct AlarmState { void signal(const Status& status) { if (status.isOK()) { - promise.emplaceValue(); + promise->emplaceValue(); } else { - promise.setError(status); + promise->setError(status); } } - Promise<void> promise; + std::shared_ptr<ExclusivePromiseAccess<void>> promise; }; auto [promise, future] = makePromiseFuture<void>(); // This has to be shared because Promises (and therefore AlarmState) are move-only and we need // to maintain two copies: One to capture in the scheduleWorkAt callback, and one locally in // case scheduling the request fails. - auto alarmState = std::make_shared<AlarmState>(AlarmState{std::move(promise)}); + auto exclusivePromiseAccess = + std::make_shared<ExclusivePromiseAccess<void>>(std::move(promise)); + auto alarmState = std::make_shared<AlarmState>(AlarmState{exclusivePromiseAccess}); // Schedule a task to signal the alarm when the deadline is reached. auto cbHandle = scheduleWorkAt( when, [alarmState](const auto& args) mutable { alarmState->signal(args.status); }); // Handle cancellation via the input CancellationToken. - auto scheduleStatus = - wrapCallbackHandleWithCancelToken(shared_from_this(), std::move(cbHandle), token); + auto scheduleStatus = wrapCallbackHandleWithCancelToken( + shared_from_this(), std::move(cbHandle), exclusivePromiseAccess, token); if (!scheduleStatus.isOK()) { // If scheduleStatus is not okay, then the callback passed to scheduleWorkAt should never |