diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/db/client_out_of_line_executor.cpp | 111 | ||||
-rw-r--r-- | src/mongo/db/client_out_of_line_executor.h | 89 | ||||
-rw-r--r-- | src/mongo/db/client_out_of_line_executor_test.cpp | 171 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.cpp | 61 | ||||
-rw-r--r-- | src/mongo/util/producer_consumer_queue.h | 1 | ||||
-rw-r--r-- | src/mongo/util/producer_consumer_queue_test.cpp | 1 |
7 files changed, 442 insertions, 14 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9eec9eb1fe7..90f4c489508 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -494,6 +494,17 @@ env.Clone().InjectModule("enterprise").Library( ) env.Library( + target='client_out_of_line_executor', + source=[ + 'client_out_of_line_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + ], +) + +env.Library( target='mirror_maestro', source=[ 'mirror_maestro.cpp', @@ -504,6 +515,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/executor/task_executor_interface', "$BUILD_DIR/mongo/rpc/protocol", + 'client_out_of_line_executor', 'service_context', "$BUILD_DIR/mongo/util/net/network", ], @@ -520,6 +532,16 @@ env.Library( ], ) +env.CppUnitTest( + target='client_out_of_line_executor_test', + source=[ + 'client_out_of_line_executor_test.cpp', + ], + LIBDEPS=[ + 'client_out_of_line_executor', + ], +) + env.Library( target="commands", source=[ diff --git a/src/mongo/db/client_out_of_line_executor.cpp b/src/mongo/db/client_out_of_line_executor.cpp new file mode 100644 index 00000000000..2fd1787a7f1 --- /dev/null +++ b/src/mongo/db/client_out_of_line_executor.cpp @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +#include "mongo/db/client_out_of_line_executor.h" + +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" +#include "mongo/logger/log_severity_limiter.h" +#include "mongo/logv2/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +ClientOutOfLineExecutor::ClientOutOfLineExecutor() noexcept { + _taskQueue = std::make_shared<QueueType>(); +} + +ClientOutOfLineExecutor::~ClientOutOfLineExecutor() noexcept { + // Force producers to consume their tasks beyond this point. + _taskQueue->closeProducerEnd(); + + // Only call `tryPop()` when there's a task to pop to avoid lifetime issues with logging. + auto toCollect = _taskQueue->getStats().queueDepth; + while (toCollect) { + auto task = _taskQueue->tryPop(); + invariant(task); + (*task)(Status(ErrorCodes::ClientDisconnect, "Client's executor has stopped")); + toCollect--; + } + invariant(toCollect == 0); +} + +static const Client::Decoration<ClientOutOfLineExecutor> getClientExecutor = + Client::declareDecoration<ClientOutOfLineExecutor>(); + +ClientOutOfLineExecutor* ClientOutOfLineExecutor::get(const Client* client) noexcept { + return const_cast<ClientOutOfLineExecutor*>(&getClientExecutor(client)); +} + +void ClientOutOfLineExecutor::schedule(Task task) { + _taskQueue->push(std::move(task)); +} + +void ClientOutOfLineExecutor::consumeAllTasks() noexcept { + // This limit allows logging incidents that the executor consumes too much of the client's + // time running scheduled tasks. The value is only used for debugging, and should be an + // approximation of the acceptable overhead in the context of normal client operations. + static constexpr auto kTimeLimit = Microseconds(30); + + _stopWatch.restart(); + + while (auto maybeTask = _taskQueue->tryPop()) { + auto task = std::move(*maybeTask); + task(Status::OK()); + } + + auto elapsed = _stopWatch.elapsed(); + + auto severity = MONGO_GET_LIMITED_SEVERITY(this, Seconds{1}, 0, 2); + if (MONGO_unlikely(elapsed > kTimeLimit)) { + LOGV2_DEBUG(4651401, + logSeverityV1toV2(severity).toInt(), + "Client's executor exceeded time limit", + "elapsed"_attr = elapsed, + "limit"_attr = kTimeLimit); + } +} + +void ClientOutOfLineExecutor::QueueHandle::schedule(Task&& task) { + auto guard = makeGuard( + [&task] { task(Status(ErrorCodes::CallbackCanceled, "Client no longer exists")); }); + + if (auto queue = _weakQueue.lock()) { + try { + queue->push(std::move(task)); + guard.dismiss(); + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) { + // The destructor for the out-of-line executor has already been called. + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/client_out_of_line_executor.h b/src/mongo/db/client_out_of_line_executor.h new file mode 100644 index 00000000000..7146b11c1df --- /dev/null +++ b/src/mongo/db/client_out_of_line_executor.h @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <functional> +#include <memory> + +#include "mongo/db/client.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/out_of_line_executor.h" +#include "mongo/util/producer_consumer_queue.h" + +namespace mongo { + +/** + * A client decoration that allows out-of-line, client-local execution of tasks. + * So long as the client thread exists, tasks can be scheduled to execute on the client thread. + * Other threads must always schedule tasks through an instance of `QueueHandle`. + * Once the decoration is destructed, `QueueHandle` runs tasks on the caller's thread, providing + * `ErrorCodes::CallbackCanceled` as the status. + */ +class ClientOutOfLineExecutor final : public OutOfLineExecutor { +public: + ClientOutOfLineExecutor() noexcept; + + ~ClientOutOfLineExecutor() noexcept; + + static ClientOutOfLineExecutor* get(const Client*) noexcept; + + using Task = OutOfLineExecutor::Task; + + void schedule(Task) override; + + // Blocks until the executor is done running all scheduled tasks. + void consumeAllTasks() noexcept; + + using QueueType = MultiProducerSingleConsumerQueue<Task>; + + // Allows other threads to access the queue irrespective of the client's lifetime. + class QueueHandle final { + public: + QueueHandle() = default; + + QueueHandle(const std::shared_ptr<QueueType>& queue) : _weakQueue(queue) {} + + void schedule(Task&&); + + private: + std::weak_ptr<QueueType> _weakQueue; + }; + + auto getHandle() noexcept { + return QueueHandle(_taskQueue); + } + +private: + std::shared_ptr<QueueType> _taskQueue; + + ClockSource::StopWatch _stopWatch; +}; + +} // namespace mongo diff --git a/src/mongo/db/client_out_of_line_executor_test.cpp b/src/mongo/db/client_out_of_line_executor_test.cpp new file mode 100644 index 00000000000..cffeb633c87 --- /dev/null +++ b/src/mongo/db/client_out_of_line_executor_test.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <memory> + +#include "mongo/db/client.h" +#include "mongo/db/client_out_of_line_executor.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/barrier.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +/** + * The following implements a pseudo garbage collector to test client's out-of-line executor. + */ +class ClientOutOfLineExecutorTest : public unittest::Test { +public: + class DummyInstance { + public: + DummyInstance() = delete; + + DummyInstance(ClientOutOfLineExecutorTest* parent) : _parent(parent) { + _parent->_instanceCount.fetchAndAdd(1); + } + + DummyInstance(DummyInstance&& other) : _parent(std::move(other._parent)) { + _parent->_instanceCount.fetchAndAdd(1); + } + + DummyInstance(const DummyInstance& other) { + MONGO_UNREACHABLE; + } + + ~DummyInstance() { + _parent->_instanceCount.fetchAndAdd(-1); + } + + private: + ClientOutOfLineExecutorTest* _parent; + }; + + void setUp() override { + setGlobalServiceContext(ServiceContext::make()); + Client::initThread(kClientThreadName); + _instanceCount.store(0); + } + + void tearDown() override { + auto client = Client::releaseCurrent(); + client.reset(nullptr); + } + + auto getDecoration() noexcept { + return ClientOutOfLineExecutor::get(Client::getCurrent()); + } + + int countDummies() const noexcept { + return _instanceCount.load(); + } + + static constexpr auto kClientThreadName = "ClientOutOfLineExecutorTest"_sd; + +private: + friend class DummyInstance; + + AtomicWord<int> _instanceCount; +}; + +TEST_F(ClientOutOfLineExecutorTest, CheckDecoration) { + auto decoration = getDecoration(); + ASSERT(decoration); +} + +TEST_F(ClientOutOfLineExecutorTest, ScheduleAndExecute) { + auto thread = stdx::thread([this, handle = getDecoration()->getHandle()]() mutable { + DummyInstance dummy(this); + handle.schedule([dummy = std::move(dummy)](const Status& status) { + ASSERT_OK(status); + ASSERT_EQ(getThreadName(), ClientOutOfLineExecutorTest::kClientThreadName); + }); + }); + thread.join(); + ASSERT_EQ(countDummies(), 1); + + getDecoration()->consumeAllTasks(); + ASSERT_EQ(countDummies(), 0); +} + +TEST_F(ClientOutOfLineExecutorTest, DestructorExecutesLeftovers) { + const auto kDummiesCount = 8; + unittest::Barrier b1(2), b2(2); + + auto thread = stdx::thread([this, kDummiesCount, b1 = &b1, b2 = &b2]() { + Client::initThread("ThreadWithLeftovers"_sd); + + auto handle = ClientOutOfLineExecutor::get(Client::getCurrent())->getHandle(); + for (auto i = 0; i < kDummiesCount; i++) { + DummyInstance dummy(this); + handle.schedule([dummy = std::move(dummy), + threadId = stdx::this_thread::get_id()](const Status& status) { + ASSERT(status == ErrorCodes::ClientDisconnect); + // Avoid using `getThreadName()` here as it'll cause read-after-delete errors. + ASSERT_EQ(threadId, stdx::this_thread::get_id()); + }); + } + + b1->countDownAndWait(); + // Wait for the main thread to count dummies. + b2->countDownAndWait(); + }); + + b1.countDownAndWait(); + ASSERT_EQ(countDummies(), kDummiesCount); + b2.countDownAndWait(); + + thread.join(); + ASSERT_EQ(countDummies(), 0); +} + +TEST_F(ClientOutOfLineExecutorTest, ScheduleAfterClientThreadReturns) { + ClientOutOfLineExecutor::QueueHandle handle; + + auto thread = stdx::thread([&handle]() mutable { + Client::initThread("ClientThread"_sd); + handle = ClientOutOfLineExecutor::get(Client::getCurrent())->getHandle(); + // Return to destroy the client, and close the task queue. + }); + + thread.join(); + + bool taskCalled = false; + handle.schedule([&taskCalled, threadName = getThreadName()](const Status& status) { + ASSERT(status == ErrorCodes::CallbackCanceled); + ASSERT_EQ(getThreadName(), threadName); + taskCalled = true; + }); + ASSERT(taskCalled); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp index 31d9de386d9..7f898db664d 100644 --- a/src/mongo/db/mirror_maestro.cpp +++ b/src/mongo/db/mirror_maestro.cpp @@ -41,6 +41,7 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/json.h" +#include "mongo/db/client_out_of_line_executor.h" #include "mongo/db/commands.h" #include "mongo/db/commands/server_status.h" #include "mongo/db/mirror_maestro_gen.h" @@ -90,15 +91,43 @@ public: */ void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept; + /** + * Maintains the state required for mirroring requests. + */ + class MirroredRequestState { + public: + MirroredRequestState(MirrorMaestroImpl* maestro, + std::vector<HostAndPort> hosts, + std::shared_ptr<CommandInvocation> invocation, + MirroredReadsParameters params) + : _maestro(std::move(maestro)), + _hosts(std::move(hosts)), + _invocation(std::move(invocation)), + _params(std::move(params)) {} + + MirroredRequestState() = delete; + + void mirror() noexcept { + invariant(_maestro); + _maestro->_mirror(_hosts, _invocation, _params); + } + + private: + MirrorMaestroImpl* _maestro; + std::vector<HostAndPort> _hosts; + std::shared_ptr<CommandInvocation> _invocation; + MirroredReadsParameters _params; + }; + private: /** * Attempt to mirror invocation to a subset of hosts based on params * * This command is expected to only run on the _executor */ - void _mirror(std::vector<HostAndPort> hosts, + void _mirror(const std::vector<HostAndPort>& hosts, std::shared_ptr<CommandInvocation> invocation, - MirroredReadsParameters params) noexcept; + const MirroredReadsParameters& params) noexcept; /** * An enum detailing the liveness of the Maestro @@ -256,26 +285,32 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation) return; } + auto clientExecutor = ClientOutOfLineExecutor::get(Client::getCurrent()); + auto clientExecutorHandle = clientExecutor->getHandle(); + + // TODO SERVER-46619 delegates collection to the client's baton + clientExecutor->consumeAllTasks(); + // There is the potential to actually mirror requests, so schedule the _mirror() invocation // out-of-line. This means the command itself can return quickly and we do the arduous work of // building new bsons and evaluating randomness in a less important context. + auto requestState = std::make_unique<MirroredRequestState>( + this, std::move(hosts), std::move(invocation), std::move(params)); ExecutorFuture(_executor) // - .getAsync([this, - hosts = std::move(hosts), - invocation = std::move(invocation), - params = std::move(params)](const auto& status) mutable { - if (ErrorCodes::isShutdownError(status)) { - return; + .getAsync([clientExecutorHandle, + requestState = std::move(requestState)](const auto& status) mutable { + if (!ErrorCodes::isShutdownError(status)) { + invariant(status.isOK()); + requestState->mirror(); } - invariant(status.isOK()); - - _mirror(std::move(hosts), std::move(invocation), std::move(params)); + clientExecutorHandle.schedule([requestState = std::move(requestState)]( + const Status&) mutable { requestState.reset(); }); }); } -void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts, +void MirrorMaestroImpl::_mirror(const std::vector<HostAndPort>& hosts, std::shared_ptr<CommandInvocation> invocation, - MirroredReadsParameters params) noexcept try { + const MirroredReadsParameters& params) noexcept try { auto payload = [&] { BSONObjBuilder bob; diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h index 0836bbb28c5..d3ef0749e52 100644 --- a/src/mongo/util/producer_consumer_queue.h +++ b/src/mongo/util/producer_consumer_queue.h @@ -34,7 +34,6 @@ #include <list> #include <numeric> -#include "mongo/db/operation_context.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/concurrency/with_lock.h" diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp index 5ba6a4d43de..b0f319c8711 100644 --- a/src/mongo/util/producer_consumer_queue_test.cpp +++ b/src/mongo/util/producer_consumer_queue_test.cpp @@ -33,6 +33,7 @@ #include "mongo/util/producer_consumer_queue.h" +#include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" |