summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorTyler Seip <Tyler.Seip@mongodb.com>2021-06-28 03:16:53 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-08 18:45:01 +0000
commit3364cab736849d175c5d783f34910d3c89d70af0 (patch)
treee8fc849828fcca57b86cb3321ecb616c474aa839 /src/mongo/executor
parent38e21ea6a60f4a8f8817fe3520c61a0c03e0aea1 (diff)
downloadmongo-3364cab736849d175c5d783f34910d3c89d70af0.tar.gz
SERVER-58112: Add explicit synchronization to TaskExecutor's exhaust command promises
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/task_executor.cpp99
1 files changed, 64 insertions, 35 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp
index ad1bfd1496c..d2405050a12 100644
--- a/src/mongo/executor/task_executor.cpp
+++ b/src/mongo/executor/task_executor.cpp
@@ -26,8 +26,8 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#include "mongo/platform/basic.h"
+#include "mongo/platform/mutex.h"
#include "mongo/executor/task_executor.h"
@@ -38,9 +38,46 @@ namespace {
MONGO_FAIL_POINT_DEFINE(pauseScheduleCallWithCancelTokenUntilCanceled);
+/**
+ * Provides exclusive access to an underlying Promise at set-time, guaranteeing that the Promise
+ * will be set at most one time globally. This prevents races between completion and cancellation,
+ * which normally would result in Promise throwing an invariant.
+ */
+template <typename T>
+class ExclusivePromiseAccess {
+public:
+ explicit ExclusivePromiseAccess(Promise<T>&& promise) : _promise(std::move(promise)) {}
+
+ /**
+ * Sets an error on the Promise if no result has been set; otherwise, does nothing.
+ */
+ template <typename Error>
+ void setError(Error&& e) {
+ if (!_completed.swap(true)) {
+ _promise.setError(std::forward<Error>(e));
+ }
+ }
+
+ /**
+ * Sets a valid result on the Promise if no result has been set; otherwise, does nothing.
+ */
+ template <typename... Args>
+ void emplaceValue(Args&&... args) {
+ if (!_completed.swap(true)) {
+ _promise.emplaceValue(std::forward<Args>(args)...);
+ }
+ }
+
+private:
+ Promise<T> _promise;
+ AtomicWord<bool> _completed;
+};
+
+template <typename T>
Status wrapCallbackHandleWithCancelToken(
- const std::shared_ptr<TaskExecutor>& executor,
- const StatusWith<TaskExecutor::CallbackHandle>& swCallbackHandle,
+ const std::shared_ptr<TaskExecutor> executor,
+ const StatusWith<TaskExecutor::CallbackHandle> swCallbackHandle,
+ std::shared_ptr<ExclusivePromiseAccess<T>> promise,
const CancellationToken& token) {
if (!swCallbackHandle.isOK()) {
return swCallbackHandle.getStatus();
@@ -48,9 +85,11 @@ Status wrapCallbackHandleWithCancelToken(
token.onCancel()
.unsafeToInlineFuture()
- .then([executor, callbackHandle = std::move(swCallbackHandle.getValue())]() mutable {
- executor->cancel(callbackHandle);
- })
+ .then(
+ [executor, promise, callbackHandle = std::move(swCallbackHandle.getValue())]() mutable {
+ executor->cancel(callbackHandle);
+ promise->setError(TaskExecutor::kCallbackCanceledErrorStatus);
+ })
.getAsync([](auto) {});
return Status::OK();
}
@@ -72,10 +111,13 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture(
}
auto [promise, future] = makePromiseFuture<Response>();
- // This has to be made shared because otherwise we'd have to move the promise into this
+
+ // This has to be made shared because otherwise we'd have to move the access into this
// callback, and would be unable to use it in the case where scheduling the request fails below.
- auto sharedPromise = std::make_shared<Promise<Response>>(std::move(promise));
- auto signalPromiseOnCompletion = [sharedPromise, cb = std::move(cb)](const auto& args) mutable {
+ auto exclusivePromiseAccess =
+ std::make_shared<ExclusivePromiseAccess<Response>>(std::move(promise));
+ auto signalPromiseOnCompletion = [exclusivePromiseAccess,
+ cb = std::move(cb)](const auto& args) mutable {
// Upon completion, unconditionally run our callback.
cb(args);
auto status = args.response.status;
@@ -84,11 +126,11 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture(
// occured.
if (!args.response.moreToCome) {
if (status.isOK()) {
- sharedPromise->emplaceValue(std::move(args.response));
+ exclusivePromiseAccess->emplaceValue(std::move(args.response));
} else {
// Only set an error on failures to send the request (including if the request was
// canceled). Errors from the remote host will be contained in the response.
- sharedPromise->setError(status);
+ exclusivePromiseAccess->setError(status);
}
}
};
@@ -106,31 +148,16 @@ ExecutorFuture<Response> wrapScheduleCallWithCancelTokenAndFuture(
auto scheduleStatus = wrapCallbackHandleWithCancelToken(
executor,
std::forward<ScheduleFn>(schedule)(request, std::move(signalPromiseOnCompletion), baton),
+ exclusivePromiseAccess,
token);
if (!scheduleStatus.isOK()) {
// If scheduleStatus is not okay, then the callback signalPromiseOnCompletion should never
// run, meaning that it will be okay to set the promise here.
- sharedPromise->setError(scheduleStatus);
+ exclusivePromiseAccess->setError(scheduleStatus);
}
- return std::move(future)
- .template onError<ErrorCodes::BrokenPromise>(
- [token](Status&& status) -> StatusWith<Response> {
- // When an exhaust command is canceled, its callback is not run, so the
- // SharedPromise captured in the callback will be destroyed and set a BrokenPromise
- // error status. We check here if the token was canceled, and if so convert that to
- // a CallbackCanceled error. N.B. this means that we will pave over truly broken
- // promises with a callback canceled error if our cancellation token is canceled
- // after the promise is broken; we are willing to accept this behavior, as fixing it
- // requires extra synchronization that we'd like to avoid.
- if (token.isCanceled()) {
- return TaskExecutor::kCallbackCanceledErrorStatus;
- } else {
- return std::move(status);
- }
- })
- .thenRunOn(executor);
+ return std::move(future).thenRunOn(executor);
}
} // namespace
@@ -164,28 +191,30 @@ ExecutorFuture<void> TaskExecutor::sleepUntil(Date_t when, const CancellationTok
struct AlarmState {
void signal(const Status& status) {
if (status.isOK()) {
- promise.emplaceValue();
+ promise->emplaceValue();
} else {
- promise.setError(status);
+ promise->setError(status);
}
}
- Promise<void> promise;
+ std::shared_ptr<ExclusivePromiseAccess<void>> promise;
};
auto [promise, future] = makePromiseFuture<void>();
// This has to be shared because Promises (and therefore AlarmState) are move-only and we need
// to maintain two copies: One to capture in the scheduleWorkAt callback, and one locally in
// case scheduling the request fails.
- auto alarmState = std::make_shared<AlarmState>(AlarmState{std::move(promise)});
+ auto exclusivePromiseAccess =
+ std::make_shared<ExclusivePromiseAccess<void>>(std::move(promise));
+ auto alarmState = std::make_shared<AlarmState>(AlarmState{exclusivePromiseAccess});
// Schedule a task to signal the alarm when the deadline is reached.
auto cbHandle = scheduleWorkAt(
when, [alarmState](const auto& args) mutable { alarmState->signal(args.status); });
// Handle cancellation via the input CancellationToken.
- auto scheduleStatus =
- wrapCallbackHandleWithCancelToken(shared_from_this(), std::move(cbHandle), token);
+ auto scheduleStatus = wrapCallbackHandleWithCancelToken(
+ shared_from_this(), std::move(cbHandle), exclusivePromiseAccess, token);
if (!scheduleStatus.isOK()) {
// If scheduleStatus is not okay, then the callback passed to scheduleWorkAt should never