summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorADAM David Alan Martin <adam.martin@10gen.com>2018-07-25 16:04:37 -0400
committerADAM David Alan Martin <adam.martin@10gen.com>2018-07-26 12:41:18 -0400
commit2338f365430d7f395faf73bff6c64def505da1b3 (patch)
treee665157e6553284420be40d3e41f1736c1a3436d /src/mongo
parent2471e2bb651442d84aef69a7c1885f653a179b35 (diff)
downloadmongo-2338f365430d7f395faf73bff6c64def505da1b3.tar.gz
SERVER-35684 Remove `promise.getFuture()`
This API invites subtle race conditions. So just remove it, and force everyone to use a unified API which creates a promise and a future at the same time.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp12
-rw-r--r--src/mongo/executor/network_interface_tl.cpp31
-rw-r--r--src/mongo/executor/network_interface_tl.h10
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp16
-rw-r--r--src/mongo/util/future.h49
-rw-r--r--src/mongo/util/future_bm.cpp132
-rw-r--r--src/mongo/util/future_test.cpp201
-rw-r--r--src/mongo/util/keyed_executor.h7
8 files changed, 222 insertions, 236 deletions
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index bb41a687a6e..0c9cf2e0f76 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -44,6 +44,8 @@ const auto kMaxTimerDuration = Milliseconds::max();
struct TimeoutHandler {
AtomicBool done;
Promise<void> promise;
+
+ explicit TimeoutHandler(Promise<void> p) : promise(std::move(p)) {}
};
} // namespace
@@ -111,8 +113,9 @@ void TLConnection::cancelTimeout() {
void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
auto anchor = shared_from_this();
- auto handler = std::make_shared<TimeoutHandler>();
- handler->promise.getFuture().getAsync(
+ auto pf = makePromiseFuture<void>();
+ auto handler = std::make_shared<TimeoutHandler>(std::move(pf.promise));
+ std::move(pf.future).getAsync(
[ this, cb = std::move(cb) ](Status status) { cb(this, std::move(status)); });
log() << "Connecting to " << _peer;
@@ -175,8 +178,9 @@ void TLConnection::resetToUnknown() {
void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
auto anchor = shared_from_this();
- auto handler = std::make_shared<TimeoutHandler>();
- handler->promise.getFuture().getAsync(
+ auto pf = makePromiseFuture<void>();
+ auto handler = std::make_shared<TimeoutHandler>(std::move(pf.promise));
+ std::move(pf.future).getAsync(
[ this, cb = std::move(cb) ](Status status) { cb(this, status); });
setTimeout(timeout, [this, handler] {
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 2bb57e38678..cffe9a74596 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -169,7 +169,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
request.metadata = newMetadata.obj();
}
- auto state = std::make_shared<CommandState>(request, cbHandle);
+ auto pf = makePromiseFuture<RemoteCommandResponse>();
+ auto state = std::make_shared<CommandState>(request, cbHandle, std::move(pf.promise));
{
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
_inProgress.insert({state->cbHandle, state});
@@ -182,10 +183,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
log() << "Discarding command due to failpoint before acquireConn";
- std::move(state->mergedFuture)
- .getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
- onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
- });
+ std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
+ onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
+ });
return Status::OK();
}
@@ -215,10 +215,18 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
});
});
- auto remainingWork = [this, state, baton, onFinish](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
- makeReadyFutureWith(
- [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+ auto remainingWork = [
+ this,
+ state,
+ // TODO: once SERVER-35685 is done, stop using a `std::shared_ptr<Future>` here.
+ future = std::make_shared<decltype(pf.future)>(std::move(pf.future)),
+ baton,
+ onFinish
+ ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ makeReadyFutureWith([&] {
+ return _onAcquireConn(
+ state, std::move(*future), std::move(*uassertStatusOK(swConn)), baton);
+ })
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
// network errors, but sharding assumes HostUnreachable on network errors.
@@ -267,11 +275,12 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// returning a ready Future with a not-OK status.
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
+ Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
const transport::BatonHandle& baton) {
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
conn->indicateSuccess();
- return std::move(state->mergedFuture);
+ return future;
}
if (state->done.load()) {
@@ -366,7 +375,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
state->promise.setFromStatusWith(std::move(swr));
});
- return std::move(state->mergedFuture);
+ return future;
}
void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 068e5d310bb..603eb59ef8d 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -80,8 +80,12 @@ public:
private:
struct CommandState {
- CommandState(RemoteCommandRequest request_, TaskExecutor::CallbackHandle cbHandle_)
- : request(std::move(request_)), cbHandle(std::move(cbHandle_)) {}
+ CommandState(RemoteCommandRequest request_,
+ TaskExecutor::CallbackHandle cbHandle_,
+ Promise<RemoteCommandResponse> promise_)
+ : request(std::move(request_)),
+ cbHandle(std::move(cbHandle_)),
+ promise(std::move(promise_)) {}
RemoteCommandRequest request;
TaskExecutor::CallbackHandle cbHandle;
@@ -104,11 +108,11 @@ private:
AtomicBool done;
Promise<RemoteCommandResponse> promise;
- Future<RemoteCommandResponse> mergedFuture = promise.getFuture();
};
void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
+ Future<RemoteCommandResponse> future,
CommandState::ConnHandle conn,
const transport::BatonHandle& baton);
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 4cc72dc99fe..cbcde155864 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -557,8 +557,14 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
Milliseconds timeout) {
struct AsyncConnectState {
- AsyncConnectState(HostAndPort peer, asio::io_context& context)
- : socket(context), timeoutTimer(context), resolver(context), peer(std::move(peer)) {}
+ AsyncConnectState(HostAndPort peer,
+ asio::io_context& context,
+ Promise<SessionHandle> promise_)
+ : promise(std::move(promise_)),
+ socket(context),
+ timeoutTimer(context),
+ resolver(context),
+ peer(std::move(peer)) {}
AtomicBool done{false};
Promise<SessionHandle> promise;
@@ -573,8 +579,10 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
};
auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get());
- auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl);
- Future<SessionHandle> mergedFuture = connector->promise.getFuture();
+ auto pf = makePromiseFuture<SessionHandle>();
+ auto connector =
+ std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl, std::move(pf.promise));
+ Future<SessionHandle> mergedFuture = std::move(pf.future);
if (connector->peer.host().empty()) {
return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"};
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index 3f227761b0e..c50bab8c7ae 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -508,9 +508,7 @@ public:
~Promise() {
if (MONGO_unlikely(sharedState)) {
- if (haveExtractedFuture) {
- sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"});
- }
+ sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"});
}
}
@@ -576,25 +574,28 @@ public:
*/
SharedPromise<T> share() noexcept;
- /**
- * Prefer using makePromiseFuture<T>() over constructing a promise and calling this method.
- */
- Future<T> getFuture() noexcept;
+ static auto makePromiseFutureImpl() {
+ struct PromiseAndFuture {
+ Promise<T> promise;
+ Future<T> future = promise.getFuture();
+ };
+ return PromiseAndFuture();
+ }
private:
+ // This is not public because we found it frequently was involved in races. The
+ // `makePromiseFuture<T>` API avoids those races entirely.
+ Future<T> getFuture() noexcept;
+
friend class Future<void>;
template <typename Func>
void setImpl(Func&& doSet) noexcept {
- invariant(!haveSetValue);
- haveSetValue = true;
+ invariant(sharedState);
doSet();
- if (haveExtractedFuture)
- sharedState.reset();
+ sharedState.reset();
}
- bool haveSetValue = false;
- bool haveExtractedFuture = false;
boost::intrusive_ptr<SharedState<T>> sharedState = make_intrusive<SharedState<T>>();
};
@@ -1310,11 +1311,7 @@ auto makeReadyFutureWith(Func&& func) {
*/
template <typename T>
inline auto makePromiseFuture() {
- struct PromiseAndFuture {
- Promise<T> promise;
- Future<T> future = promise.getFuture();
- };
- return PromiseAndFuture();
+ return Promise<T>::makePromiseFutureImpl();
}
/**
@@ -1349,23 +1346,13 @@ using FutureContinuationResult =
template <typename T>
inline Future<T> Promise<T>::getFuture() noexcept {
- invariant(!haveExtractedFuture);
- haveExtractedFuture = true;
-
- if (!haveSetValue) {
- sharedState->threadUnsafeIncRefCountTo(2);
- return Future<T>(
- boost::intrusive_ptr<SharedState<T>>(sharedState.get(), /*add ref*/ false));
- }
-
- // Let the Future steal our ref-count since we don't need it anymore.
- return Future<T>(std::move(sharedState));
+ sharedState->threadUnsafeIncRefCountTo(2);
+ return Future<T>(boost::intrusive_ptr<SharedState<T>>(sharedState.get(), /*add ref*/ false));
}
template <typename T>
inline SharedPromise<T> Promise<T>::share() noexcept {
- invariant(haveExtractedFuture);
- invariant(!haveSetValue);
+ invariant(sharedState);
return SharedPromise<T>(std::make_shared<Promise<T>>(std::move(*this)));
}
diff --git a/src/mongo/util/future_bm.cpp b/src/mongo/util/future_bm.cpp
index 28d57bfe010..5aad4b6bb48 100644
--- a/src/mongo/util/future_bm.cpp
+++ b/src/mongo/util/future_bm.cpp
@@ -65,9 +65,9 @@ void BM_futureIntReadyThen(benchmark::State& state) {
NOINLINE_DECL Future<int> makeReadyFutWithPromise() {
benchmark::ClobberMemory();
- Promise<int> p;
- p.emplaceValue(1); // before getFuture().
- return p.getFuture();
+ auto pf = makePromiseFuture<int>();
+ pf.promise.emplaceValue(1);
+ return std::move(pf.future);
}
void BM_futureIntReadyWithPromise(benchmark::State& state) {
@@ -83,27 +83,12 @@ void BM_futureIntReadyWithPromiseThen(benchmark::State& state) {
}
}
-NOINLINE_DECL Future<int> makeReadyFutWithPromise2() {
- // This is the same as makeReadyFutWithPromise() except that this gets the Future first.
- benchmark::ClobberMemory();
- Promise<int> p;
- auto fut = p.getFuture();
- p.emplaceValue(1); // after getFuture().
- return fut;
-}
-
-void BM_futureIntReadyWithPromise2(benchmark::State& state) {
- for (auto _ : state) {
- benchmark::DoNotOptimize(makeReadyFutWithPromise().then([](int i) { return i + 1; }).get());
- }
-}
-
void BM_futureIntDeferredThen(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p;
- auto fut = p.getFuture().then([](int i) { return i + 1; });
- p.emplaceValue(1);
+ auto pf = makePromiseFuture<int>();
+ auto fut = std::move(pf.future).then([](int i) { return i + 1; });
+ pf.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -111,9 +96,9 @@ void BM_futureIntDeferredThen(benchmark::State& state) {
void BM_futureIntDeferredThenImmediate(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p;
- auto fut = p.getFuture().then([](int i) { return Future<int>::makeReady(i + 1); });
- p.emplaceValue(1);
+ auto pf = makePromiseFuture<int>();
+ auto fut = std::move(pf.future).then([](int i) { return Future<int>::makeReady(i + 1); });
+ pf.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -122,9 +107,9 @@ void BM_futureIntDeferredThenImmediate(benchmark::State& state) {
void BM_futureIntDeferredThenReady(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- auto fut = p1.getFuture().then([&](int i) { return makeReadyFutWithPromise(); });
- p1.emplaceValue(1);
+ auto pf = makePromiseFuture<int>();
+ auto fut = std::move(pf.future).then([](int i) { return makeReadyFutWithPromise(); });
+ pf.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -132,11 +117,11 @@ void BM_futureIntDeferredThenReady(benchmark::State& state) {
void BM_futureIntDoubleDeferredThen(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- Promise<int> p2;
- auto fut = p1.getFuture().then([&](int i) { return p2.getFuture(); });
- p1.emplaceValue(1);
- p2.emplaceValue(1);
+ auto pf1 = makePromiseFuture<int>();
+ auto pf2 = makePromiseFuture<int>();
+ auto fut = std::move(pf1.future).then([&](int i) { return std::move(pf2.future); });
+ pf1.promise.emplaceValue(1);
+ pf2.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -144,14 +129,15 @@ void BM_futureIntDoubleDeferredThen(benchmark::State& state) {
void BM_futureInt3xDeferredThenNested(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- Promise<int> p2;
- Promise<int> p3;
- auto fut = p1.getFuture().then(
- [&](int i) { return p2.getFuture().then([&](int) { return p3.getFuture(); }); });
- p1.emplaceValue(1);
- p2.emplaceValue(1);
- p3.emplaceValue(1);
+ auto pf1 = makePromiseFuture<int>();
+ auto pf2 = makePromiseFuture<int>();
+ auto pf3 = makePromiseFuture<int>();
+ auto fut = std::move(pf1.future).then([&](int i) {
+ return std::move(pf2.future).then([&](int) { return std::move(pf3.future); });
+ });
+ pf1.promise.emplaceValue(1);
+ pf2.promise.emplaceValue(1);
+ pf3.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -159,15 +145,15 @@ void BM_futureInt3xDeferredThenNested(benchmark::State& state) {
void BM_futureInt3xDeferredThenChained(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- Promise<int> p2;
- Promise<int> p3;
- auto fut = p1.getFuture().then([&](int i) { return p2.getFuture(); }).then([&](int i) {
- return p3.getFuture();
- });
- p1.emplaceValue(1);
- p2.emplaceValue(1);
- p3.emplaceValue(1);
+ auto pf1 = makePromiseFuture<int>();
+ auto pf2 = makePromiseFuture<int>();
+ auto pf3 = makePromiseFuture<int>();
+ auto fut = std::move(pf1.future)
+ .then([&](int i) { return std::move(pf2.future); })
+ .then([&](int i) { return std::move(pf3.future); });
+ pf1.promise.emplaceValue(1);
+ pf2.promise.emplaceValue(1);
+ pf3.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -176,18 +162,19 @@ void BM_futureInt3xDeferredThenChained(benchmark::State& state) {
void BM_futureInt4xDeferredThenNested(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- Promise<int> p2;
- Promise<int> p3;
- Promise<int> p4;
- auto fut = p1.getFuture().then([&](int i) {
- return p2.getFuture().then(
- [&](int) { return p3.getFuture().then([&](int) { return p4.getFuture(); }); });
+ auto pf1 = makePromiseFuture<int>();
+ auto pf2 = makePromiseFuture<int>();
+ auto pf3 = makePromiseFuture<int>();
+ auto pf4 = makePromiseFuture<int>();
+ auto fut = std::move(pf1.future).then([&](int i) {
+ return std::move(pf2.future).then([&](int) {
+ return std::move(pf3.future).then([&](int) { return std::move(pf4.future); });
+ });
});
- p1.emplaceValue(1);
- p2.emplaceValue(1);
- p3.emplaceValue(1);
- p4.emplaceValue(1);
+ pf1.promise.emplaceValue(1);
+ pf2.promise.emplaceValue(1);
+ pf3.promise.emplaceValue(1);
+ pf4.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -195,18 +182,18 @@ void BM_futureInt4xDeferredThenNested(benchmark::State& state) {
void BM_futureInt4xDeferredThenChained(benchmark::State& state) {
for (auto _ : state) {
benchmark::ClobberMemory();
- Promise<int> p1;
- Promise<int> p2;
- Promise<int> p3;
- Promise<int> p4;
- auto fut = p1.getFuture() //
- .then([&](int i) { return p2.getFuture(); })
- .then([&](int i) { return p3.getFuture(); })
- .then([&](int i) { return p4.getFuture(); });
- p1.emplaceValue(1);
- p2.emplaceValue(1);
- p3.emplaceValue(1);
- p4.emplaceValue(1);
+ auto pf1 = makePromiseFuture<int>();
+ auto pf2 = makePromiseFuture<int>();
+ auto pf3 = makePromiseFuture<int>();
+ auto pf4 = makePromiseFuture<int>();
+ auto fut = std::move(pf1.future) //
+ .then([&](int i) { return std::move(pf2.future); })
+ .then([&](int i) { return std::move(pf3.future); })
+ .then([&](int i) { return std::move(pf4.future); });
+ pf1.promise.emplaceValue(1);
+ pf2.promise.emplaceValue(1);
+ pf3.promise.emplaceValue(1);
+ pf4.promise.emplaceValue(1);
benchmark::DoNotOptimize(std::move(fut).get());
}
}
@@ -217,7 +204,6 @@ BENCHMARK(BM_futureIntReady);
BENCHMARK(BM_futureIntReadyThen);
BENCHMARK(BM_futureIntReadyWithPromise);
BENCHMARK(BM_futureIntReadyWithPromiseThen);
-BENCHMARK(BM_futureIntReadyWithPromise2);
BENCHMARK(BM_futureIntDeferredThen);
BENCHMARK(BM_futureIntDeferredThenImmediate);
BENCHMARK(BM_futureIntDeferredThenReady);
diff --git a/src/mongo/util/future_test.cpp b/src/mongo/util/future_test.cpp
index 2a755460ce3..ffeb101cbad 100644
--- a/src/mongo/util/future_test.cpp
+++ b/src/mongo/util/future_test.cpp
@@ -110,10 +110,9 @@ void FUTURE_SUCCESS_TEST(const CompletionFunc& completion, const TestFunc& test)
test(Future<CompletionType>::makeReady(completion()));
}
{ // ready future from promise
- Promise<CompletionType> promise;
- auto fut = promise.getFuture(); // before setting value to bypass opt to immediate
- promise.emplaceValue(completion());
- test(std::move(fut));
+ auto pf = makePromiseFuture<CompletionType>();
+ pf.promise.emplaceValue(completion());
+ test(std::move(pf.future));
}
{ // async future
@@ -132,11 +131,10 @@ void FUTURE_SUCCESS_TEST(const CompletionFunc& completion, const TestFunc& test)
test(Future<CompletionType>::makeReady());
}
{ // ready future from promise
- Promise<CompletionType> promise;
- auto fut = promise.getFuture(); // before setting value to bypass opt to immediate
+ auto pf = makePromiseFuture<CompletionType>();
completion();
- promise.emplaceValue();
- test(std::move(fut));
+ pf.promise.emplaceValue();
+ test(std::move(pf.future));
}
{ // async future
@@ -150,10 +148,9 @@ void FUTURE_FAIL_TEST(const TestFunc& test) {
test(Future<CompletionType>::makeReady(failStatus));
}
{ // ready future from promise
- Promise<CompletionType> promise;
- auto fut = promise.getFuture(); // before setting value to bypass opt to immediate
- promise.setError(failStatus);
- test(std::move(fut));
+ auto pf = makePromiseFuture<CompletionType>();
+ pf.promise.setError(failStatus);
+ test(std::move(pf.future));
}
{ // async future
@@ -196,13 +193,12 @@ TEST(Future, Success_getAsync) {
FUTURE_SUCCESS_TEST(
[] { return 1; },
[](Future<int>&& fut) {
- auto outside = Promise<int>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](StatusWith<int> sw) mutable {
+ auto pf = makePromiseFuture<int>();
+ std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable {
ASSERT_OK(sw);
outside.emplaceValue(sw.getValue());
});
- ASSERT_EQ(std::move(outsideFut).get(), 1);
+ ASSERT_EQ(std::move(pf.future).get(), 1);
});
}
@@ -234,13 +230,12 @@ TEST(Future, Fail_getNothrowRvalue) {
TEST(Future, Fail_getAsync) {
FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
- auto outside = Promise<int>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](StatusWith<int> sw) mutable {
+ auto pf = makePromiseFuture<int>();
+ std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable {
ASSERT(!sw.isOK());
outside.setError(sw.getStatus());
});
- ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus);
+ ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus);
});
}
@@ -357,10 +352,9 @@ TEST(Future, Success_thenFutureReady) {
[](Future<int>&& fut) {
ASSERT_EQ(std::move(fut)
.then([](int i) {
- Promise<int> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue(i + 2);
- return fut;
+ auto pf = makePromiseFuture<int>();
+ pf.promise.emplaceValue(i + 2);
+ return std::move(pf.future);
})
.get(),
3);
@@ -490,10 +484,9 @@ TEST(Future, Fail_onErrorFutureReady) {
ASSERT_EQ(std::move(fut)
.onError([](Status s) {
ASSERT_EQ(s, failStatus);
- Promise<int> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue(3);
- return fut;
+ auto pf = makePromiseFuture<int>();
+ pf.promise.emplaceValue(3);
+ return std::move(pf.future);
})
.get(),
3);
@@ -744,13 +737,12 @@ TEST(Future_Void, Success_getAsync) {
FUTURE_SUCCESS_TEST(
[] {},
[](Future<void>&& fut) {
- auto outside = Promise<void>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](Status status) mutable {
+ auto pf = makePromiseFuture<void>();
+ std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable {
ASSERT_OK(status);
outside.emplaceValue();
});
- ASSERT_EQ(std::move(outsideFut).getNoThrow(), Status::OK());
+ ASSERT_EQ(std::move(pf.future).getNoThrow(), Status::OK());
});
}
@@ -783,13 +775,12 @@ TEST(Future_Void, Fail_getNothrowRvalue) {
TEST(Future_Void, Fail_getAsync) {
FUTURE_FAIL_TEST<void>([](Future<void>&& fut) {
- auto outside = Promise<void>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](Status status) mutable {
+ auto pf = makePromiseFuture<void>();
+ std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable {
ASSERT(!status.isOK());
outside.setError(status);
});
- ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus);
+ ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus);
});
}
@@ -884,10 +875,9 @@ TEST(Future_Void, Success_thenFutureReady) {
[](Future<void>&& fut) {
ASSERT_EQ(std::move(fut)
.then([]() {
- Promise<int> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue(3);
- return fut;
+ auto pf = makePromiseFuture<int>();
+ pf.promise.emplaceValue(3);
+ return std::move(pf.future);
})
.get(),
3);
@@ -1000,10 +990,9 @@ TEST(Future_Void, Fail_onErrorFutureReady) {
ASSERT_EQ(std::move(fut)
.onError([](Status s) {
ASSERT_EQ(s, failStatus);
- Promise<void> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue();
- return fut;
+ auto pf = makePromiseFuture<void>();
+ pf.promise.emplaceValue();
+ return std::move(pf.future);
})
.then([] { return 3; })
.get(),
@@ -1284,13 +1273,12 @@ TEST(Future_MoveOnly, Success_getAsync) {
FUTURE_SUCCESS_TEST(
[] { return Widget(1); },
[](Future<Widget>&& fut) {
- auto outside = Promise<Widget>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](StatusWith<Widget> sw) mutable {
+ auto pf = makePromiseFuture<Widget>();
+ std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable {
ASSERT_OK(sw);
outside.emplaceValue(std::move(sw.getValue()));
});
- ASSERT_EQ(std::move(outsideFut).get(), 1);
+ ASSERT_EQ(std::move(pf.future).get(), 1);
});
}
@@ -1327,13 +1315,12 @@ TEST(Future_MoveOnly, Fail_getNothrowRvalue) {
TEST(Future_MoveOnly, Fail_getAsync) {
FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) {
- auto outside = Promise<Widget>();
- auto outsideFut = outside.getFuture();
- std::move(fut).getAsync([outside = outside.share()](StatusWith<Widget> sw) mutable {
+ auto pf = makePromiseFuture<Widget>();
+ std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable {
ASSERT(!sw.isOK());
outside.setError(sw.getStatus());
});
- ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus);
+ ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus);
});
}
@@ -1413,10 +1400,9 @@ TEST(Future_MoveOnly, Success_thenFutureReady) {
[](Future<Widget>&& fut) {
ASSERT_EQ(std::move(fut)
.then([](Widget i) {
- Promise<Widget> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue(i + 2);
- return fut;
+ auto pf = makePromiseFuture<Widget>();
+ pf.promise.emplaceValue(i + 2);
+ return std::move(pf.future);
})
.get(),
3);
@@ -1548,10 +1534,9 @@ TEST(Future_MoveOnly, Fail_onErrorFutureReady) {
ASSERT_EQ(std::move(fut)
.onError([](Status s) {
ASSERT_EQ(s, failStatus);
- Promise<Widget> promise;
- auto fut = promise.getFuture();
- promise.emplaceValue(3);
- return fut;
+ auto pf = makePromiseFuture<Widget>();
+ pf.promise.emplaceValue(3);
+ return std::move(pf.future);
})
.get(),
3);
@@ -1760,119 +1745,119 @@ DEATH_TEST(Future_EdgeCases, Success_getAsync_throw, "terminate() called") {
TEST(Promise, Success_setFrom) {
FUTURE_SUCCESS_TEST([] { return 1; },
[](Future<int>&& fut) {
- Promise<int> p;
- p.setFrom(std::move(fut));
- ASSERT_EQ(p.getFuture().get(), 1);
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setFrom(std::move(fut));
+ ASSERT_EQ(std::move(pf.future).get(), 1);
});
}
TEST(Promise, Fail_setFrom) {
FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
- Promise<int> p;
- p.setFrom(std::move(fut));
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setFrom(std::move(fut));
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
});
}
TEST(Promise, Success_setWith_value) {
- Promise<int> p;
- p.setWith([&] { return 1; });
- ASSERT_EQ(p.getFuture().get(), 1);
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] { return 1; });
+ ASSERT_EQ(std::move(pf.future).get(), 1);
}
TEST(Promise, Fail_setWith_throw) {
- Promise<int> p;
- p.setWith([&] {
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] {
uassertStatusOK(failStatus);
return 1;
});
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
}
TEST(Promise, Success_setWith_StatusWith) {
- Promise<int> p;
- p.setWith([&] { return StatusWith<int>(1); });
- ASSERT_EQ(p.getFuture().get(), 1);
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] { return StatusWith<int>(1); });
+ ASSERT_EQ(std::move(pf.future).get(), 1);
}
TEST(Promise, Fail_setWith_StatusWith) {
- Promise<int> p;
- p.setWith([&] { return StatusWith<int>(failStatus); });
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] { return StatusWith<int>(failStatus); });
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
}
TEST(Promise, Success_setWith_Future) {
FUTURE_SUCCESS_TEST([] { return 1; },
[](Future<int>&& fut) {
- Promise<int> p;
- p.setWith([&] { return std::move(fut); });
- ASSERT_EQ(p.getFuture().get(), 1);
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] { return std::move(fut); });
+ ASSERT_EQ(std::move(pf.future).get(), 1);
});
}
TEST(Promise, Fail_setWith_Future) {
FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
- Promise<int> p;
- p.setWith([&] { return std::move(fut); });
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<int>();
+ pf.promise.setWith([&] { return std::move(fut); });
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
});
}
TEST(Promise_void, Success_setFrom) {
FUTURE_SUCCESS_TEST([] {},
[](Future<void>&& fut) {
- Promise<void> p;
- p.setFrom(std::move(fut));
- ASSERT_OK(p.getFuture().getNoThrow());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setFrom(std::move(fut));
+ ASSERT_OK(std::move(pf.future).getNoThrow());
});
}
TEST(Promise_void, Fail_setFrom) {
FUTURE_FAIL_TEST<void>([](Future<void>&& fut) {
- Promise<void> p;
- p.setFrom(std::move(fut));
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setFrom(std::move(fut));
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
});
}
TEST(Promise_void, Success_setWith_value) {
- Promise<void> p;
- p.setWith([&] {});
- ASSERT_OK(p.getFuture().getNoThrow());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] {});
+ ASSERT_OK(std::move(pf.future).getNoThrow());
}
TEST(Promise_void, Fail_setWith_throw) {
- Promise<void> p;
- p.setWith([&] { uassertStatusOK(failStatus); });
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] { uassertStatusOK(failStatus); });
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
}
TEST(Promise_void, Success_setWith_Status) {
- Promise<void> p;
- p.setWith([&] { return Status::OK(); });
- ASSERT_OK(p.getFuture().getNoThrow());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] { return Status::OK(); });
+ ASSERT_OK(std::move(pf.future).getNoThrow());
}
TEST(Promise_void, Fail_setWith_Status) {
- Promise<void> p;
- p.setWith([&] { return failStatus; });
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] { return failStatus; });
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
}
TEST(Promise_void, Success_setWith_Future) {
FUTURE_SUCCESS_TEST([] {},
[](Future<void>&& fut) {
- Promise<void> p;
- p.setWith([&] { return std::move(fut); });
- ASSERT_OK(p.getFuture().getNoThrow());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] { return std::move(fut); });
+ ASSERT_OK(std::move(pf.future).getNoThrow());
});
}
TEST(Promise_void, Fail_setWith_Future) {
FUTURE_FAIL_TEST<void>([](Future<void>&& fut) {
- Promise<void> p;
- p.setWith([&] { return std::move(fut); });
- ASSERT_THROWS_failStatus(p.getFuture().get());
+ auto pf = makePromiseFuture<void>();
+ pf.promise.setWith([&] { return std::move(fut); });
+ ASSERT_THROWS_failStatus(std::move(pf.future).get());
});
}
diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h
index c0d25110d44..ba0fc28ee87 100644
--- a/src/mongo/util/keyed_executor.h
+++ b/src/mongo/util/keyed_executor.h
@@ -157,6 +157,8 @@ public:
promise.emplaceValue();
}
+ explicit Latch(Promise<void> p) : promise(std::move(p)) {}
+
Promise<void> promise;
};
@@ -167,9 +169,10 @@ public:
return Future<void>::makeReady();
}
+ auto pf = makePromiseFuture<void>();
// We rely on shard_ptr to handle the atomic refcounting before emplacing for us.
- auto latch = std::make_shared<Latch>();
- auto future = latch->promise.getFuture();
+ auto latch = std::make_shared<Latch>(std::move(pf.promise));
+ auto future = std::move(pf.future);
for (auto& pair : _map) {
_onCleared(lk, pair.second).getAsync([latch](const Status& status) mutable {