/**
* Copyright 2018 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include
#include
#include "mongo/base/checked_cast.h"
#include "mongo/base/static_assert.h"
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/utility.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
// Forward declaration only; the SharedPromise class template is defined later in this header,
// after Promise.
template
class SharedPromise;
namespace future_details {
// Forward declarations. Promise and Future are defined later in this header; the last
// declaration below is an explicit specialization (template <>) of Future.
template
class Promise;
template
class Future;
template <>
class Future;
// Using extern constexpr to prevent the compiler from allocating storage as a poor man's c++17
// inline constexpr variable.
// TODO delete extern in c++17 because inline is the default for constexpr variables.
//
// isFuture is a compile-time boolean: the primary variable template is false and the
// specialization below is true.
// NOTE(review): presumably the specialization matches instantiations of Future -- confirm.
template
extern constexpr bool isFuture = false;
template
extern constexpr bool isFuture> = true;
// This is used to "normalize" void since it can't be used as an argument and it becomes Status
// rather than StatusWith.
// FakeVoid is an empty tag type: it carries no data and only stands in where a value is needed.
struct FakeVoid {};
// Alias that picks either FakeVoid or T through std::conditional_t.
// NOTE(review): the selecting condition is presumably "T is void" -- confirm.
template
using VoidToFakeVoid = std::conditional_t::value, FakeVoid, T>;
/**
* This is a poor-man's implementation of c++17 std::is_invocable. We should replace it with the
* stdlib one once we can make call() use std::invoke.
*
* Both is_invocable_impl overloads are declarations without bodies; they are only named inside
* decltype below. The template overload returns std::true_type and the C-style variadic (...)
* overload is the fallback that returns std::false_type. is_invocable derives from whichever of
* those two types overload resolution selects.
*/
template ::type>
auto is_invocable_impl(Func&& func, Args&&... args) -> std::true_type;
auto is_invocable_impl(...) -> std::false_type;
template
struct is_invocable
: public decltype(is_invocable_impl(std::declval(), std::declval()...)) {};
// call(func, FakeVoid) -> func(Status::OK())
// This simulates the implicit Status/T overloading you get by taking a StatusWith that doesn't
// work for Status/void and Status.
// TODO replace this dispatch with constexpr if in c++17
//
// Tag dispatch on the type of the second parameter; its value is never read.
// std::true_type: invoke func with Status::OK() and return whatever func returns.
template
inline auto callVoidOrStatus(Func&& func, std::true_type useStatus) {
return func(Status::OK());
}
// std::false_type: invoke func with no arguments and return whatever func returns.
template
inline auto callVoidOrStatus(Func&& func, std::false_type useStatus) {
return func();
}
/**
* call() normalizes arguments to hide the FakeVoid shenanigans from users of Futures.
* In the future it may also expand tuples to argument lists.
*/
// General case: invoke func with the single argument, forwarded.
template
inline auto call(Func&& func, Arg&& arg) {
return func(std::forward(arg));
}
// No-argument case: invoke func with nothing.
template
inline auto call(Func&& func) {
return func();
}
// FakeVoid case: the placeholder itself is never passed to func. A compile-time tag (useStatus)
// selects between func(Status::OK()) and func() by way of callVoidOrStatus().
template
inline auto call(Func&& func, FakeVoid) {
auto useStatus =
std::integral_constant() && is_invocable())>();
return callVoidOrStatus(func, useStatus);
}
// StatusWith case: only the Status part of sw is handed to func.
// NOTE(review): presumably this overload targets StatusWith of FakeVoid -- confirm.
template
inline auto call(Func&& func, StatusWith sw) {
return func(sw.getStatus());
}
/**
* statusCall() normalizes return values so everything returns StatusWith. Exceptions are
* converted to !OK statuses. void and Status returns are converted to StatusWith
*
* All three overloads are noexcept and catch only DBException, so any other exception type
* escaping func terminates the process.
* NOTE(review): the extra defaulted "typename = void" parameters on the later overloads are
* presumably there only to keep the overloads' template signatures distinct -- confirm.
*/
// Overload for results that are neither void nor Status: the result of call() is returned
// (converted to Result), or the caught exception's status.
template <
typename Func,
typename... Args,
typename RawResult = decltype(call(std::declval(), std::declval()...)),
typename = std::enable_if_t::value &&
!std::is_same::value>,
typename Result = std::conditional_t, RawResult, StatusWith>>
inline Result statusCall(Func&& func, Args&&... args) noexcept {
try {
return call(func, std::forward(args)...);
} catch (const DBException& ex) {
return ex.toStatus();
}
}
// Overload whose call() result is discarded: success is reported as FakeVoid{}.
template (), std::declval()...)),
typename = std::enable_if_t::value>>
inline StatusWith statusCall(Func&& func, Args&&... args) noexcept {
try {
call(func, std::forward(args)...);
return FakeVoid{};
} catch (const DBException& ex) {
return ex.toStatus();
}
}
// Overload whose call() result has isOK(): an OK result becomes FakeVoid{}, anything else is
// returned as the error.
template (), std::declval()...)),
typename = std::enable_if_t::value>,
typename = void,
typename = void>
inline StatusWith statusCall(Func&& func, Args&&... args) noexcept {
try {
auto status = call(func, std::forward(args)...);
if (status.isOK())
return FakeVoid{};
return std::move(status);
} catch (const DBException& ex) {
return ex.toStatus();
}
}
/**
* throwingCall() normalizes return values so everything returns T or FakeVoid. !OK Statuses are
* converted exceptions. void and Status returns are converted to FakeVoid.
*
* This is equivalent to uassertStatusOK(statusCall(func, args...)), but avoids catching just to
* rethrow.
*
* None of the overloads in this group catch anything, so exceptions thrown by func pass straight
* through to the caller.
*/
// Overload that returns the result of call() unchanged.
template <
typename Func,
typename... Args,
typename Result = decltype(call(std::declval(), std::declval()...)),
typename = std::enable_if_t::value && !isStatusOrStatusWith>>
inline Result throwingCall(Func&& func, Args&&... args) {
return call(func, std::forward(args)...);
}
// Overload that discards the result of call() and reports success as FakeVoid{}.
template (), std::declval()...)),
typename = std::enable_if_t::value>>
inline FakeVoid throwingCall(Func&& func, Args&&... args) {
call(func, std::forward(args)...);
return FakeVoid{};
}
// Overload that passes the result of call() through uassertStatusOK() and then returns
// FakeVoid{}.
template (), std::declval()...)),
typename = std::enable_if_t::value>,
typename = void>
inline FakeVoid throwingCall(Func&& func, Args&&... args) {
uassertStatusOK(call(func, std::forward(args)...));
return FakeVoid{};
}
// StatusWith-returning overload of throwingCall(): the result of call() is unwrapped through
// uassertStatusOK(), and the contained value is returned.
//
// This overload must NOT be noexcept. The whole contract of throwingCall() is that a !OK status
// is turned into an exception for the caller to handle; with a noexcept specifier that exception
// would hit the noexcept boundary and call std::terminate() instead of propagating. The other
// throwingCall() overloads are not noexcept either.
template (), std::declval()...)),
typename = std::enable_if_t>,
typename = void,
typename = void>
inline typename StatusWithResult::value_type throwingCall(Func&& func, Args&&... args) {
return uassertStatusOK(call(func, std::forward(args)...));
}
// The type throwingCall() would return for the given callable and arguments, computed with
// decltype/declval so nothing is actually invoked.
template
using RawNormalizedCallResult =
decltype(throwingCall(std::declval(), std::declval()...));
// Same as RawNormalizedCallResult, except that when the std::conditional_t test involving FakeVoid
// holds, the alias yields void instead.
// NOTE(review): presumably the test is "raw result is FakeVoid" -- confirm.
template
using NormalizedCallResult =
std::conditional_t, FakeVoid>::value,
void,
RawNormalizedCallResult>;
// Type-level mapping exposed through the nested ::type. The primary template maps T to itself.
template
struct FutureContinuationResultImpl {
using type = T;
};
// Partial specializations that strip a wrapper and yield the wrapped T.
// NOTE(review): the wrappers are presumably Future and StatusWith -- confirm.
template
struct FutureContinuationResultImpl> {
using type = T;
};
template
struct FutureContinuationResultImpl> {
using type = T;
};
// Full specialization that maps to void.
// NOTE(review): presumably this is the Status specialization -- confirm.
template <>
struct FutureContinuationResultImpl {
using type = void;
};
/**
* A base class that handles the ref-count for boost::intrusive_ptr compatibility.
*
* This is taken from RefCountable which is used for the aggregation types, adding in a way to set
* the refcount non-atomically during initialization. Also using explicit memory orderings for all
* operations on the count.
* TODO look into merging back.
*
* The count starts at 0. Construction and destruction are protected, so only derived classes can
* create instances, and deletion happens through intrusive_ptr_release().
*/
class FutureRefCountable {
MONGO_DISALLOW_COPYING(FutureRefCountable);
public:
/**
* Sets the refcount to count, assuming it is currently one less. This should only be used
* during logical initialization before another thread could possibly have access to this
* object.
*/
void threadUnsafeIncRefCountTo(uint32_t count) const {
// Debug-only check of the "currently one less" precondition; the store itself is a plain
// relaxed store, not a read-modify-write.
dassert(_count.load(std::memory_order_relaxed) == (count - 1));
_count.store(count, std::memory_order_relaxed);
}
// Hook used by boost::intrusive_ptr to add a reference.
friend void intrusive_ptr_add_ref(const FutureRefCountable* ptr) {
// See this for a description of why relaxed is OK here. It is also used in libc++.
// http://www.boost.org/doc/libs/1_66_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_reference_counters.discussion
ptr->_count.fetch_add(1, std::memory_order_relaxed);
};
// Hook used by boost::intrusive_ptr to drop a reference. fetch_sub returns the previous value,
// so seeing 1 means this call released the last reference and the object is deleted.
friend void intrusive_ptr_release(const FutureRefCountable* ptr) {
if (ptr->_count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
delete ptr;
}
};
protected:
FutureRefCountable() = default;
// Virtual so that "delete ptr" above destroys the derived object.
virtual ~FutureRefCountable() = default;
private:
// mutable so that the const-qualified ref-count operations above can modify it.
mutable std::atomic _count{0}; // NOLINT
};
// Allocates a T with new, forwarding args to its constructor, and returns it wrapped in a
// boost::intrusive_ptr. The refcount is set to 1 with the non-atomic initializer and the
// intrusive_ptr is told not to add a reference of its own, so the returned pointer holds the
// object's single reference.
template ::value>>
boost::intrusive_ptr make_intrusive(Args&&... args) {
auto ptr = new T(std::forward(args)...);
ptr->threadUnsafeIncRefCountTo(1);
return boost::intrusive_ptr(ptr, /*add ref*/ false);
}
// Forward declaration; SharedStateImpl is defined below, after SharedStateBase.
template
struct SharedStateImpl;
// Convenience alias for an instantiation of SharedStateImpl.
// NOTE(review): presumably the argument is VoidToFakeVoid of T, so that void maps to FakeVoid --
// confirm.
template
using SharedState = SharedStateImpl>;
/**
* SSB is SharedStateBase, and this is its current state.
*
* The enumerator order is significant: code compares states with operator<.
*/
enum class SSBState : uint8_t {
// Initial value of SharedStateBase::state.
kInit,
// Set by SharedStateBase::wait() when it moves the state away from kInit.
kWaiting,
// Set by SharedStateBase::transitionToFinished().
kFinished, // This should stay last since we have code like assert(state < kFinished).
};
// Value-type-independent part of the state shared between the promise side and the future side:
// the completion state machine, the error Status, the continuation/callback hooks, and the
// mutex/condition variable used by blocking waiters. Not copyable or movable; only derived
// classes can construct it.
class SharedStateBase : public FutureRefCountable {
public:
SharedStateBase(const SharedStateBase&) = delete;
SharedStateBase(SharedStateBase&&) = delete;
SharedStateBase& operator=(const SharedStateBase&) = delete;
SharedStateBase& operator=(SharedStateBase&&) = delete;
virtual ~SharedStateBase() = default;
// Only called by future side.
// Blocks the calling thread until state is kFinished; returns at once if it already is.
void wait() noexcept {
if (state.load(std::memory_order_acquire) == SSBState::kFinished)
return;
// cv is constructed before the exchange below publishes kWaiting, so that a promise side
// observing kWaiting also sees the condition variable.
cv.emplace();
auto oldState = SSBState::kInit;
if (MONGO_unlikely(!state.compare_exchange_strong(
oldState, SSBState::kWaiting, std::memory_order_acq_rel))) {
// transitionToFinished() transitioned after we did our initial check.
dassert(oldState == SSBState::kFinished);
return;
}
stdx::unique_lock lk(mx);
cv->wait(lk, [&] {
// The mx locking above is insufficient to establish an acquire if state transitions to
// kFinished before we get here, but we acquire mx before the producer does.
return state.load(std::memory_order_acquire) == SSBState::kFinished;
});
}
// Remaining methods only called from promise side.
// Publishes kFinished. If the previous state was kInit nothing else is done. Otherwise the
// previous state must have been kWaiting; in that case the callback is run if one is set, and
// waiters are notified if cv was constructed.
void transitionToFinished() noexcept {
auto oldState = state.exchange(SSBState::kFinished, std::memory_order_acq_rel);
if (oldState == SSBState::kInit)
return;
dassert(oldState == SSBState::kWaiting);
DEV {
// If you hit this limit one of two things has probably happened
//
// 1. The justForContinuation optimization isn't working.
// 2. You may be creating a variable length chain.
//
// If those statements don't mean anything to you, please ask an editor of this file.
// If they don't work here anymore, I'm sorry.
const size_t kMaxDepth = 32;
size_t depth = 0;
// Walks the continuation links, following a link only while that shared state is in
// kWaiting, and counts how many were visited.
for (auto ssb = continuation.get(); ssb;
ssb = ssb->state.load(std::memory_order_acquire) == SSBState::kWaiting
? ssb->continuation.get()
: nullptr) {
depth++;
invariant(depth < kMaxDepth);
}
}
if (callback) {
callback(this);
}
if (cv) {
stdx::unique_lock lk(mx);
// This must be done inside the lock to correctly synchronize with wait().
cv->notify_all();
}
}
// Stores a !OK status (enforced by invariant) and finishes the shared state.
void setError(Status statusArg) noexcept {
invariant(!statusArg.isOK());
dassert(state.load() < SSBState::kFinished, statusArg.toString());
status = std::move(statusArg);
transitionToFinished();
}
//
// Concurrency Rules for members: Each non-atomic member is initially owned by either the
// Promise side or the Future side, indicated by a P/F comment. The general rule is that members
// representing the propagating data are owned by Promise, while members representing what
// to do with the data are owned by Future. The owner may freely modify the members it owns
// until it releases them by doing a release-store to state of kFinished from Promise or
// kWaiting from Future. Promise can acquire access to all members by doing an acquire-load of
// state and seeing kWaiting (or Future with kFinished). Transitions should be done via
// acquire-release exchanges to combine both actions.
//
// Future::propagateResults uses an alternative mechanism to transfer ownership of the
// continuation member. The logical Future-side does a release-store of true to
// isJustForContinuation, and the Promise-side can do an acquire-load seeing true to get access.
//
std::atomic state{SSBState::kInit}; // NOLINT
// This is used to prevent infinite chains of SharedStates that just propagate results.
std::atomic isJustForContinuation{false}; // NOLINT
// This is likely to be a different derived type from this, since it is the logical output of
// callback.
boost::intrusive_ptr continuation; // F
// Takes this as argument and usually writes to continuation.
std::function callback; // F
// These are only used to signal completion to blocking waiters. Benchmarks showed that it was
// worth deferring the construction of cv, so it can be avoided when it isn't necessary.
stdx::mutex mx; // F (not that it matters)
boost::optional cv; // F
// Starts out OK; setError() replaces it with a !OK status.
Status status = Status::OK(); // P
protected:
SharedStateBase() = default;
};
// Adds the typed result storage (data) to SharedStateBase. The static assertion rejects one
// category of T at compile time.
// NOTE(review): presumably void is the rejected type, with FakeVoid used in its place -- confirm.
template
struct SharedStateImpl final : SharedStateBase {
MONGO_STATIC_ASSERT(!std::is_void::value);
// Remaining methods only called by promise side.
// Takes over the result of an already-finished shared state: its data when its status is OK,
// otherwise its status. Then finishes this state.
void fillFrom(SharedState&& other) {
dassert(state.load() < SSBState::kFinished);
dassert(other.state.load() == SSBState::kFinished);
if (other.status.isOK()) {
data = std::move(other.data);
} else {
status = std::move(other.status);
}
transitionToFinished();
}
// Constructs the value in place from args and finishes this state. If construction throws a
// DBException it is recorded in status instead, and the state is still finished.
template
void emplaceValue(Args&&... args) noexcept {
dassert(state.load() < SSBState::kFinished);
try {
data.emplace(std::forward(args)...);
} catch (const DBException& ex) {
status = ex.toStatus();
}
transitionToFinished();
}
// Finishes this state from sw: with its value when sw is OK, otherwise with its status.
void setFromStatusWith(StatusWith sw) {
if (sw.isOK()) {
emplaceValue(std::move(sw.getValue()));
} else {
setError(std::move(sw.getStatus()));
}
}
// Empty until a value is emplaced or moved in.
boost::optional data; // P
};
} // namespace future_details
// These are in the future_details namespace to get access to its contents, but they are part of the
// public API.
// The using-declarations below make both names available directly in the enclosing namespace.
using future_details::Promise;
using future_details::Future;
/**
* This class represents the producer side of a Future.
*
* This is a single-shot class. You may only extract the Future once, and you may either set a value
* or error at most once. Extracting the future and setting the value/error can be done in either
* order.
*
* If the Future has been extracted, but no value or error has been set at the time this Promise is
* destroyed, an error will be set with ErrorCodes::BrokenPromise. This should generally be considered
* a programmer error, and should not be relied upon. We may make it debug-fatal in the future.
*
* Only one thread can use a given Promise at a time. It is legal to have different threads setting
* the value/error and extracting the Future, but it is the user's responsibility to ensure that
* those calls are strictly synchronized. This is usually easiest to achieve by calling
* makePromiseFuture() then passing a SharedPromise to the completing threads.
*
* If the result is ready when producing the Future, it is more efficient to use
* makeReadyFutureWith() or Future::makeReady() than to use a Promise.
*
* A default constructed `Promise` is in a null state. Null `Promises` can only be assigned over
* and destroyed. It is a programmer error to call any methods on a null `Promise`. Any methods
* that complete a `Promise` leave it in the null state.
*/
template
class future_details::Promise {
public:
using value_type = T;
/**
* Creates a null `Promise`.
*/
Promise() = default;
// Sets a BrokenPromise error on the shared state if this Promise still holds one.
~Promise() {
breakPromiseIfNeeded();
}
Promise(const Promise&) = delete;
Promise& operator=(const Promise&) = delete;
/**
* Breaks this `Promise`, if not fulfilled and not in a moved-from state.
*
* After that, takes over the shared state held by `p`.
*/
Promise& operator=(Promise&& p) noexcept {
breakPromiseIfNeeded();
_sharedState = std::move(p._sharedState);
return *this;
}
Promise(Promise&&) = default;
/**
* Sets a value or error into this Promise by calling func, which must take no arguments and
* return one of T, StatusWith (or Status when T is void), or Future. All errors, whether
* returned or thrown, will be correctly propagated.
*
* If the function returns a Future, this Promise's Future will complete when the returned
* Future completes, as-if it was passed to Promise::setFrom().
*
* If any work is needed to produce the result, prefer doing something like:
* promise.setWith([&]{ return makeResult(); });
* over code like:
* promise.emplaceValue(makeResult());
* because this method will correctly propagate errors thrown from makeResult(), rather than
* ErrorCodes::BrokenPromise.
*/
template
void setWith(Func&& func) noexcept;
/**
* Sets the value into this Promise when the passed-in Future completes, which may have already
* happened. If it hasn't, it is still safe to destroy this Promise since it is no longer
* involved.
*/
void setFrom(Future&& future) noexcept;
/**
* Completes this Promise with a value constructed in place from args. Goes through setImpl(),
* so this Promise must not be null.
*/
template
void emplaceValue(Args&&... args) noexcept {
setImpl([&](boost::intrusive_ptr>&& sharedState) {
sharedState->emplaceValue(std::forward(args)...);
});
}
/**
* Completes this Promise with an error. status must be !OK (enforced by invariant).
*/
void setError(Status status) noexcept {
invariant(!status.isOK());
setImpl([&](boost::intrusive_ptr>&& sharedState) {
sharedState->setError(std::move(status));
});
}
// TODO rename to not XXXWith and handle void
// Completes this Promise from sw by forwarding it to the shared state's setFromStatusWith().
void setFromStatusWith(StatusWith sw) noexcept {
setImpl([&](boost::intrusive_ptr>&& sharedState) {
sharedState->setFromStatusWith(std::move(sw));
});
}
/**
* Get a copyable SharedPromise that can be used to complete this Promise's Future.
*
* Callers are required to extract the Future before calling share() to prevent race conditions.
* Even with a SharedPromise, callers must ensure it is only completed at most once. Copyability
* is primarily to allow capturing lambdas to be put in std::functions which don't support
* move-only types.
*
* It is safe to destroy the original Promise as soon as this call returns.
*/
SharedPromise share() noexcept;
/**
* Returns an object with two members: `promise`, built on a freshly allocated shared state,
* and `future`, obtained from that promise via getFuture(). The members are initialized in
* that order.
*/
static auto makePromiseFutureImpl() {
struct PromiseAndFuture {
Promise promise{make_intrusive>()};
Future future = promise.getFuture();
};
return PromiseAndFuture();
}
private:
// Adopts an existing shared state; used by makePromiseFutureImpl().
explicit Promise(boost::intrusive_ptr>&& sharedState)
: _sharedState(std::move(sharedState)) {}
// This is not public because we found it frequently was involved in races. The
// `makePromiseFuture` API avoids those races entirely.
Future getFuture() noexcept;
friend class Future;
// Common path for every completing method: checks that this Promise is not null, detaches the
// shared state from this object, and hands it to doSet.
template
void setImpl(Func&& doSet) noexcept {
invariant(_sharedState);
// We keep `sharedState` as a stack local, to preserve ownership of the resource,
// in case the code in `doSet` unblocks a thread which winds up causing
// `~Promise` to be invoked.
auto sharedState = std::move(_sharedState);
doSet(std::move(sharedState));
// Note: `this` is potentially dead, at this point.
}
// The current promise will be broken, if not already fulfilled.
// A non-null _sharedState is treated as "not fulfilled": the completing methods above detach
// it in setImpl().
void breakPromiseIfNeeded() {
if (MONGO_unlikely(_sharedState)) {
_sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"});
}
}
// Null for a default-constructed Promise and after any completing method has run.
boost::intrusive_ptr> _sharedState;
};
/**
* A SharedPromise is a copyable object that can be used to complete a Promise.
*
* All copies derived from the same call to Promise::share() will complete the same shared state.
* Callers must ensure that the shared state is only completed at most once. Copyability is
* primarily to allow capturing lambdas to be put in std::functions which don't support move-only
* types. If the final derived SharedPromise is destroyed without completion, the Promise will be
* broken.
*
* All methods behave the same as on the underlying Promise.
*/
template
class SharedPromise {
public:
// Leaves _promise empty. Every method below dereferences _promise without checking it, so
// none of them may be called on a default-constructed SharedPromise.
SharedPromise() = default;
// Forwards to the setWith() of the underlying Promise.
template
void setWith(Func&& func) noexcept {
_promise->setWith(std::forward(func));
}
// Forwards to the setFrom() of the underlying Promise.
void setFrom(Future&& future) noexcept {
_promise->setFrom(std::move(future));
}
// Forwards to the emplaceValue() of the underlying Promise.
template
void emplaceValue(Args&&... args) noexcept {
_promise->emplaceValue(std::forward(args)...);
}
// Forwards to the setError() of the underlying Promise.
void setError(Status status) noexcept {
_promise->setError(std::move(status));
}
private:
// Only Promise needs to be a friend, but MSVC2015 doesn't respect that friendship.
// TODO see if this is still needed on MSVC2017+
template
friend class Promise;
// Private: instances that actually refer to a Promise are created by the friend class above.
explicit SharedPromise(std::shared_ptr>&& promise) : _promise(std::move(promise)) {}
// TODO consider adding a SharedPromise refcount to SharedStateBase to avoid the extra
// allocation. The tricky part will be ensuring that BrokenPromise is set when the last copy is
// destroyed.
std::shared_ptr> _promise;
};
/**
* Future is logically a possibly-deferred StatusWith (or Status when T is void).
*
* As is usual for rvalue-qualified methods, you may call at most one of them on a given Future.
*
* A future may be passed between threads, but only one thread may use it at a time.
*
* TODO decide if destroying a Future before extracting the result should cancel work or should
* cancellation be explicit. For now avoid unnecessarily throwing away active Futures since the
* behavior may change. End all Future chains with either a blocking call to get()/getNoThrow() or a
* non-blocking call to getAsync().
*/
template
class MONGO_WARN_UNUSED_RESULT_CLASS future_details::Future {
public:
// Compile-time restrictions on T. The visible conditions reject StatusWith and Future
// instantiations as well as reference, const, and array types; the first assertion is an
// is_same comparison.
// NOTE(review): the first assertion presumably bans Status as T -- confirm.
static_assert(!std::is_same::value,
"Future is banned. Use Future instead.");
static_assert(!isStatusWith, "Future> is banned. Just use Future instead.");
static_assert(!isFuture, "Future> is banned. Just use Future instead.");
static_assert(!std::is_reference::value, "Future is banned.");
static_assert(!std::is_const::value, "Future is banned.");
static_assert(!std::is_array::value, "Future is banned.");
using value_type = T;
/**
* Constructs a Future in a moved-from state that can only be assigned to or destroyed.
*/
Future() = default;
// Move-only: moving is defaulted, copying is deleted.
Future& operator=(Future&&) = default;
Future(Future&&) = default;
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
// Implicit conversions from a value, a Status, or a StatusWith. Each one delegates to the
// matching makeReady() overload, so the resulting Future is already complete.
/* implicit */ Future(T val) : Future(makeReady(std::move(val))) {}
/* implicit */ Future(Status status) : Future(makeReady(std::move(status))) {}
/* implicit */ Future(StatusWith sw) : Future(makeReady(std::move(sw))) {}
/**
* Make a ready Future from a value for cases where you don't need to wait asynchronously.
*
* Calling this is faster than getting a Future out of a Promise, and is effectively free. It is
* fast enough that you never need to avoid returning a Future from an API, even if the result
* is ready 99.99% of the time.
*
* As an example, if you are handing out results from a batch, you can use this for each
* result while you have a batch, then use a Promise to return a not-ready Future when you need
* to get another batch.
*/
// Value overload: the value is stored directly in _immediate.
static Future makeReady(T val) { // TODO emplace?
Future out;
out._immediate = std::move(val);
return out;
}
// Error overload: status must be !OK (enforced by invariant). A shared state is created and
// completed with the error.
static Future makeReady(Status status) {
invariant(!status.isOK());
auto out = Future(make_intrusive>());
out._shared->setError(std::move(status));
return out;
}
// StatusWith overload: dispatches to the value overload when val is OK, otherwise to the error
// overload.
static Future makeReady(StatusWith val) {
if (val.isOK())
return makeReady(std::move(val.getValue()));
return makeReady(val.getStatus());
}
/**
* If this returns true, get() is guaranteed not to block and callbacks will be immediately
* invoked. You can't assume anything if this returns false since it may be completed
* immediately after checking (unless you have independent knowledge that this Future can't
* complete in the background).
*
* Callers must still call get() or similar, even on Future, to ensure that they are
* correctly sequenced with the completing task, and to be informed about whether the Promise
* completed successfully.
*
* This is generally only useful as an optimization to avoid prep work, such as setting up
* timeouts, that is unnecessary if the Future is ready already.
*/
bool isReady() const {
// True when a value is held in _immediate; otherwise _shared is dereferenced without a null
// check to see whether the shared state has reached kFinished.
return _immediate || _shared->state.load(std::memory_order_acquire) == SSBState::kFinished;
}
/**
* Gets the value out of this Future, blocking until it is ready.
*
* get() methods throw on error, while getNoThrow() returns a !OK status.
*
* These methods can be called multiple times, except for the rvalue overloads.
*/
// Rvalue overload: returns the value by moving it out of whatever getImpl() refers to.
T get() && {
return std::move(getImpl());
}
// Lvalue overload: returns a reference to the value without moving it.
T& get() & {
return getImpl();
}
// Const overload: casts away const in order to call the non-const getImpl().
const T& get() const& {
return const_cast(this)->getImpl();
}
// Rvalue overload: returns the immediate value if there is one; otherwise waits on the shared
// state and returns its status if !OK, else its data. The result is moved out in every case.
StatusWith getNoThrow() && noexcept {
if (_immediate) {
return std::move(*_immediate);
}
_shared->wait();
if (!_shared->status.isOK())
return std::move(_shared->status);
return std::move(*_shared->data);
}
// Const overload: same decisions as above, but the result is copied rather than moved.
StatusWith getNoThrow() const& noexcept {
if (_immediate) {
return *_immediate;
}
_shared->wait();
if (!_shared->status.isOK())
return _shared->status;
return *_shared->data;
}
/**
* This ends the Future continuation chain by calling a callback on completion. Use this to
* escape back into a callback-based API.
*
* For now, the callback must not fail, since there is nowhere to propagate the error to.
* TODO decide how to handle func throwing.
*
* func must return void; this is checked with a static_assert.
*/
template // StatusWith -> void
void getAsync(Func&& func) && noexcept {
static_assert(std::is_void>()))>::value,
"func passed to getAsync must return void");
return generalImpl(
// on ready success:
[&](T&& val) { call(func, std::move(val)); },
// on ready failure:
[&](Status&& status) { call(func, std::move(status)); },
// on not ready yet:
[&] {
// func is moved/forwarded into the stored callback here, unlike the two handlers above
// which use it by reference. The callback is marked noexcept.
_shared->callback = [func = std::forward(func)](SharedStateBase *
ssb) mutable noexcept {
const auto input = checked_cast*>(ssb);
if (input->status.isOK()) {
call(func, std::move(*input->data));
} else {
call(func, std::move(input->status));
}
};
});
}
//
// The remaining methods are all continuation based and take a callback and return a Future.
// Each method has a comment indicating the supported signatures for that callback, and a
// description of when the callback is invoked and how that impacts the returned Future. It may
// be helpful to think of Future continuation chains as a pipeline of stages that take input
// from earlier stages and produce output for later stages.
//
// Be aware that the callback may be invoked inline at the call-site or at the producer when
// setting the value. Therefore, you should avoid doing blocking work inside of a callback.
// Additionally, avoid acquiring any locks or mutexes that the caller already holds, otherwise
// you risk a deadlock. If either of these concerns apply to your callback, it should schedule
// itself on an executor, rather than doing work in the callback.
// TODO make this easier to do by having executor APIs return Futures.
//
// Error handling in callbacks: all exceptions thrown propagate to the returned Future
// automatically. Callbacks that return Status or StatusWith behave as-if they were wrapped
// in something that called uassertStatusOK() on the return value. There is no way to
// distinguish between a function throwing or returning a !OK status.
//
// Callbacks that return Future are automatically unwrapped and connected to the returned
// Future, rather than producing a Future>.
//
/**
* Callbacks passed to then() are only called if the input Future completes successfully.
* Otherwise the error propagates automatically, bypassing the callback.
*
* func is run through statusCall(), so both its returned result and a thrown DBException end up
* in the returned Future.
*/
template Result or T -> StatusWith
typename Result = NormalizedCallResult,
typename = std::enable_if_t>>
Future then(Func&& func) && noexcept {
return generalImpl(
// on ready success:
[&](T&& val) { return Future::makeReady(statusCall(func, std::move(val))); },
// on ready failure:
[&](Status&& status) { return Future::makeReady(std::move(status)); },
// on not ready yet:
[&] {
// func is moved/forwarded into the continuation; the continuation forwards an input
// error to output untouched, and otherwise completes output from statusCall().
return makeContinuation([func = std::forward(func)](
SharedState * input, SharedState * output) mutable noexcept {
if (!input->status.isOK())
return output->setError(std::move(input->status));
output->setFromStatusWith(statusCall(func, std::move(*input->data)));
});
});
}
/**
* Same as above then() but for case where func returns a Future that needs to be unwrapped.
*
* func is run through throwingCall(); a DBException thrown by it is caught and turned into an
* error on the returned Future.
*/
template Future
typename RawResult = NormalizedCallResult,
typename = std::enable_if_t>,
typename UnwrappedResult = typename RawResult::value_type>
Future then(Func&& func) && noexcept {
return generalImpl(
// on ready success:
[&](T&& val) {
try {
return Future(throwingCall(func, std::move(val)));
} catch (const DBException& ex) {
return Future::makeReady(ex.toStatus());
}
},
// on ready failure:
[&](Status&& status) { return Future::makeReady(std::move(status)); },
// on not ready yet:
[&] {
// func is moved/forwarded into the continuation. On success of the input, the Future
// returned by func is connected to output with propagateResultTo().
return makeContinuation([func = std::forward(func)](
SharedState * input,
SharedState * output) mutable noexcept {
if (!input->status.isOK())
return output->setError(std::move(input->status));
try {
throwingCall(func, std::move(*input->data)).propagateResultTo(output);
} catch (const DBException& ex) {
output->setError(ex.toStatus());
}
});
});
}
/**
* Callbacks passed to onError() are only called if the input Future completes with an error.
* Otherwise, the successful result propagates automatically, bypassing the callback.
*
* The callback can either produce a replacement value (which must be a T), return a replacement
* Future (such as by retrying), or return/throw a replacement error.
*
* Note that this will only catch errors produced by earlier stages; it is not registering a
* general error handler for the entire chain.
*/
template T or Status -> StatusWith
typename Result = RawNormalizedCallResult,
typename = std::enable_if_t>>
Future onError(Func&& func) && noexcept {
static_assert(
std::is_same::value,
"func passed to Future::onError must return T, StatusWith, or Future");
return generalImpl(
// on ready success:
[&](T&& val) { return Future::makeReady(std::move(val)); },
// on ready failure:
[&](Status&& status) {
return Future::makeReady(statusCall(func, std::move(status)));
},
// on not ready yet:
[&] {
// func is moved/forwarded into the continuation; a successful input is passed through
// to output, and an error is handed to func via statusCall().
return makeContinuation([func = std::forward(func)](
SharedState * input, SharedState * output) mutable noexcept {
if (input->status.isOK())
return output->emplaceValue(std::move(*input->data));
output->setFromStatusWith(statusCall(func, std::move(input->status)));
});
});
}
/**
* Same as above onError() but for case where func returns a Future that needs to be unwrapped.
*
* func is run through throwingCall(); a DBException thrown by it is caught and becomes the
* error of the returned Future.
*/
template Future
typename Result = RawNormalizedCallResult,
typename = std::enable_if_t>,
typename = void>
Future onError(Func&& func) && noexcept {
static_assert(
std::is_same>::value ||
(std::is_same::value && std::is_same>::value),
"func passed to Future::onError must return T, StatusWith, or Future");
return generalImpl(
// on ready success:
[&](T&& val) { return Future::makeReady(std::move(val)); },
// on ready failure:
[&](Status&& status) {
try {
return Future(throwingCall(func, std::move(status)));
} catch (const DBException& ex) {
return Future::makeReady(ex.toStatus());
}
},
// on not ready yet:
[&] {
// func is moved/forwarded into the continuation. On an input error, the Future returned
// by func is connected to output with propagateResultTo().
return makeContinuation([func = std::forward(func)](
SharedState * input, SharedState * output) mutable noexcept {
if (input->status.isOK())
return output->emplaceValue(std::move(*input->data));
try {
throwingCall(func, std::move(input->status)).propagateResultTo(output);
} catch (const DBException& ex) {
output->setError(ex.toStatus());
}
});
});
}
/**
* Same as the other two onErrors but only calls the callback if the code matches the template
* parameter. Otherwise lets the error propagate unchanged.
*
* Implemented on top of the general onError(): the wrapper lambda passes a non-matching status
* to uassertStatusOK() and only calls func when the status compares equal to code.
*/
template
Future onError(Func&& func) && noexcept {
using Result = RawNormalizedCallResult;
static_assert(
std::is_same::value || std::is_same>::value ||
(std::is_same::value && std::is_same>::value),
"func passed to Future::onError must return T, StatusWith, or Future");
// Fast path: this Future is already known to hold a successful result.
if (_immediate || (isReady() && _shared->status.isOK()))
return std::move(*this); // Avoid copy/moving func if we know we won't call it.
// TODO in C++17 with constexpr if this can be done cleaner and more efficiently by not
// throwing.
return std::move(*this).onError([func =
std::forward(func)](Status && status) mutable {
if (status != code)
uassertStatusOK(status);
return throwingCall(func, std::move(status));
});
}
/**
* TODO do we need a version of then/onError like onCompletion() that handles both success and
* Failure, but doesn't end the chain like getAsync()? Right now we don't, and we can add one if
* we do.
*/
//
// The tap/tapError/tapAll family of functions take callbacks to observe the flow through a
// future chain without affecting the propagating result, except possibly if they throw. If the
// naming seems odd, you can think of it like a "wire tap" in that it allows you to observe a
// conversation between two parties (the promise-producer and future-consumer) without adding
// messages of your own. This is why all callbacks are required to return void.
//
// TODO decide what to do if callback throws:
// - transition the future chain to failure
// - ignore
// - fatal (current impl)
//
/**
* Callback is called if the input completes successfully.
*
* This can be used to inform some outside system of the result.
*
* func must return void (checked with a static_assert). It receives the value as a const
* reference.
*/
template // T -> void
Future tap(Func&& func) && noexcept {
static_assert(std::is_void()))>::value,
"func passed to tap must return void");
// The first lambda handles success by calling func; the second handles errors and is
// deliberately empty. Both are noexcept.
return tapImpl(std::forward(func),
[](Func && func, const T& val) noexcept { call(func, val); },
[](Func && func, const Status& status) noexcept {});
}
/**
* Callback is called if the input completes with an error.
*
* This can be used to log.
*
* func must return void (checked with a static_assert). It receives the status as a const
* reference.
*/
template // Status -> void
Future tapError(Func&& func) && noexcept {
static_assert(std::is_void()))>::value,
"func passed to tapError must return void");
// The first lambda handles success and is deliberately empty; the second handles errors by
// calling func. Both are noexcept.
return tapImpl(std::forward(func),
[](Func && func, const T& val) noexcept {},
[](Func && func, const Status& status) noexcept { call(func, status); });
}
/**
* Callback is called when the input completes, regardless of success or failure.
*
* This can be used for cleanup. Some other libraries name the equivalent method finally to
* match the common semantic from other languages.
*
* Warning: If func takes a StatusWith, it requires copying the value on success. If that is
* too expensive, it can be avoided by either providing a function object with separate
* Status/const T& overloads, or by using a generic lambda if you don't need to consult the
* value for your cleanup.
*/
template // StatusWith -> void, or Status/const T& overloads.
Future tapAll(Func&& func) && noexcept {
// Two separate checks that func returns void, one per static_assert.
static_assert(std::is_void()))>::value,
"func passed to tapAll must return void");
static_assert(std::is_void()))>::value,
"func passed to tapAll must return void");
// func is called on both paths: with the value on success and with the status on error.
return tapImpl(std::forward(func),
[](Func && func, const T& val) noexcept { call(func, val); },
[](Func && func, const Status& status) noexcept { call(func, status); });
}
/**
* Ignores the return value of a future, transforming it down into a Future.
*
* This only ignores values, not errors. Those remain propagated until an onError handler.
*
* Equivalent to then([](auto&&){});
*/
Future ignoreValue() && noexcept;
private:
template