summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript22
-rw-r--r--src/mongo/db/client_out_of_line_executor.cpp111
-rw-r--r--src/mongo/db/client_out_of_line_executor.h89
-rw-r--r--src/mongo/db/client_out_of_line_executor_test.cpp171
-rw-r--r--src/mongo/db/mirror_maestro.cpp61
-rw-r--r--src/mongo/util/producer_consumer_queue.h1
-rw-r--r--src/mongo/util/producer_consumer_queue_test.cpp1
7 files changed, 442 insertions, 14 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9eec9eb1fe7..90f4c489508 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -494,6 +494,17 @@ env.Clone().InjectModule("enterprise").Library(
)
env.Library(
+ target='client_out_of_line_executor',
+ source=[
+ 'client_out_of_line_executor.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
+)
+
+env.Library(
target='mirror_maestro',
source=[
'mirror_maestro.cpp',
@@ -504,6 +515,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/executor/task_executor_interface',
"$BUILD_DIR/mongo/rpc/protocol",
+ 'client_out_of_line_executor',
'service_context',
"$BUILD_DIR/mongo/util/net/network",
],
@@ -520,6 +532,16 @@ env.Library(
],
)
+env.CppUnitTest(
+ target='client_out_of_line_executor_test',
+ source=[
+ 'client_out_of_line_executor_test.cpp',
+ ],
+ LIBDEPS=[
+ 'client_out_of_line_executor',
+ ],
+)
+
env.Library(
target="commands",
source=[
diff --git a/src/mongo/db/client_out_of_line_executor.cpp b/src/mongo/db/client_out_of_line_executor.cpp
new file mode 100644
index 00000000000..2fd1787a7f1
--- /dev/null
+++ b/src/mongo/db/client_out_of_line_executor.cpp
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+#include "mongo/db/client_out_of_line_executor.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status.h"
+#include "mongo/logger/log_severity_limiter.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+ClientOutOfLineExecutor::ClientOutOfLineExecutor() noexcept {
+ _taskQueue = std::make_shared<QueueType>();
+}
+
+ClientOutOfLineExecutor::~ClientOutOfLineExecutor() noexcept {
+ // Force producers to consume their tasks beyond this point.
+ _taskQueue->closeProducerEnd();
+
+ // Only call `tryPop()` when there's a task to pop to avoid lifetime issues with logging.
+ auto toCollect = _taskQueue->getStats().queueDepth;
+ while (toCollect) {
+ auto task = _taskQueue->tryPop();
+ invariant(task);
+ (*task)(Status(ErrorCodes::ClientDisconnect, "Client's executor has stopped"));
+ toCollect--;
+ }
+ invariant(toCollect == 0);
+}
+
+static const Client::Decoration<ClientOutOfLineExecutor> getClientExecutor =
+ Client::declareDecoration<ClientOutOfLineExecutor>();
+
+ClientOutOfLineExecutor* ClientOutOfLineExecutor::get(const Client* client) noexcept {
+ return const_cast<ClientOutOfLineExecutor*>(&getClientExecutor(client));
+}
+
+void ClientOutOfLineExecutor::schedule(Task task) {
+ _taskQueue->push(std::move(task));
+}
+
+void ClientOutOfLineExecutor::consumeAllTasks() noexcept {
+ // This limit allows logging incidents that the executor consumes too much of the client's
+ // time running scheduled tasks. The value is only used for debugging, and should be an
+ // approximation of the acceptable overhead in the context of normal client operations.
+ static constexpr auto kTimeLimit = Microseconds(30);
+
+ _stopWatch.restart();
+
+ while (auto maybeTask = _taskQueue->tryPop()) {
+ auto task = std::move(*maybeTask);
+ task(Status::OK());
+ }
+
+ auto elapsed = _stopWatch.elapsed();
+
+ auto severity = MONGO_GET_LIMITED_SEVERITY(this, Seconds{1}, 0, 2);
+ if (MONGO_unlikely(elapsed > kTimeLimit)) {
+ LOGV2_DEBUG(4651401,
+ logSeverityV1toV2(severity).toInt(),
+ "Client's executor exceeded time limit",
+ "elapsed"_attr = elapsed,
+ "limit"_attr = kTimeLimit);
+ }
+}
+
+void ClientOutOfLineExecutor::QueueHandle::schedule(Task&& task) {
+ auto guard = makeGuard(
+ [&task] { task(Status(ErrorCodes::CallbackCanceled, "Client no longer exists")); });
+
+ if (auto queue = _weakQueue.lock()) {
+ try {
+ queue->push(std::move(task));
+ guard.dismiss();
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
+ // The destructor for the out-of-line executor has already been called.
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/client_out_of_line_executor.h b/src/mongo/db/client_out_of_line_executor.h
new file mode 100644
index 00000000000..7146b11c1df
--- /dev/null
+++ b/src/mongo/db/client_out_of_line_executor.h
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/out_of_line_executor.h"
+#include "mongo/util/producer_consumer_queue.h"
+
+namespace mongo {
+
+/**
+ * A client decoration that allows out-of-line, client-local execution of tasks.
+ * So long as the client thread exists, tasks can be scheduled to execute on the client thread.
+ * Other threads must always schedule tasks through an instance of `QueueHandle`.
+ * Once the decoration is destructed, `QueueHandle` runs tasks on the caller's thread, providing
+ * `ErrorCodes::CallbackCanceled` as the status.
+ */
+class ClientOutOfLineExecutor final : public OutOfLineExecutor {
+public:
+ ClientOutOfLineExecutor() noexcept;
+
+ ~ClientOutOfLineExecutor() noexcept;
+
+ static ClientOutOfLineExecutor* get(const Client*) noexcept;
+
+ using Task = OutOfLineExecutor::Task;
+
+ void schedule(Task) override;
+
+ // Blocks until the executor is done running all scheduled tasks.
+ void consumeAllTasks() noexcept;
+
+ using QueueType = MultiProducerSingleConsumerQueue<Task>;
+
+ // Allows other threads to access the queue irrespective of the client's lifetime.
+ class QueueHandle final {
+ public:
+ QueueHandle() = default;
+
+ QueueHandle(const std::shared_ptr<QueueType>& queue) : _weakQueue(queue) {}
+
+ void schedule(Task&&);
+
+ private:
+ std::weak_ptr<QueueType> _weakQueue;
+ };
+
+ auto getHandle() noexcept {
+ return QueueHandle(_taskQueue);
+ }
+
+private:
+ std::shared_ptr<QueueType> _taskQueue;
+
+ ClockSource::StopWatch _stopWatch;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/client_out_of_line_executor_test.cpp b/src/mongo/db/client_out_of_line_executor_test.cpp
new file mode 100644
index 00000000000..cffeb633c87
--- /dev/null
+++ b/src/mongo/db/client_out_of_line_executor_test.cpp
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/db/client_out_of_line_executor.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * The following implements a pseudo garbage collector to test client's out-of-line executor.
+ */
+class ClientOutOfLineExecutorTest : public unittest::Test {
+public:
+ class DummyInstance {
+ public:
+ DummyInstance() = delete;
+
+ DummyInstance(ClientOutOfLineExecutorTest* parent) : _parent(parent) {
+ _parent->_instanceCount.fetchAndAdd(1);
+ }
+
+ DummyInstance(DummyInstance&& other) : _parent(std::move(other._parent)) {
+ _parent->_instanceCount.fetchAndAdd(1);
+ }
+
+ DummyInstance(const DummyInstance& other) {
+ MONGO_UNREACHABLE;
+ }
+
+ ~DummyInstance() {
+ _parent->_instanceCount.fetchAndAdd(-1);
+ }
+
+ private:
+ ClientOutOfLineExecutorTest* _parent;
+ };
+
+ void setUp() override {
+ setGlobalServiceContext(ServiceContext::make());
+ Client::initThread(kClientThreadName);
+ _instanceCount.store(0);
+ }
+
+ void tearDown() override {
+ auto client = Client::releaseCurrent();
+ client.reset(nullptr);
+ }
+
+ auto getDecoration() noexcept {
+ return ClientOutOfLineExecutor::get(Client::getCurrent());
+ }
+
+ int countDummies() const noexcept {
+ return _instanceCount.load();
+ }
+
+ static constexpr auto kClientThreadName = "ClientOutOfLineExecutorTest"_sd;
+
+private:
+ friend class DummyInstance;
+
+ AtomicWord<int> _instanceCount;
+};
+
+TEST_F(ClientOutOfLineExecutorTest, CheckDecoration) {
+ auto decoration = getDecoration();
+ ASSERT(decoration);
+}
+
+TEST_F(ClientOutOfLineExecutorTest, ScheduleAndExecute) {
+ auto thread = stdx::thread([this, handle = getDecoration()->getHandle()]() mutable {
+ DummyInstance dummy(this);
+ handle.schedule([dummy = std::move(dummy)](const Status& status) {
+ ASSERT_OK(status);
+ ASSERT_EQ(getThreadName(), ClientOutOfLineExecutorTest::kClientThreadName);
+ });
+ });
+ thread.join();
+ ASSERT_EQ(countDummies(), 1);
+
+ getDecoration()->consumeAllTasks();
+ ASSERT_EQ(countDummies(), 0);
+}
+
+TEST_F(ClientOutOfLineExecutorTest, DestructorExecutesLeftovers) {
+ const auto kDummiesCount = 8;
+ unittest::Barrier b1(2), b2(2);
+
+ auto thread = stdx::thread([this, kDummiesCount, b1 = &b1, b2 = &b2]() {
+ Client::initThread("ThreadWithLeftovers"_sd);
+
+ auto handle = ClientOutOfLineExecutor::get(Client::getCurrent())->getHandle();
+ for (auto i = 0; i < kDummiesCount; i++) {
+ DummyInstance dummy(this);
+ handle.schedule([dummy = std::move(dummy),
+ threadId = stdx::this_thread::get_id()](const Status& status) {
+ ASSERT(status == ErrorCodes::ClientDisconnect);
+ // Avoid using `getThreadName()` here as it'll cause read-after-delete errors.
+ ASSERT_EQ(threadId, stdx::this_thread::get_id());
+ });
+ }
+
+ b1->countDownAndWait();
+ // Wait for the main thread to count dummies.
+ b2->countDownAndWait();
+ });
+
+ b1.countDownAndWait();
+ ASSERT_EQ(countDummies(), kDummiesCount);
+ b2.countDownAndWait();
+
+ thread.join();
+ ASSERT_EQ(countDummies(), 0);
+}
+
+TEST_F(ClientOutOfLineExecutorTest, ScheduleAfterClientThreadReturns) {
+ ClientOutOfLineExecutor::QueueHandle handle;
+
+ auto thread = stdx::thread([&handle]() mutable {
+ Client::initThread("ClientThread"_sd);
+ handle = ClientOutOfLineExecutor::get(Client::getCurrent())->getHandle();
+ // Return to destroy the client, and close the task queue.
+ });
+
+ thread.join();
+
+ bool taskCalled = false;
+ handle.schedule([&taskCalled, threadName = getThreadName()](const Status& status) {
+ ASSERT(status == ErrorCodes::CallbackCanceled);
+ ASSERT_EQ(getThreadName(), threadName);
+ taskCalled = true;
+ });
+ ASSERT(taskCalled);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp
index 31d9de386d9..7f898db664d 100644
--- a/src/mongo/db/mirror_maestro.cpp
+++ b/src/mongo/db/mirror_maestro.cpp
@@ -41,6 +41,7 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
+#include "mongo/db/client_out_of_line_executor.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/server_status.h"
#include "mongo/db/mirror_maestro_gen.h"
@@ -90,15 +91,43 @@ public:
*/
void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept;
+ /**
+ * Maintains the state required for mirroring requests.
+ */
+ class MirroredRequestState {
+ public:
+ MirroredRequestState(MirrorMaestroImpl* maestro,
+ std::vector<HostAndPort> hosts,
+ std::shared_ptr<CommandInvocation> invocation,
+ MirroredReadsParameters params)
+ : _maestro(std::move(maestro)),
+ _hosts(std::move(hosts)),
+ _invocation(std::move(invocation)),
+ _params(std::move(params)) {}
+
+ MirroredRequestState() = delete;
+
+ void mirror() noexcept {
+ invariant(_maestro);
+ _maestro->_mirror(_hosts, _invocation, _params);
+ }
+
+ private:
+ MirrorMaestroImpl* _maestro;
+ std::vector<HostAndPort> _hosts;
+ std::shared_ptr<CommandInvocation> _invocation;
+ MirroredReadsParameters _params;
+ };
+
private:
/**
* Attempt to mirror invocation to a subset of hosts based on params
*
* This command is expected to only run on the _executor
*/
- void _mirror(std::vector<HostAndPort> hosts,
+ void _mirror(const std::vector<HostAndPort>& hosts,
std::shared_ptr<CommandInvocation> invocation,
- MirroredReadsParameters params) noexcept;
+ const MirroredReadsParameters& params) noexcept;
/**
* An enum detailing the liveness of the Maestro
@@ -256,26 +285,32 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation)
return;
}
+ auto clientExecutor = ClientOutOfLineExecutor::get(Client::getCurrent());
+ auto clientExecutorHandle = clientExecutor->getHandle();
+
+ // TODO SERVER-46619 delegates collection to the client's baton
+ clientExecutor->consumeAllTasks();
+
// There is the potential to actually mirror requests, so schedule the _mirror() invocation
// out-of-line. This means the command itself can return quickly and we do the arduous work of
// building new bsons and evaluating randomness in a less important context.
+ auto requestState = std::make_unique<MirroredRequestState>(
+ this, std::move(hosts), std::move(invocation), std::move(params));
ExecutorFuture(_executor) //
- .getAsync([this,
- hosts = std::move(hosts),
- invocation = std::move(invocation),
- params = std::move(params)](const auto& status) mutable {
- if (ErrorCodes::isShutdownError(status)) {
- return;
+ .getAsync([clientExecutorHandle,
+ requestState = std::move(requestState)](const auto& status) mutable {
+ if (!ErrorCodes::isShutdownError(status)) {
+ invariant(status.isOK());
+ requestState->mirror();
}
- invariant(status.isOK());
-
- _mirror(std::move(hosts), std::move(invocation), std::move(params));
+ clientExecutorHandle.schedule([requestState = std::move(requestState)](
+ const Status&) mutable { requestState.reset(); });
});
}
-void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts,
+void MirrorMaestroImpl::_mirror(const std::vector<HostAndPort>& hosts,
std::shared_ptr<CommandInvocation> invocation,
- MirroredReadsParameters params) noexcept try {
+ const MirroredReadsParameters& params) noexcept try {
auto payload = [&] {
BSONObjBuilder bob;
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 0836bbb28c5..d3ef0749e52 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -34,7 +34,6 @@
#include <list>
#include <numeric>
-#include "mongo/db/operation_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/concurrency/with_lock.h"
diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp
index 5ba6a4d43de..b0f319c8711 100644
--- a/src/mongo/util/producer_consumer_queue_test.cpp
+++ b/src/mongo/util/producer_consumer_queue_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/util/producer_consumer_queue.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"