diff options
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_future_util.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_future_util_test.cpp | 100 |
3 files changed, 129 insertions, 5 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 08af4e461bd..fa4b7a7f1d6 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -569,6 +569,7 @@ env.CppUnitTest( 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_donor_service_test.cpp', + 'resharding/resharding_future_util_test.cpp', 'resharding/resharding_metrics_new_test.cpp', 'resharding/resharding_metrics_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', diff --git a/src/mongo/db/s/resharding/resharding_future_util.cpp b/src/mongo/db/s/resharding/resharding_future_util.cpp index 849a14bd80e..62c95fbaf3f 100644 --- a/src/mongo/db/s/resharding/resharding_future_util.cpp +++ b/src/mongo/db/s/resharding/resharding_future_util.cpp @@ -51,19 +51,42 @@ ExecutorFuture<void> whenAllSucceedOn(const std::vector<SharedSemiFuture<void>>& : ExecutorFuture(executor); } +std::vector<Future<void>> runAllInlineUnsafe(const std::vector<SharedSemiFuture<void>>& futures) { + std::vector<Future<void>> result; + result.reserve(futures.size()); + + for (const auto& future : futures) { + result.emplace_back(future.unsafeToInlineFuture()); + } + + return result; +} + ExecutorFuture<void> cancelWhenAnyErrorThenQuiesce( const std::vector<SharedSemiFuture<void>>& futures, ExecutorPtr executor, CancellationSource cancelSource) { - return whenAllSucceedOn(futures, executor) - .onError([futures, executor, cancelSource](Status originalError) mutable { + if (futures.empty()) { + return ExecutorFuture(executor); + } + // Run all futures inline so that the onError callback is called even if that error was caused + // by the executor shutting down. This causes the logic for whenAllSucceed, whenAll, and the + // onError callback to potentially run on the threads of the setters of the promises + // associated with the input futures. Since this logic is thread safe, not blocking, and does + // not acquire additional resources, this is safe, but beware if making further changes to this + // function. + return whenAllSucceed(runAllInlineUnsafe(futures)) + .unsafeToInlineFuture() + .onError([futures, cancelSource](Status originalError) mutable { cancelSource.cancel(); - return whenAll(thenRunAllOn(futures, executor)) + return whenAll(runAllInlineUnsafe(futures)) .ignoreValue() - .thenRunOn(executor) + .unsafeToInlineFuture() .onCompletion([originalError](auto) { return originalError; }); - }); + }) + .thenRunOn(executor); } + } // namespace mongo::resharding diff --git a/src/mongo/db/s/resharding/resharding_future_util_test.cpp b/src/mongo/db/s/resharding/resharding_future_util_test.cpp new file mode 100644 index 00000000000..e37a13a314b --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_future_util_test.cpp @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/resharding/resharding_future_util.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace { +class ReshardingFutureUtilTest : public unittest::Test { +protected: + void setUp() override { + _executor = std::make_shared<ThreadPool>([]() { + ThreadPool::Options options; + options.maxThreads = 2; + return options; + }()); + _executor->startup(); + } + + void tearDown() override { + _executor->shutdown(); + _executor->join(); + } + + std::shared_ptr<ThreadPool> getExecutor() const { + return _executor; + } + +private: + std::shared_ptr<ThreadPool> _executor; +}; + +TEST_F(ReshardingFutureUtilTest, CancelWhenAnyErrorThenQuiesceDuringExecutorShutdown) { + CancellationSource cancelSource; + auto token = cancelSource.token(); + PromiseAndFuture<void> taskThreadsReady; + AtomicWord<int> tasksRunningCount{0}; + AtomicWord<bool> taskWasCancelled{false}; + auto checkSignalReady = [&]() { + auto running = tasksRunningCount.addAndFetch(1); + if (running == 2) { + taskThreadsReady.promise.emplaceValue(); + } + }; + PromiseAndFuture<void> executorShutDownTriggered; + auto quiesced = ExecutorFuture(getExecutor()).then([&]() { + return resharding::cancelWhenAnyErrorThenQuiesce( + {ExecutorFuture(getExecutor()) + .then([&]() { + checkSignalReady(); + executorShutDownTriggered.future.wait(); + uasserted(6791600, "Executor shut down"); + }) + .share(), + ExecutorFuture(getExecutor()) + .then([&]() { + checkSignalReady(); + token.onCancel().wait(); + taskWasCancelled.store(true); + }) + .share()}, + getExecutor(), + cancelSource); + }); + taskThreadsReady.future.wait(); + getExecutor()->shutdown(); + executorShutDownTriggered.promise.emplaceValue(); + auto status = quiesced.getNoThrow(); + ASSERT_EQ(status.code(), 6791600); + ASSERT_TRUE(taskWasCancelled.load()); +} +} // namespace +} // namespace mongo |