diff options
author | ADAM David Alan Martin <adam.martin@10gen.com> | 2018-07-25 16:04:37 -0400 |
---|---|---|
committer | ADAM David Alan Martin <adam.martin@10gen.com> | 2018-07-26 12:41:18 -0400 |
commit | 2338f365430d7f395faf73bff6c64def505da1b3 (patch) | |
tree | e665157e6553284420be40d3e41f1736c1a3436d /src/mongo | |
parent | 2471e2bb651442d84aef69a7c1885f653a179b35 (diff) | |
download | mongo-2338f365430d7f395faf73bff6c64def505da1b3.tar.gz |
SERVER-35684 Remove `promise.getFuture()`
This API invites subtle race conditions. So just remove it, and
force everyone to use a unified API which creates a promise and
a future at the same time.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/connection_pool_tl.cpp | 12 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 31 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 10 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 16 | ||||
-rw-r--r-- | src/mongo/util/future.h | 49 | ||||
-rw-r--r-- | src/mongo/util/future_bm.cpp | 132 | ||||
-rw-r--r-- | src/mongo/util/future_test.cpp | 201 | ||||
-rw-r--r-- | src/mongo/util/keyed_executor.h | 7 |
8 files changed, 222 insertions, 236 deletions
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp index bb41a687a6e..0c9cf2e0f76 100644 --- a/src/mongo/executor/connection_pool_tl.cpp +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -44,6 +44,8 @@ const auto kMaxTimerDuration = Milliseconds::max(); struct TimeoutHandler { AtomicBool done; Promise<void> promise; + + explicit TimeoutHandler(Promise<void> p) : promise(std::move(p)) {} }; } // namespace @@ -111,8 +113,9 @@ void TLConnection::cancelTimeout() { void TLConnection::setup(Milliseconds timeout, SetupCallback cb) { auto anchor = shared_from_this(); - auto handler = std::make_shared<TimeoutHandler>(); - handler->promise.getFuture().getAsync( + auto pf = makePromiseFuture<void>(); + auto handler = std::make_shared<TimeoutHandler>(std::move(pf.promise)); + std::move(pf.future).getAsync( [ this, cb = std::move(cb) ](Status status) { cb(this, std::move(status)); }); log() << "Connecting to " << _peer; @@ -175,8 +178,9 @@ void TLConnection::resetToUnknown() { void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) { auto anchor = shared_from_this(); - auto handler = std::make_shared<TimeoutHandler>(); - handler->promise.getFuture().getAsync( + auto pf = makePromiseFuture<void>(); + auto handler = std::make_shared<TimeoutHandler>(std::move(pf.promise)); + std::move(pf.future).getAsync( [ this, cb = std::move(cb) ](Status status) { cb(this, status); }); setTimeout(timeout, [this, handler] { diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 2bb57e38678..cffe9a74596 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -169,7 +169,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa request.metadata = newMetadata.obj(); } - auto state = std::make_shared<CommandState>(request, cbHandle); + auto pf = makePromiseFuture<RemoteCommandResponse>(); + auto state = std::make_shared<CommandState>(request, cbHandle, std::move(pf.promise)); { stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); _inProgress.insert({state->cbHandle, state}); @@ -182,10 +183,9 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) { log() << "Discarding command due to failpoint before acquireConn"; - std::move(state->mergedFuture) - .getAsync([onFinish](StatusWith<RemoteCommandResponse> response) { - onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0})); - }); + std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) { + onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0})); + }); return Status::OK(); } @@ -215,10 +215,18 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa }); }); - auto remainingWork = [this, state, baton, onFinish]( - StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) { - makeReadyFutureWith( - [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); }) + auto remainingWork = [ + this, + state, + // TODO: once SERVER-35685 is done, stop using a `std::shared_ptr<Future>` here. + future = std::make_shared<decltype(pf.future)>(std::move(pf.future)), + baton, + onFinish + ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + makeReadyFutureWith([&] { + return _onAcquireConn( + state, std::move(*future), std::move(*uassertStatusOK(swConn)), baton); + }) .onError([](Status error) -> StatusWith<RemoteCommandResponse> { // The TransportLayer has, for historical reasons returned SocketException for // network errors, but sharding assumes HostUnreachable on network errors. @@ -267,11 +275,12 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa // returning a ready Future with a not-OK status. Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( std::shared_ptr<CommandState> state, + Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, const transport::BatonHandle& baton) { if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) { conn->indicateSuccess(); - return std::move(state->mergedFuture); + return future; } if (state->done.load()) { @@ -366,7 +375,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( state->promise.setFromStatusWith(std::move(swr)); }); - return std::move(state->mergedFuture); + return future; } void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 068e5d310bb..603eb59ef8d 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -80,8 +80,12 @@ public: private: struct CommandState { - CommandState(RemoteCommandRequest request_, TaskExecutor::CallbackHandle cbHandle_) - : request(std::move(request_)), cbHandle(std::move(cbHandle_)) {} + CommandState(RemoteCommandRequest request_, + TaskExecutor::CallbackHandle cbHandle_, + Promise<RemoteCommandResponse> promise_) + : request(std::move(request_)), + cbHandle(std::move(cbHandle_)), + promise(std::move(promise_)) {} RemoteCommandRequest request; TaskExecutor::CallbackHandle cbHandle; @@ -104,11 +108,11 @@ private: AtomicBool done; Promise<RemoteCommandResponse> promise; - Future<RemoteCommandResponse> mergedFuture = promise.getFuture(); }; void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle); Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state, + Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, const transport::BatonHandle& baton); diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 4cc72dc99fe..cbcde155864 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -557,8 +557,14 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer, Milliseconds timeout) { struct AsyncConnectState { - AsyncConnectState(HostAndPort peer, asio::io_context& context) - : socket(context), timeoutTimer(context), resolver(context), peer(std::move(peer)) {} + AsyncConnectState(HostAndPort peer, + asio::io_context& context, + Promise<SessionHandle> promise_) + : promise(std::move(promise_)), + socket(context), + timeoutTimer(context), + resolver(context), + peer(std::move(peer)) {} AtomicBool done{false}; Promise<SessionHandle> promise; @@ -573,8 +579,10 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer, }; auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get()); - auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl); - Future<SessionHandle> mergedFuture = connector->promise.getFuture(); + auto pf = makePromiseFuture<SessionHandle>(); + auto connector = + std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl, std::move(pf.promise)); + Future<SessionHandle> mergedFuture = std::move(pf.future); if (connector->peer.host().empty()) { return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"}; diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index 3f227761b0e..c50bab8c7ae 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -508,9 +508,7 @@ public: ~Promise() { if (MONGO_unlikely(sharedState)) { - if (haveExtractedFuture) { - sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); - } + sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); } } @@ -576,25 +574,28 @@ public: */ SharedPromise<T> share() noexcept; - /** - * Prefer using makePromiseFuture<T>() over constructing a promise and calling this method. - */ - Future<T> getFuture() noexcept; + static auto makePromiseFutureImpl() { + struct PromiseAndFuture { + Promise<T> promise; + Future<T> future = promise.getFuture(); + }; + return PromiseAndFuture(); + } private: + // This is not public because we found it frequently was involved in races. The + // `makePromiseFuture<T>` API avoids those races entirely. + Future<T> getFuture() noexcept; + friend class Future<void>; template <typename Func> void setImpl(Func&& doSet) noexcept { - invariant(!haveSetValue); - haveSetValue = true; + invariant(sharedState); doSet(); - if (haveExtractedFuture) - sharedState.reset(); + sharedState.reset(); } - bool haveSetValue = false; - bool haveExtractedFuture = false; boost::intrusive_ptr<SharedState<T>> sharedState = make_intrusive<SharedState<T>>(); }; @@ -1310,11 +1311,7 @@ auto makeReadyFutureWith(Func&& func) { */ template <typename T> inline auto makePromiseFuture() { - struct PromiseAndFuture { - Promise<T> promise; - Future<T> future = promise.getFuture(); - }; - return PromiseAndFuture(); + return Promise<T>::makePromiseFutureImpl(); } /** @@ -1349,23 +1346,13 @@ using FutureContinuationResult = template <typename T> inline Future<T> Promise<T>::getFuture() noexcept { - invariant(!haveExtractedFuture); - haveExtractedFuture = true; - - if (!haveSetValue) { - sharedState->threadUnsafeIncRefCountTo(2); - return Future<T>( - boost::intrusive_ptr<SharedState<T>>(sharedState.get(), /*add ref*/ false)); - } - - // Let the Future steal our ref-count since we don't need it anymore. - return Future<T>(std::move(sharedState)); + sharedState->threadUnsafeIncRefCountTo(2); + return Future<T>(boost::intrusive_ptr<SharedState<T>>(sharedState.get(), /*add ref*/ false)); } template <typename T> inline SharedPromise<T> Promise<T>::share() noexcept { - invariant(haveExtractedFuture); - invariant(!haveSetValue); + invariant(sharedState); return SharedPromise<T>(std::make_shared<Promise<T>>(std::move(*this))); } diff --git a/src/mongo/util/future_bm.cpp b/src/mongo/util/future_bm.cpp index 28d57bfe010..5aad4b6bb48 100644 --- a/src/mongo/util/future_bm.cpp +++ b/src/mongo/util/future_bm.cpp @@ -65,9 +65,9 @@ void BM_futureIntReadyThen(benchmark::State& state) { NOINLINE_DECL Future<int> makeReadyFutWithPromise() { benchmark::ClobberMemory(); - Promise<int> p; - p.emplaceValue(1); // before getFuture(). - return p.getFuture(); + auto pf = makePromiseFuture<int>(); + pf.promise.emplaceValue(1); + return std::move(pf.future); } void BM_futureIntReadyWithPromise(benchmark::State& state) { @@ -83,27 +83,12 @@ void BM_futureIntReadyWithPromiseThen(benchmark::State& state) { } } -NOINLINE_DECL Future<int> makeReadyFutWithPromise2() { - // This is the same as makeReadyFutWithPromise() except that this gets the Future first. - benchmark::ClobberMemory(); - Promise<int> p; - auto fut = p.getFuture(); - p.emplaceValue(1); // after getFuture(). - return fut; -} - -void BM_futureIntReadyWithPromise2(benchmark::State& state) { - for (auto _ : state) { - benchmark::DoNotOptimize(makeReadyFutWithPromise().then([](int i) { return i + 1; }).get()); - } -} - void BM_futureIntDeferredThen(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p; - auto fut = p.getFuture().then([](int i) { return i + 1; }); - p.emplaceValue(1); + auto pf = makePromiseFuture<int>(); + auto fut = std::move(pf.future).then([](int i) { return i + 1; }); + pf.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -111,9 +96,9 @@ void BM_futureIntDeferredThen(benchmark::State& state) { void BM_futureIntDeferredThenImmediate(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p; - auto fut = p.getFuture().then([](int i) { return Future<int>::makeReady(i + 1); }); - p.emplaceValue(1); + auto pf = makePromiseFuture<int>(); + auto fut = std::move(pf.future).then([](int i) { return Future<int>::makeReady(i + 1); }); + pf.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -122,9 +107,9 @@ void BM_futureIntDeferredThenImmediate(benchmark::State& state) { void BM_futureIntDeferredThenReady(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - auto fut = p1.getFuture().then([&](int i) { return makeReadyFutWithPromise(); }); - p1.emplaceValue(1); + auto pf = makePromiseFuture<int>(); + auto fut = std::move(pf.future).then([](int i) { return makeReadyFutWithPromise(); }); + pf.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -132,11 +117,11 @@ void BM_futureIntDeferredThenReady(benchmark::State& state) { void BM_futureIntDoubleDeferredThen(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - Promise<int> p2; - auto fut = p1.getFuture().then([&](int i) { return p2.getFuture(); }); - p1.emplaceValue(1); - p2.emplaceValue(1); + auto pf1 = makePromiseFuture<int>(); + auto pf2 = makePromiseFuture<int>(); + auto fut = std::move(pf1.future).then([&](int i) { return std::move(pf2.future); }); + pf1.promise.emplaceValue(1); + pf2.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -144,14 +129,15 @@ void BM_futureIntDoubleDeferredThen(benchmark::State& state) { void BM_futureInt3xDeferredThenNested(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - Promise<int> p2; - Promise<int> p3; - auto fut = p1.getFuture().then( - [&](int i) { return p2.getFuture().then([&](int) { return p3.getFuture(); }); }); - p1.emplaceValue(1); - p2.emplaceValue(1); - p3.emplaceValue(1); + auto pf1 = makePromiseFuture<int>(); + auto pf2 = makePromiseFuture<int>(); + auto pf3 = makePromiseFuture<int>(); + auto fut = std::move(pf1.future).then([&](int i) { + return std::move(pf2.future).then([&](int) { return std::move(pf3.future); }); + }); + pf1.promise.emplaceValue(1); + pf2.promise.emplaceValue(1); + pf3.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -159,15 +145,15 @@ void BM_futureInt3xDeferredThenNested(benchmark::State& state) { void BM_futureInt3xDeferredThenChained(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - Promise<int> p2; - Promise<int> p3; - auto fut = p1.getFuture().then([&](int i) { return p2.getFuture(); }).then([&](int i) { - return p3.getFuture(); - }); - p1.emplaceValue(1); - p2.emplaceValue(1); - p3.emplaceValue(1); + auto pf1 = makePromiseFuture<int>(); + auto pf2 = makePromiseFuture<int>(); + auto pf3 = makePromiseFuture<int>(); + auto fut = std::move(pf1.future) + .then([&](int i) { return std::move(pf2.future); }) + .then([&](int i) { return std::move(pf3.future); }); + pf1.promise.emplaceValue(1); + pf2.promise.emplaceValue(1); + pf3.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -176,18 +162,19 @@ void BM_futureInt3xDeferredThenChained(benchmark::State& state) { void BM_futureInt4xDeferredThenNested(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - Promise<int> p2; - Promise<int> p3; - Promise<int> p4; - auto fut = p1.getFuture().then([&](int i) { - return p2.getFuture().then( - [&](int) { return p3.getFuture().then([&](int) { return p4.getFuture(); }); }); + auto pf1 = makePromiseFuture<int>(); + auto pf2 = makePromiseFuture<int>(); + auto pf3 = makePromiseFuture<int>(); + auto pf4 = makePromiseFuture<int>(); + auto fut = std::move(pf1.future).then([&](int i) { + return std::move(pf2.future).then([&](int) { + return std::move(pf3.future).then([&](int) { return std::move(pf4.future); }); + }); }); - p1.emplaceValue(1); - p2.emplaceValue(1); - p3.emplaceValue(1); - p4.emplaceValue(1); + pf1.promise.emplaceValue(1); + pf2.promise.emplaceValue(1); + pf3.promise.emplaceValue(1); + pf4.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -195,18 +182,18 @@ void BM_futureInt4xDeferredThenNested(benchmark::State& state) { void BM_futureInt4xDeferredThenChained(benchmark::State& state) { for (auto _ : state) { benchmark::ClobberMemory(); - Promise<int> p1; - Promise<int> p2; - Promise<int> p3; - Promise<int> p4; - auto fut = p1.getFuture() // - .then([&](int i) { return p2.getFuture(); }) - .then([&](int i) { return p3.getFuture(); }) - .then([&](int i) { return p4.getFuture(); }); - p1.emplaceValue(1); - p2.emplaceValue(1); - p3.emplaceValue(1); - p4.emplaceValue(1); + auto pf1 = makePromiseFuture<int>(); + auto pf2 = makePromiseFuture<int>(); + auto pf3 = makePromiseFuture<int>(); + auto pf4 = makePromiseFuture<int>(); + auto fut = std::move(pf1.future) // + .then([&](int i) { return std::move(pf2.future); }) + .then([&](int i) { return std::move(pf3.future); }) + .then([&](int i) { return std::move(pf4.future); }); + pf1.promise.emplaceValue(1); + pf2.promise.emplaceValue(1); + pf3.promise.emplaceValue(1); + pf4.promise.emplaceValue(1); benchmark::DoNotOptimize(std::move(fut).get()); } } @@ -217,7 +204,6 @@ BENCHMARK(BM_futureIntReady); BENCHMARK(BM_futureIntReadyThen); BENCHMARK(BM_futureIntReadyWithPromise); BENCHMARK(BM_futureIntReadyWithPromiseThen); -BENCHMARK(BM_futureIntReadyWithPromise2); BENCHMARK(BM_futureIntDeferredThen); BENCHMARK(BM_futureIntDeferredThenImmediate); BENCHMARK(BM_futureIntDeferredThenReady); diff --git a/src/mongo/util/future_test.cpp b/src/mongo/util/future_test.cpp index 2a755460ce3..ffeb101cbad 100644 --- a/src/mongo/util/future_test.cpp +++ b/src/mongo/util/future_test.cpp @@ -110,10 +110,9 @@ void FUTURE_SUCCESS_TEST(const CompletionFunc& completion, const TestFunc& test) test(Future<CompletionType>::makeReady(completion())); } { // ready future from promise - Promise<CompletionType> promise; - auto fut = promise.getFuture(); // before setting value to bypass opt to immediate - promise.emplaceValue(completion()); - test(std::move(fut)); + auto pf = makePromiseFuture<CompletionType>(); + pf.promise.emplaceValue(completion()); + test(std::move(pf.future)); } { // async future @@ -132,11 +131,10 @@ void FUTURE_SUCCESS_TEST(const CompletionFunc& completion, const TestFunc& test) test(Future<CompletionType>::makeReady()); } { // ready future from promise - Promise<CompletionType> promise; - auto fut = promise.getFuture(); // before setting value to bypass opt to immediate + auto pf = makePromiseFuture<CompletionType>(); completion(); - promise.emplaceValue(); - test(std::move(fut)); + pf.promise.emplaceValue(); + test(std::move(pf.future)); } { // async future @@ -150,10 +148,9 @@ void FUTURE_FAIL_TEST(const TestFunc& test) { test(Future<CompletionType>::makeReady(failStatus)); } { // ready future from promise - Promise<CompletionType> promise; - auto fut = promise.getFuture(); // before setting value to bypass opt to immediate - promise.setError(failStatus); - test(std::move(fut)); + auto pf = makePromiseFuture<CompletionType>(); + pf.promise.setError(failStatus); + test(std::move(pf.future)); } { // async future @@ -196,13 +193,12 @@ TEST(Future, Success_getAsync) { FUTURE_SUCCESS_TEST( [] { return 1; }, [](Future<int>&& fut) { - auto outside = Promise<int>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](StatusWith<int> sw) mutable { + auto pf = makePromiseFuture<int>(); + std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable { ASSERT_OK(sw); outside.emplaceValue(sw.getValue()); }); - ASSERT_EQ(std::move(outsideFut).get(), 1); + ASSERT_EQ(std::move(pf.future).get(), 1); }); } @@ -234,13 +230,12 @@ TEST(Future, Fail_getNothrowRvalue) { TEST(Future, Fail_getAsync) { FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { - auto outside = Promise<int>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](StatusWith<int> sw) mutable { + auto pf = makePromiseFuture<int>(); + std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<int> sw) mutable { ASSERT(!sw.isOK()); outside.setError(sw.getStatus()); }); - ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus); + ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus); }); } @@ -357,10 +352,9 @@ TEST(Future, Success_thenFutureReady) { [](Future<int>&& fut) { ASSERT_EQ(std::move(fut) .then([](int i) { - Promise<int> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(i + 2); - return fut; + auto pf = makePromiseFuture<int>(); + pf.promise.emplaceValue(i + 2); + return std::move(pf.future); }) .get(), 3); @@ -490,10 +484,9 @@ TEST(Future, Fail_onErrorFutureReady) { ASSERT_EQ(std::move(fut) .onError([](Status s) { ASSERT_EQ(s, failStatus); - Promise<int> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(3); - return fut; + auto pf = makePromiseFuture<int>(); + pf.promise.emplaceValue(3); + return std::move(pf.future); }) .get(), 3); @@ -744,13 +737,12 @@ TEST(Future_Void, Success_getAsync) { FUTURE_SUCCESS_TEST( [] {}, [](Future<void>&& fut) { - auto outside = Promise<void>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](Status status) mutable { + auto pf = makePromiseFuture<void>(); + std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable { ASSERT_OK(status); outside.emplaceValue(); }); - ASSERT_EQ(std::move(outsideFut).getNoThrow(), Status::OK()); + ASSERT_EQ(std::move(pf.future).getNoThrow(), Status::OK()); }); } @@ -783,13 +775,12 @@ TEST(Future_Void, Fail_getNothrowRvalue) { TEST(Future_Void, Fail_getAsync) { FUTURE_FAIL_TEST<void>([](Future<void>&& fut) { - auto outside = Promise<void>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](Status status) mutable { + auto pf = makePromiseFuture<void>(); + std::move(fut).getAsync([outside = pf.promise.share()](Status status) mutable { ASSERT(!status.isOK()); outside.setError(status); }); - ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus); + ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus); }); } @@ -884,10 +875,9 @@ TEST(Future_Void, Success_thenFutureReady) { [](Future<void>&& fut) { ASSERT_EQ(std::move(fut) .then([]() { - Promise<int> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(3); - return fut; + auto pf = makePromiseFuture<int>(); + pf.promise.emplaceValue(3); + return std::move(pf.future); }) .get(), 3); @@ -1000,10 +990,9 @@ TEST(Future_Void, Fail_onErrorFutureReady) { ASSERT_EQ(std::move(fut) .onError([](Status s) { ASSERT_EQ(s, failStatus); - Promise<void> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(); - return fut; + auto pf = makePromiseFuture<void>(); + pf.promise.emplaceValue(); + return std::move(pf.future); }) .then([] { return 3; }) .get(), @@ -1284,13 +1273,12 @@ TEST(Future_MoveOnly, Success_getAsync) { FUTURE_SUCCESS_TEST( [] { return Widget(1); }, [](Future<Widget>&& fut) { - auto outside = Promise<Widget>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](StatusWith<Widget> sw) mutable { + auto pf = makePromiseFuture<Widget>(); + std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable { ASSERT_OK(sw); outside.emplaceValue(std::move(sw.getValue())); }); - ASSERT_EQ(std::move(outsideFut).get(), 1); + ASSERT_EQ(std::move(pf.future).get(), 1); }); } @@ -1327,13 +1315,12 @@ TEST(Future_MoveOnly, Fail_getNothrowRvalue) { TEST(Future_MoveOnly, Fail_getAsync) { FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) { - auto outside = Promise<Widget>(); - auto outsideFut = outside.getFuture(); - std::move(fut).getAsync([outside = outside.share()](StatusWith<Widget> sw) mutable { + auto pf = makePromiseFuture<Widget>(); + std::move(fut).getAsync([outside = pf.promise.share()](StatusWith<Widget> sw) mutable { ASSERT(!sw.isOK()); outside.setError(sw.getStatus()); }); - ASSERT_EQ(std::move(outsideFut).getNoThrow(), failStatus); + ASSERT_EQ(std::move(pf.future).getNoThrow(), failStatus); }); } @@ -1413,10 +1400,9 @@ TEST(Future_MoveOnly, Success_thenFutureReady) { [](Future<Widget>&& fut) { ASSERT_EQ(std::move(fut) .then([](Widget i) { - Promise<Widget> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(i + 2); - return fut; + auto pf = makePromiseFuture<Widget>(); + pf.promise.emplaceValue(i + 2); + return std::move(pf.future); }) .get(), 3); @@ -1548,10 +1534,9 @@ TEST(Future_MoveOnly, Fail_onErrorFutureReady) { ASSERT_EQ(std::move(fut) .onError([](Status s) { ASSERT_EQ(s, failStatus); - Promise<Widget> promise; - auto fut = promise.getFuture(); - promise.emplaceValue(3); - return fut; + auto pf = makePromiseFuture<Widget>(); + pf.promise.emplaceValue(3); + return std::move(pf.future); }) .get(), 3); @@ -1760,119 +1745,119 @@ DEATH_TEST(Future_EdgeCases, Success_getAsync_throw, "terminate() called") { TEST(Promise, Success_setFrom) { FUTURE_SUCCESS_TEST([] { return 1; }, [](Future<int>&& fut) { - Promise<int> p; - p.setFrom(std::move(fut)); - ASSERT_EQ(p.getFuture().get(), 1); + auto pf = makePromiseFuture<int>(); + pf.promise.setFrom(std::move(fut)); + ASSERT_EQ(std::move(pf.future).get(), 1); }); } TEST(Promise, Fail_setFrom) { FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { - Promise<int> p; - p.setFrom(std::move(fut)); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<int>(); + pf.promise.setFrom(std::move(fut)); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); }); } TEST(Promise, Success_setWith_value) { - Promise<int> p; - p.setWith([&] { return 1; }); - ASSERT_EQ(p.getFuture().get(), 1); + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { return 1; }); + ASSERT_EQ(std::move(pf.future).get(), 1); } TEST(Promise, Fail_setWith_throw) { - Promise<int> p; - p.setWith([&] { + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { uassertStatusOK(failStatus); return 1; }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); } TEST(Promise, Success_setWith_StatusWith) { - Promise<int> p; - p.setWith([&] { return StatusWith<int>(1); }); - ASSERT_EQ(p.getFuture().get(), 1); + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { return StatusWith<int>(1); }); + ASSERT_EQ(std::move(pf.future).get(), 1); } TEST(Promise, Fail_setWith_StatusWith) { - Promise<int> p; - p.setWith([&] { return StatusWith<int>(failStatus); }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { return StatusWith<int>(failStatus); }); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); } TEST(Promise, Success_setWith_Future) { FUTURE_SUCCESS_TEST([] { return 1; }, [](Future<int>&& fut) { - Promise<int> p; - p.setWith([&] { return std::move(fut); }); - ASSERT_EQ(p.getFuture().get(), 1); + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { return std::move(fut); }); + ASSERT_EQ(std::move(pf.future).get(), 1); }); } TEST(Promise, Fail_setWith_Future) { FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { - Promise<int> p; - p.setWith([&] { return std::move(fut); }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<int>(); + pf.promise.setWith([&] { return std::move(fut); }); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); }); } TEST(Promise_void, Success_setFrom) { FUTURE_SUCCESS_TEST([] {}, [](Future<void>&& fut) { - Promise<void> p; - p.setFrom(std::move(fut)); - ASSERT_OK(p.getFuture().getNoThrow()); + auto pf = makePromiseFuture<void>(); + pf.promise.setFrom(std::move(fut)); + ASSERT_OK(std::move(pf.future).getNoThrow()); }); } TEST(Promise_void, Fail_setFrom) { FUTURE_FAIL_TEST<void>([](Future<void>&& fut) { - Promise<void> p; - p.setFrom(std::move(fut)); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<void>(); + pf.promise.setFrom(std::move(fut)); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); }); } TEST(Promise_void, Success_setWith_value) { - Promise<void> p; - p.setWith([&] {}); - ASSERT_OK(p.getFuture().getNoThrow()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] {}); + ASSERT_OK(std::move(pf.future).getNoThrow()); } TEST(Promise_void, Fail_setWith_throw) { - Promise<void> p; - p.setWith([&] { uassertStatusOK(failStatus); }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] { uassertStatusOK(failStatus); }); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); } TEST(Promise_void, Success_setWith_Status) { - Promise<void> p; - p.setWith([&] { return Status::OK(); }); - ASSERT_OK(p.getFuture().getNoThrow()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] { return Status::OK(); }); + ASSERT_OK(std::move(pf.future).getNoThrow()); } TEST(Promise_void, Fail_setWith_Status) { - Promise<void> p; - p.setWith([&] { return failStatus; }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] { return failStatus; }); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); } TEST(Promise_void, Success_setWith_Future) { FUTURE_SUCCESS_TEST([] {}, [](Future<void>&& fut) { - Promise<void> p; - p.setWith([&] { return std::move(fut); }); - ASSERT_OK(p.getFuture().getNoThrow()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] { return std::move(fut); }); + ASSERT_OK(std::move(pf.future).getNoThrow()); }); } TEST(Promise_void, Fail_setWith_Future) { FUTURE_FAIL_TEST<void>([](Future<void>&& fut) { - Promise<void> p; - p.setWith([&] { return std::move(fut); }); - ASSERT_THROWS_failStatus(p.getFuture().get()); + auto pf = makePromiseFuture<void>(); + pf.promise.setWith([&] { return std::move(fut); }); + ASSERT_THROWS_failStatus(std::move(pf.future).get()); }); } diff --git a/src/mongo/util/keyed_executor.h b/src/mongo/util/keyed_executor.h index c0d25110d44..ba0fc28ee87 100644 --- a/src/mongo/util/keyed_executor.h +++ b/src/mongo/util/keyed_executor.h @@ -157,6 +157,8 @@ public: promise.emplaceValue(); } + explicit Latch(Promise<void> p) : promise(std::move(p)) {} + Promise<void> promise; }; @@ -167,9 +169,10 @@ public: return Future<void>::makeReady(); } + auto pf = makePromiseFuture<void>(); // We rely on shard_ptr to handle the atomic refcounting before emplacing for us. - auto latch = std::make_shared<Latch>(); - auto future = latch->promise.getFuture(); + auto latch = std::make_shared<Latch>(std::move(pf.promise)); + auto future = std::move(pf.future); for (auto& pair : _map) { _onCleared(lk, pair.second).getAsync([latch](const Status& status) mutable { |