diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2022-01-13 03:39:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-13 04:28:13 +0000 |
commit | 7aa8af0297f66f827d093b31de311174236c859a (patch) | |
tree | 74f1f2ce4480578407440ea54616441a10364d0c | |
parent | 09400b1f2c9cbc9df2daf85522d0c3cdfa2f86bf (diff) | |
download | mongo-7aa8af0297f66f827d093b31de311174236c859a.tar.gz |
SERVER-60059 Add a Mongo PackagedTask
-rw-r--r-- | src/mongo/base/error_codes.yml | 2 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/util/future_util.h | 3 | ||||
-rw-r--r-- | src/mongo/util/packaged_task.h | 126 | ||||
-rw-r--r-- | src/mongo/util/packaged_task_test.cpp | 171 |
5 files changed, 300 insertions, 3 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 8d919914e73..3640e2c3775 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -462,6 +462,8 @@ error_codes: - {code: 360, name: ShardVersionRefreshCanceled, categories: [InternalOnly]} - {code: 361, name: CollectionUUIDMismatch, extra: CollectionUUIDMismatchInfo} + + - {code: 362, name: FutureAlreadyRetrieved} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 3a2a26934ac..3998716f7f4 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -708,6 +708,7 @@ icuEnv.CppUnitTest( 'md5_test.cpp', 'md5main.cpp', 'out_of_line_executor_test.cpp', + 'packaged_task_test.cpp', 'periodic_runner_impl_test.cpp', 'processinfo_test.cpp', 'procparser_test.cpp' if env.TargetOSIs('linux') else [], diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h index 34509cb2e4a..ea9e2127aa1 100644 --- a/src/mongo/util/future_util.h +++ b/src/mongo/util/future_util.h @@ -449,7 +449,6 @@ std::vector<T> variadicArgsToVector(U&&... elems) { (vector.push_back(std::forward<U>(elems)), ...); return vector; } - } // namespace future_util_details /** @@ -865,7 +864,5 @@ template <typename T, typename... Args> auto makeState(Args&&... args) { return AsyncState<T>::make(std::forward<Args>(args)...); } - } // namespace future_util - } // namespace mongo diff --git a/src/mongo/util/packaged_task.h b/src/mongo/util/packaged_task.h new file mode 100644 index 00000000000..3f21808f11f --- /dev/null +++ b/src/mongo/util/packaged_task.h @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/base/error_codes.h" + +#include "mongo/stdx/type_traits.h" +#include "mongo/util/future.h" + +namespace mongo { +namespace packaged_task_detail { +/** + * SigHelper is a family of types helpful for deducing the type signature of the callable wrapped by + * a PackagedTask. + */ +template <typename> +struct SigHelper {}; +// Function Type +template <typename Ret, typename... Args> +struct SigHelper<Ret (*)(Args...)> : stdx::type_identity<Ret(Args...)> {}; +// Member Function Pointers +template <typename Class, typename Ret, typename... Args> +struct SigHelper<Ret (Class::*)(Args...)> : stdx::type_identity<Ret(Args...)> {}; +template <typename Class, typename Ret, typename... Args> +struct SigHelper<Ret (Class::*)(Args...)&> : stdx::type_identity<Ret(Args...)> {}; +template <typename Class, typename Ret, typename... Args> +struct SigHelper<Ret (Class::*)(Args...) const> : stdx::type_identity<Ret(Args...)> {}; +template <typename Class, typename Ret, typename... Args> +struct SigHelper<Ret (Class::*)(Args...) const&> : stdx::type_identity<Ret(Args...)> {}; + +template <typename T> +using getCallOperator = decltype(&T::operator()); + +template <typename T> +constexpr bool hasCallOperator = stdx::is_detected_v<getCallOperator, T>; + +template <typename Callable> +using SigFor = typename std::conditional_t<hasCallOperator<Callable>, + SigHelper<decltype(&Callable::operator())>, + SigHelper<Callable>>::type; +} // namespace packaged_task_detail + +/** + * A PackagedTask wraps anything Callable, but packages the return value of the Callable in a Future + * that can be accessed before the Callable is run. Construct a PackagedTask by giving it a + * Callable. Once the PackagedTask is constructed, you can extract a Future that will contain the + * result of running the packaged task. The PackagedTask can be invoked as if it was the Callable + * that it wraps. + */ +template <typename Sig> +class PackagedTask; +template <typename R, typename... Args> +class PackagedTask<R(Args...)> { + using ReturnType = FutureContinuationResult<unique_function<R(Args...)>, Args...>; + +public: + template <typename F> + explicit PackagedTask(F&& f) : _f(std::forward<F>(f)) {} + PackagedTask(const PackagedTask&) = delete; + PackagedTask& operator=(const PackagedTask&) = delete; + PackagedTask(PackagedTask&&) = default; + PackagedTask& operator=(PackagedTask&&) = default; + + /** + * Invokes the Callable wrapped by this PackagedTask. This can only be called once, as a + * PackagedTask produces at most one result obtained from running the wrapped Callable at most + * one time. It is invalid to call this more than once. + */ + void operator()(Args... args) { + _p.setWith([&] { return _f(std::forward<Args>(args)...); }); + } + + /** + * Returns a Future that represents the (possibly-deferred) result of the wrapped task. Because + * running the task will produce exactly one result, it is safe to call getFuture() at most once + * on any PackagedTask; subsequent calls will throw a DBException set with + * ErrorCodes::FutureAlreadyRetrieved. + */ + Future<ReturnType> getFuture() { + if (_futureExtracted) { + iasserted(ErrorCodes::FutureAlreadyRetrieved, + "Attempted to extract more than one future from a PackagedTask"); + } + _futureExtracted = true; + return std::move(_fut); + } + +private: + unique_function<R(Args...)> _f; + Promise<ReturnType> _p{NonNullPromiseTag{}}; + Future<ReturnType> _fut{_p.getFuture()}; + bool _futureExtracted{false}; +}; + +template <typename F, typename Sig = packaged_task_detail::SigFor<F>> +PackagedTask(F&& f)->PackagedTask<Sig>; + +template <typename R, typename... Args> +PackagedTask(R (*)(Args...))->PackagedTask<R(Args...)>; +} // namespace mongo diff --git a/src/mongo/util/packaged_task_test.cpp b/src/mongo/util/packaged_task_test.cpp new file mode 100644 index 00000000000..b3af9376c25 --- /dev/null +++ b/src/mongo/util/packaged_task_test.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <fmt/format.h> + +#include "mongo/util/packaged_task.h" + +#include "mongo/base/error_codes.h" +#include "mongo/unittest/bson_test_util.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/future.h" + +namespace mongo { +namespace { +using namespace fmt::literals; + +TEST(PackagedTaskTest, LambdaTaskNoArgs) { + auto packagedGetBSON = PackagedTask([] { return BSON("x" << 42); }); + auto bsonFut = packagedGetBSON.getFuture(); + ASSERT_FALSE(bsonFut.isReady()); + packagedGetBSON(); + ASSERT_BSONOBJ_EQ(bsonFut.get(), BSON("x" << 42)); +} + +TEST(PackagedTaskTest, LambdaTaskOneArg) { + auto packagedHelloName = + PackagedTask([](std::string name) { return "Hello, {}!"_format(name); }); + auto helloFuture = packagedHelloName.getFuture(); + ASSERT_FALSE(helloFuture.isReady()); + packagedHelloName("George"); + ASSERT_EQ(std::move(helloFuture).get(), "Hello, George!"); +} + +TEST(PackagedTaskTest, LambdaTaskMultipleArgs) { + auto packagedPrintNameAndAge = PackagedTask( + [](std::string name, int age) { return "{} is {} years old!"_format(name, age); }); + auto nameAndAgeFut = packagedPrintNameAndAge.getFuture(); + ASSERT_FALSE(nameAndAgeFut.isReady()); + packagedPrintNameAndAge("George", 24); + ASSERT_EQ(std::move(nameAndAgeFut).get(), "George is 24 years old!"); +} + +TEST(PackagedTaskTest, UniqueFunctionTaskNoArgs) { + unique_function<BSONObj()> fn = [] { return BSON("x" << 42); }; + auto packagedGetBSON = PackagedTask(std::move(fn)); + auto bsonFut = packagedGetBSON.getFuture(); + ASSERT_FALSE(bsonFut.isReady()); + packagedGetBSON(); + ASSERT_BSONOBJ_EQ(bsonFut.get(), BSON("x" << 42)); +} + +TEST(PackagedTaskTest, UniqueFunctionTaskOneArg) { + unique_function<std::string(std::string)> fn = [](std::string name) { + return "Hello, {}!"_format(name); + }; + auto packagedHelloName = PackagedTask(std::move(fn)); + auto helloFuture = packagedHelloName.getFuture(); + ASSERT_FALSE(helloFuture.isReady()); + packagedHelloName("George"); + ASSERT_EQ(std::move(helloFuture).get(), "Hello, George!"); +} + +TEST(PackagedTaskTest, UniqueFunctionTaskMultipleArgs) { + unique_function<std::string(std::string, int)> fn = [](std::string name, int age) { + return "{} is {} years old!"_format(name, age); + }; + auto packagedPrintNameAndAge = PackagedTask(std::move(fn)); + auto nameAndAgeFut = packagedPrintNameAndAge.getFuture(); + ASSERT_FALSE(nameAndAgeFut.isReady()); + packagedPrintNameAndAge("George", 24); + ASSERT_EQ(std::move(nameAndAgeFut).get(), "George is 24 years old!"); +} + +TEST(PackagedTaskTest, FunctionPointerTaskNoArgs) { + auto getNumPackagedTask = PackagedTask(+[] { return 42; }); + auto getNumFut = getNumPackagedTask.getFuture(); + ASSERT_FALSE(getNumFut.isReady()); + getNumPackagedTask(); + ASSERT_EQ(std::move(getNumFut).get(), 42); +} + +TEST(PackagedTaskTest, FunctionPointerTaskOneArg) { + auto getNumPlusNPackagedTask = PackagedTask(+[](int n) { return 42 + n; }); + auto getNumPlusNFut = getNumPlusNPackagedTask.getFuture(); + ASSERT_FALSE(getNumPlusNFut.isReady()); + getNumPlusNPackagedTask(2); + ASSERT_EQ(std::move(getNumPlusNFut).get(), 44); +} + +TEST(PackagedTaskTest, FunctionPointerTaskMultipleArgs) { + auto getSumPackagedTask = PackagedTask(+[](int x, int y) { return x + y; }); + auto getSumFut = getSumPackagedTask.getFuture(); + ASSERT_FALSE(getSumFut.isReady()); + getSumPackagedTask(2, 6); + ASSERT_EQ(std::move(getSumFut).get(), 8); +} + +TEST(PackagedTaskTest, FutureReturningTaskNoArgument) { + auto [p, f] = makePromiseFuture<std::string>(); + auto greeting = PackagedTask([greetWordFut = std::move(f)]() mutable { + return std::move(greetWordFut).then([](std::string greetWord) { + return "{}George"_format(greetWord); + }); + }); + auto greetingFut = greeting.getFuture(); + ASSERT_FALSE(greetingFut.isReady()); + greeting(); + ASSERT_FALSE(greetingFut.isReady()); + p.emplaceValue("Aloha, "); + ASSERT_EQ(std::move(greetingFut).get(), "Aloha, George"); +} + +TEST(PackagedTaskTest, FutureReturningTaskWithArgument) { + auto [p, f] = makePromiseFuture<std::string>(); + auto greetingWithName = PackagedTask([greetingFut = std::move(f)](std::string name) mutable { + return std::move(greetingFut).then([name = std::move(name)](std::string greeting) { + return "{}{}"_format(greeting, name); + }); + }); + auto greetingWithNameFut = greetingWithName.getFuture(); + ASSERT_FALSE(greetingWithNameFut.isReady()); + greetingWithName("George"); + ASSERT_FALSE(greetingWithNameFut.isReady()); + p.emplaceValue("Aloha, "); + ASSERT_EQ(std::move(greetingWithNameFut).get(), "Aloha, George"); +} + +TEST(PackagedTaskTest, CanOnlyExtractOneFuture) { + auto packagedHelloWorld = PackagedTask([] { return "Hello, World!"; }); + auto future = packagedHelloWorld.getFuture(); + ASSERT_THROWS_CODE( + packagedHelloWorld.getFuture(), DBException, ErrorCodes::FutureAlreadyRetrieved); +} + +TEST(PackagedTaskTest, BreaksPromiseIfNeverRun) { + Future<const char*> fut = [&] { + auto packagedHelloWorld = PackagedTask([] { return "Hello, World!"; }); + auto fut = packagedHelloWorld.getFuture(); + ASSERT_FALSE(fut.isReady()); + return fut; + }(); + ASSERT_THROWS_CODE(fut.get(), DBException, ErrorCodes::BrokenPromise); +} +} // namespace +} // namespace mongo |