summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2022-01-13 03:39:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-13 04:28:13 +0000
commit7aa8af0297f66f827d093b31de311174236c859a (patch)
tree74f1f2ce4480578407440ea54616441a10364d0c
parent09400b1f2c9cbc9df2daf85522d0c3cdfa2f86bf (diff)
downloadmongo-7aa8af0297f66f827d093b31de311174236c859a.tar.gz
SERVER-60059 Add a Mongo PackagedTask
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/util/SConscript1
-rw-r--r--src/mongo/util/future_util.h3
-rw-r--r--src/mongo/util/packaged_task.h126
-rw-r--r--src/mongo/util/packaged_task_test.cpp171
5 files changed, 300 insertions, 3 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 8d919914e73..3640e2c3775 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -462,6 +462,8 @@ error_codes:
- {code: 360, name: ShardVersionRefreshCanceled, categories: [InternalOnly]}
- {code: 361, name: CollectionUUIDMismatch, extra: CollectionUUIDMismatchInfo}
+
+ - {code: 362, name: FutureAlreadyRetrieved}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 3a2a26934ac..3998716f7f4 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -708,6 +708,7 @@ icuEnv.CppUnitTest(
'md5_test.cpp',
'md5main.cpp',
'out_of_line_executor_test.cpp',
+ 'packaged_task_test.cpp',
'periodic_runner_impl_test.cpp',
'processinfo_test.cpp',
'procparser_test.cpp' if env.TargetOSIs('linux') else [],
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index 34509cb2e4a..ea9e2127aa1 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -449,7 +449,6 @@ std::vector<T> variadicArgsToVector(U&&... elems) {
(vector.push_back(std::forward<U>(elems)), ...);
return vector;
}
-
} // namespace future_util_details
/**
@@ -865,7 +864,5 @@ template <typename T, typename... Args>
auto makeState(Args&&... args) {
return AsyncState<T>::make(std::forward<Args>(args)...);
}
-
} // namespace future_util
-
} // namespace mongo
diff --git a/src/mongo/util/packaged_task.h b/src/mongo/util/packaged_task.h
new file mode 100644
index 00000000000..3f21808f11f
--- /dev/null
+++ b/src/mongo/util/packaged_task.h
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/base/error_codes.h"
+
+#include "mongo/stdx/type_traits.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+namespace packaged_task_detail {
+/**
+ * SigHelper is a family of types helpful for deducing the type signature of the callable wrapped by
+ * a PackagedTask.
+ */
+template <typename>
+struct SigHelper {};
+// Function Type
+template <typename Ret, typename... Args>
+struct SigHelper<Ret (*)(Args...)> : stdx::type_identity<Ret(Args...)> {};
+// Member Function Pointers
+template <typename Class, typename Ret, typename... Args>
+struct SigHelper<Ret (Class::*)(Args...)> : stdx::type_identity<Ret(Args...)> {};
+template <typename Class, typename Ret, typename... Args>
+struct SigHelper<Ret (Class::*)(Args...)&> : stdx::type_identity<Ret(Args...)> {};
+template <typename Class, typename Ret, typename... Args>
+struct SigHelper<Ret (Class::*)(Args...) const> : stdx::type_identity<Ret(Args...)> {};
+template <typename Class, typename Ret, typename... Args>
+struct SigHelper<Ret (Class::*)(Args...) const&> : stdx::type_identity<Ret(Args...)> {};
+
+template <typename T>
+using getCallOperator = decltype(&T::operator());
+
+template <typename T>
+constexpr bool hasCallOperator = stdx::is_detected_v<getCallOperator, T>;
+
+template <typename Callable>
+using SigFor = typename std::conditional_t<hasCallOperator<Callable>,
+ SigHelper<decltype(&Callable::operator())>,
+ SigHelper<Callable>>::type;
+} // namespace packaged_task_detail
+
+/**
+ * A PackagedTask wraps anything Callable, but packages the return value of the Callable in a Future
+ * that can be accessed before the Callable is run. Construct a PackagedTask by giving it a
+ * Callable. Once the PackagedTask is constructed, you can extract a Future that will contain the
+ * result of running the packaged task. The PackagedTask can be invoked as if it was the Callable
+ * that it wraps.
+ */
+template <typename Sig>
+class PackagedTask;
+template <typename R, typename... Args>
+class PackagedTask<R(Args...)> {
+ using ReturnType = FutureContinuationResult<unique_function<R(Args...)>, Args...>;
+
+public:
+ template <typename F>
+ explicit PackagedTask(F&& f) : _f(std::forward<F>(f)) {}
+ PackagedTask(const PackagedTask&) = delete;
+ PackagedTask& operator=(const PackagedTask&) = delete;
+ PackagedTask(PackagedTask&&) = default;
+ PackagedTask& operator=(PackagedTask&&) = default;
+
+ /**
+ * Invokes the Callable wrapped by this PackagedTask. This can only be called once, as a
+ * PackagedTask produces at most one result obtained from running the wrapped Callable at most
+ * one time. It is invalid to call this more than once.
+ */
+ void operator()(Args... args) {
+ _p.setWith([&] { return _f(std::forward<Args>(args)...); });
+ }
+
+ /**
+ * Returns a Future that represents the (possibly-deferred) result of the wrapped task. Because
+ * running the task will produce exactly one result, it is safe to call getFuture() at most once
+ * on any PackagedTask; subsequent calls will throw a DBException set with
+ * ErrorCodes::FutureAlreadyRetrieved.
+ */
+ Future<ReturnType> getFuture() {
+ if (_futureExtracted) {
+ iasserted(ErrorCodes::FutureAlreadyRetrieved,
+ "Attempted to extract more than one future from a PackagedTask");
+ }
+ _futureExtracted = true;
+ return std::move(_fut);
+ }
+
+private:
+ unique_function<R(Args...)> _f;
+ Promise<ReturnType> _p{NonNullPromiseTag{}};
+ Future<ReturnType> _fut{_p.getFuture()};
+ bool _futureExtracted{false};
+};
+
+template <typename F, typename Sig = packaged_task_detail::SigFor<F>>
+PackagedTask(F&& f)->PackagedTask<Sig>;
+
+template <typename R, typename... Args>
+PackagedTask(R (*)(Args...))->PackagedTask<R(Args...)>;
+} // namespace mongo
diff --git a/src/mongo/util/packaged_task_test.cpp b/src/mongo/util/packaged_task_test.cpp
new file mode 100644
index 00000000000..b3af9376c25
--- /dev/null
+++ b/src/mongo/util/packaged_task_test.cpp
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <fmt/format.h>
+
+#include "mongo/util/packaged_task.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+namespace {
+using namespace fmt::literals;
+
+TEST(PackagedTaskTest, LambdaTaskNoArgs) {
+ auto packagedGetBSON = PackagedTask([] { return BSON("x" << 42); });
+ auto bsonFut = packagedGetBSON.getFuture();
+ ASSERT_FALSE(bsonFut.isReady());
+ packagedGetBSON();
+ ASSERT_BSONOBJ_EQ(bsonFut.get(), BSON("x" << 42));
+}
+
+TEST(PackagedTaskTest, LambdaTaskOneArg) {
+ auto packagedHelloName =
+ PackagedTask([](std::string name) { return "Hello, {}!"_format(name); });
+ auto helloFuture = packagedHelloName.getFuture();
+ ASSERT_FALSE(helloFuture.isReady());
+ packagedHelloName("George");
+ ASSERT_EQ(std::move(helloFuture).get(), "Hello, George!");
+}
+
+TEST(PackagedTaskTest, LambdaTaskMultipleArgs) {
+ auto packagedPrintNameAndAge = PackagedTask(
+ [](std::string name, int age) { return "{} is {} years old!"_format(name, age); });
+ auto nameAndAgeFut = packagedPrintNameAndAge.getFuture();
+ ASSERT_FALSE(nameAndAgeFut.isReady());
+ packagedPrintNameAndAge("George", 24);
+ ASSERT_EQ(std::move(nameAndAgeFut).get(), "George is 24 years old!");
+}
+
+TEST(PackagedTaskTest, UniqueFunctionTaskNoArgs) {
+ unique_function<BSONObj()> fn = [] { return BSON("x" << 42); };
+ auto packagedGetBSON = PackagedTask(std::move(fn));
+ auto bsonFut = packagedGetBSON.getFuture();
+ ASSERT_FALSE(bsonFut.isReady());
+ packagedGetBSON();
+ ASSERT_BSONOBJ_EQ(bsonFut.get(), BSON("x" << 42));
+}
+
+TEST(PackagedTaskTest, UniqueFunctionTaskOneArg) {
+ unique_function<std::string(std::string)> fn = [](std::string name) {
+ return "Hello, {}!"_format(name);
+ };
+ auto packagedHelloName = PackagedTask(std::move(fn));
+ auto helloFuture = packagedHelloName.getFuture();
+ ASSERT_FALSE(helloFuture.isReady());
+ packagedHelloName("George");
+ ASSERT_EQ(std::move(helloFuture).get(), "Hello, George!");
+}
+
+TEST(PackagedTaskTest, UniqueFunctionTaskMultipleArgs) {
+ unique_function<std::string(std::string, int)> fn = [](std::string name, int age) {
+ return "{} is {} years old!"_format(name, age);
+ };
+ auto packagedPrintNameAndAge = PackagedTask(std::move(fn));
+ auto nameAndAgeFut = packagedPrintNameAndAge.getFuture();
+ ASSERT_FALSE(nameAndAgeFut.isReady());
+ packagedPrintNameAndAge("George", 24);
+ ASSERT_EQ(std::move(nameAndAgeFut).get(), "George is 24 years old!");
+}
+
+TEST(PackagedTaskTest, FunctionPointerTaskNoArgs) {
+ auto getNumPackagedTask = PackagedTask(+[] { return 42; });
+ auto getNumFut = getNumPackagedTask.getFuture();
+ ASSERT_FALSE(getNumFut.isReady());
+ getNumPackagedTask();
+ ASSERT_EQ(std::move(getNumFut).get(), 42);
+}
+
+TEST(PackagedTaskTest, FunctionPointerTaskOneArg) {
+ auto getNumPlusNPackagedTask = PackagedTask(+[](int n) { return 42 + n; });
+ auto getNumPlusNFut = getNumPlusNPackagedTask.getFuture();
+ ASSERT_FALSE(getNumPlusNFut.isReady());
+ getNumPlusNPackagedTask(2);
+ ASSERT_EQ(std::move(getNumPlusNFut).get(), 44);
+}
+
+TEST(PackagedTaskTest, FunctionPointerTaskMultipleArgs) {
+ auto getSumPackagedTask = PackagedTask(+[](int x, int y) { return x + y; });
+ auto getSumFut = getSumPackagedTask.getFuture();
+ ASSERT_FALSE(getSumFut.isReady());
+ getSumPackagedTask(2, 6);
+ ASSERT_EQ(std::move(getSumFut).get(), 8);
+}
+
+TEST(PackagedTaskTest, FutureReturningTaskNoArgument) {
+ auto [p, f] = makePromiseFuture<std::string>();
+ auto greeting = PackagedTask([greetWordFut = std::move(f)]() mutable {
+ return std::move(greetWordFut).then([](std::string greetWord) {
+ return "{}George"_format(greetWord);
+ });
+ });
+ auto greetingFut = greeting.getFuture();
+ ASSERT_FALSE(greetingFut.isReady());
+ greeting();
+ ASSERT_FALSE(greetingFut.isReady());
+ p.emplaceValue("Aloha, ");
+ ASSERT_EQ(std::move(greetingFut).get(), "Aloha, George");
+}
+
+TEST(PackagedTaskTest, FutureReturningTaskWithArgument) {
+ auto [p, f] = makePromiseFuture<std::string>();
+ auto greetingWithName = PackagedTask([greetingFut = std::move(f)](std::string name) mutable {
+ return std::move(greetingFut).then([name = std::move(name)](std::string greeting) {
+ return "{}{}"_format(greeting, name);
+ });
+ });
+ auto greetingWithNameFut = greetingWithName.getFuture();
+ ASSERT_FALSE(greetingWithNameFut.isReady());
+ greetingWithName("George");
+ ASSERT_FALSE(greetingWithNameFut.isReady());
+ p.emplaceValue("Aloha, ");
+ ASSERT_EQ(std::move(greetingWithNameFut).get(), "Aloha, George");
+}
+
+TEST(PackagedTaskTest, CanOnlyExtractOneFuture) {
+ auto packagedHelloWorld = PackagedTask([] { return "Hello, World!"; });
+ auto future = packagedHelloWorld.getFuture();
+ ASSERT_THROWS_CODE(
+ packagedHelloWorld.getFuture(), DBException, ErrorCodes::FutureAlreadyRetrieved);
+}
+
+TEST(PackagedTaskTest, BreaksPromiseIfNeverRun) {
+ Future<const char*> fut = [&] {
+ auto packagedHelloWorld = PackagedTask([] { return "Hello, World!"; });
+ auto fut = packagedHelloWorld.getFuture();
+ ASSERT_FALSE(fut.isReady());
+ return fut;
+ }();
+ ASSERT_THROWS_CODE(fut.get(), DBException, ErrorCodes::BrokenPromise);
+}
+} // namespace
+} // namespace mongo