summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrett Nawrocki <brett.nawrocki@mongodb.com>2022-08-13 16:42:04 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-13 20:17:04 +0000
commitfdf878f2d224ac2786fff8bafe003c2ef4cb5b1b (patch)
tree5b5284eab20f2d0391a3c791aa6eb44976625ae7
parent1451ddccfc4d24b3686a4851d17c82cbf3d7f35b (diff)
downloadmongo-fdf878f2d224ac2786fff8bafe003c2ef4cb5b1b.tar.gz
SERVER-67916 Fix semantics of cancelWhenAnyErrorThenQuiesce
(cherry picked from commit 8ea624563847736c94f0e500d3097557ab4d8315)
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/resharding/resharding_future_util.cpp33
-rw-r--r--src/mongo/db/s/resharding/resharding_future_util_test.cpp100
3 files changed, 129 insertions, 5 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 08af4e461bd..fa4b7a7f1d6 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -569,6 +569,7 @@ env.CppUnitTest(
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_donor_service_test.cpp',
+ 'resharding/resharding_future_util_test.cpp',
'resharding/resharding_metrics_new_test.cpp',
'resharding/resharding_metrics_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_future_util.cpp b/src/mongo/db/s/resharding/resharding_future_util.cpp
index 849a14bd80e..62c95fbaf3f 100644
--- a/src/mongo/db/s/resharding/resharding_future_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_future_util.cpp
@@ -51,19 +51,42 @@ ExecutorFuture<void> whenAllSucceedOn(const std::vector<SharedSemiFuture<void>>&
: ExecutorFuture(executor);
}
+std::vector<Future<void>> runAllInlineUnsafe(const std::vector<SharedSemiFuture<void>>& futures) {
+ std::vector<Future<void>> result;
+ result.reserve(futures.size());
+
+ for (const auto& future : futures) {
+ result.emplace_back(future.unsafeToInlineFuture());
+ }
+
+ return result;
+}
+
ExecutorFuture<void> cancelWhenAnyErrorThenQuiesce(
const std::vector<SharedSemiFuture<void>>& futures,
ExecutorPtr executor,
CancellationSource cancelSource) {
- return whenAllSucceedOn(futures, executor)
- .onError([futures, executor, cancelSource](Status originalError) mutable {
+ if (futures.empty()) {
+ return ExecutorFuture(executor);
+ }
+ // Run all futures inline so that the onError callback is called even if that error was caused
+ // by the executor shutting down. This causes the logic for whenAllSucceed, whenAll, and the
+ // onError callback to potentially run on the threads of the setters of the promises
+ // associated with the input futures. Since this logic is thread safe, not blocking, and does
+ // not acquire additional resources, this is safe, but beware if making further changes to this
+ // function.
+ return whenAllSucceed(runAllInlineUnsafe(futures))
+ .unsafeToInlineFuture()
+ .onError([futures, cancelSource](Status originalError) mutable {
cancelSource.cancel();
- return whenAll(thenRunAllOn(futures, executor))
+ return whenAll(runAllInlineUnsafe(futures))
.ignoreValue()
- .thenRunOn(executor)
+ .unsafeToInlineFuture()
.onCompletion([originalError](auto) { return originalError; });
- });
+ })
+ .thenRunOn(executor);
}
+
} // namespace mongo::resharding
diff --git a/src/mongo/db/s/resharding/resharding_future_util_test.cpp b/src/mongo/db/s/resharding/resharding_future_util_test.cpp
new file mode 100644
index 00000000000..e37a13a314b
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_future_util_test.cpp
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/resharding/resharding_future_util.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace {
+class ReshardingFutureUtilTest : public unittest::Test {
+protected:
+ void setUp() override {
+ _executor = std::make_shared<ThreadPool>([]() {
+ ThreadPool::Options options;
+ options.maxThreads = 2;
+ return options;
+ }());
+ _executor->startup();
+ }
+
+ void tearDown() override {
+ _executor->shutdown();
+ _executor->join();
+ }
+
+ std::shared_ptr<ThreadPool> getExecutor() const {
+ return _executor;
+ }
+
+private:
+ std::shared_ptr<ThreadPool> _executor;
+};
+
+TEST_F(ReshardingFutureUtilTest, CancelWhenAnyErrorThenQuiesceDuringExecutorShutdown) {
+ CancellationSource cancelSource;
+ auto token = cancelSource.token();
+ PromiseAndFuture<void> taskThreadsReady;
+ AtomicWord<int> tasksRunningCount{0};
+ AtomicWord<bool> taskWasCancelled{false};
+ auto checkSignalReady = [&]() {
+ auto running = tasksRunningCount.addAndFetch(1);
+ if (running == 2) {
+ taskThreadsReady.promise.emplaceValue();
+ }
+ };
+ PromiseAndFuture<void> executorShutDownTriggered;
+ auto quiesced = ExecutorFuture(getExecutor()).then([&]() {
+ return resharding::cancelWhenAnyErrorThenQuiesce(
+ {ExecutorFuture(getExecutor())
+ .then([&]() {
+ checkSignalReady();
+ executorShutDownTriggered.future.wait();
+ uasserted(6791600, "Executor shut down");
+ })
+ .share(),
+ ExecutorFuture(getExecutor())
+ .then([&]() {
+ checkSignalReady();
+ token.onCancel().wait();
+ taskWasCancelled.store(true);
+ })
+ .share()},
+ getExecutor(),
+ cancelSource);
+ });
+ taskThreadsReady.future.wait();
+ getExecutor()->shutdown();
+ executorShutDownTriggered.promise.emplaceValue();
+ auto status = quiesced.getNoThrow();
+ ASSERT_EQ(status.code(), 6791600);
+ ASSERT_TRUE(taskWasCancelled.load());
+}
+} // namespace
+} // namespace mongo