diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-07-27 22:17:21 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-28 23:31:54 +0000 |
commit | 165dfb50f7693cadb80e988d08222700b1cc6b5c (patch) | |
tree | 4f8512dcb1c9637feee9cbf7693d46185190cdc1 /src/mongo | |
parent | 48a125c2ae350d0e915c9aa3e6212dda190bacb5 (diff) | |
download | mongo-165dfb50f7693cadb80e988d08222700b1cc6b5c.tar.gz |
SERVER-49106 Add poll function to the ServiceExecutor API
Diffstat (limited to 'src/mongo')
16 files changed, 132 insertions, 68 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 813401c3c94..1f7f6a16107 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -8,7 +8,6 @@ env.Library( target='transport_layer_common', source=[ 'ismaster_metrics.cpp', - 'service_entry_point_utils.cpp', 'session.cpp', 'transport_layer.cpp', ], @@ -89,6 +88,7 @@ tlEnv.Library( 'service_executor_fixed.cpp', 'service_executor_reserved.cpp', 'service_executor_synchronous.cpp', + 'service_executor_utils.cpp', env.Idlc('service_executor.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index d88a0af263e..72d05aa3e66 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -29,8 +29,11 @@ #pragma once +#include <list> + #include "mongo/base/checked_cast.h" #include "mongo/config.h" +#include "mongo/stdx/mutex.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/util/net/hostandport.h" @@ -100,6 +103,22 @@ public: return Future<Message>::makeReady(sourceMessage()); } + Future<void> waitForData() override { + auto fp = makePromiseFuture<void>(); + stdx::lock_guard<Latch> lk(_waitForDataMutex); + _waitForDataQueue.emplace_back(std::move(fp.promise)); + return std::move(fp.future); + } + + void signalAvailableData() { + stdx::lock_guard<Latch> lk(_waitForDataMutex); + if (_waitForDataQueue.size() == 0) + return; + Promise<void> promise = std::move(_waitForDataQueue.front()); + _waitForDataQueue.pop_front(); + promise.emplaceValue(); + } + Status sinkMessage(Message message) override { if (!_tl || _tl->inShutdown()) { return TransportLayer::ShutdownStatus; @@ -154,6 +173,9 @@ protected: HostAndPort _local; SockAddr _remoteAddr; SockAddr _localAddr; + + mutable Mutex _waitForDataMutex = MONGO_MAKE_LATCH("MockSession::_waitForDataMutex"); + std::list<Promise<void>> _waitForDataQueue; }; } // namespace transport diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index 859969efa66..b702198e6b5 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -45,6 +45,8 @@ class ServiceContext; namespace transport { +class Session; + /* * This is the interface for all ServiceExecutors. */ @@ -95,6 +97,14 @@ public: } /* + * Awaits the availability of incoming data for the specified session. On success, it will + * schedule the callback on current executor. Otherwise, it will invoke the callback with a + * non-okay status on the caller thread. + */ + virtual void runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) = 0; + + /* * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any * associated callbacks waiting on I/O may get called with an error code. * diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index ac3653126e0..abdd99ca586 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -34,6 +34,8 @@ #include "mongo/base/error_codes.h" #include "mongo/logv2/log.h" #include "mongo/transport/service_executor_gen.h" +#include "mongo/transport/session.h" +#include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -149,6 +151,12 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) { return Status::OK(); } +void ServiceExecutorFixed::runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) { + invariant(session); + session->waitForData().thenRunOn(shared_from_this()).getAsync(std::move(onCompletionCallback)); +} + void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const { *bob << kExecutorLabel << kExecutorName << kThreadsRunning << static_cast<int>(_numRunningExecutorThreads.load()); diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index 127c3e5d24d..b7a90d43447 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -58,6 +58,9 @@ public: Status shutdown(Milliseconds timeout) override; Status scheduleTask(Task task, ScheduleFlags flags) override; + void runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) override; + Mode transportMode() const override { return Mode::kSynchronous; } diff --git a/src/mongo/transport/service_executor_noop.h b/src/mongo/transport/service_executor_noop.h deleted file mode 100644 index b12de2247f3..00000000000 --- a/src/mongo/transport/service_executor_noop.h +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/status.h" -#include "mongo/transport/service_executor.h" - -namespace mongo { -namespace transport { - -/** - * The noop service executor provides the necessary interface for some unittests. Doesn't actually - * execute any work - */ -class ServiceExecutorNoop final : public ServiceExecutor { -public: - explicit ServiceExecutorNoop(ServiceContext* ctx) {} - - Status start() override { - return Status::OK(); - } - Status shutdown(Milliseconds timeout) override { - return Status::OK(); - } - Status schedule(Task task, ScheduleFlags flags) override { - return Status::OK(); - } - - Mode transportMode() const override { - return Mode::kSynchronous; - } - - void appendStats(BSONObjBuilder* bob) const override {} -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index c03070a43e9..cae9653e60f 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -35,8 +35,8 @@ #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point_utils.h" #include "mongo/transport/service_executor_gen.h" +#include "mongo/transport/service_executor_utils.h" #include "mongo/util/processinfo.h" namespace mongo { @@ -200,5 +200,10 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const { << static_cast<int>(_numStartingThreads); } +void ServiceExecutorReserved::runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) { + scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h index 4d2aabbce36..e3acf6febd9 100644 --- a/src/mongo/transport/service_executor_reserved.h +++ b/src/mongo/transport/service_executor_reserved.h @@ -62,6 +62,9 @@ public: return Mode::kSynchronous; } + void runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) override; + void appendStats(BSONObjBuilder* bob) const override; private: diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index cab4c158433..d034e208954 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -35,8 +35,8 @@ #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point_utils.h" #include "mongo/transport/service_executor_gen.h" +#include "mongo/transport/service_executor_utils.h" #include "mongo/util/processinfo.h" namespace mongo { @@ -141,5 +141,11 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); } +void ServiceExecutorSynchronous::runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) { + scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); +} + + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index 19a651f9f09..940382b53e8 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -57,6 +57,9 @@ public: return Mode::kSynchronous; } + void runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) override; + void appendStats(BSONObjBuilder* bob) const override; private: diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index eb1927da8aa..59bed8cc568 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -37,10 +37,12 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/transport/mock_session.h" #include "mongo/transport/service_executor_fixed.h" #include "mongo/transport/service_executor_gen.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -343,5 +345,27 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) { schedulerThread->join(); } +TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) { + auto tl = std::make_unique<TransportLayerMock>(); + auto session = tl->createSession(); + + auto executor = startAndGetServiceExecutor(); + + const auto mainThreadId = stdx::this_thread::get_id(); + AtomicWord<bool> ranOnDataAvailable{false}; + auto barrier = std::make_shared<unittest::Barrier>(2); + executor->runOnDataAvailable( + session.get(), [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void { + ranOnDataAvailable.store(true); + ASSERT(stdx::this_thread::get_id() != mainThreadId); + barrier->countDownAndWait(); + }); + + ASSERT(!ranOnDataAvailable.load()); + reinterpret_cast<MockSession*>(session.get())->signalAvailableData(); + barrier->countDownAndWait(); + ASSERT(ranOnDataAvailable.load()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_executor_utils.cpp index f0c95a28dba..450babfb36c 100644 --- a/src/mongo/transport/service_entry_point_utils.cpp +++ b/src/mongo/transport/service_executor_utils.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/transport/service_executor_utils.h" #include <fmt/format.h> #include <functional> @@ -39,6 +39,7 @@ #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_executor.h" #include "mongo/util/assert_util.h" #include "mongo/util/debug_util.h" #include "mongo/util/thread_safety_context.h" @@ -128,4 +129,16 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept { return Status::OK(); } +void scheduleCallbackOnDataAvailable(transport::Session* session, + unique_function<void(Status)> callback, + transport::ServiceExecutor* executor) noexcept { + invariant(session); + try { + session->waitForData().get(); + executor->schedule(std::move(callback)); + } catch (DBException& e) { + callback(e.toStatus()); + } +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_utils.h b/src/mongo/transport/service_executor_utils.h index f6f83d6d37e..3dc0c4ab11f 100644 --- a/src/mongo/transport/service_entry_point_utils.h +++ b/src/mongo/transport/service_executor_utils.h @@ -36,6 +36,18 @@ namespace mongo { +namespace transport { +class ServiceExecutor; +} + Status launchServiceWorkerThread(unique_function<void()> task) noexcept; +/* The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller + * thread until data is available for reading. On success, it schedules "callback" on "executor". + * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants. + */ +void scheduleCallbackOnDataAvailable(transport::Session* session, + unique_function<void(Status)> callback, + transport::ServiceExecutor* executor) noexcept; + } // namespace mongo diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 0189093a34e..b5aceac7ea5 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -44,6 +44,7 @@ #include "mongo/transport/mock_session.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor.h" +#include "mongo/transport/service_executor_utils.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" @@ -266,6 +267,11 @@ public: return Mode::kSynchronous; } + void runOnDataAvailable(Session* session, + OutOfLineExecutor::Task onCompletionCallback) override { + scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); + } + void appendStats(BSONObjBuilder* bob) const override {} void setScheduleHook(ScheduleHook hook) { diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 8f51cdc80e3..39595d8ed37 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -116,6 +116,11 @@ public: virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; /** + * Asynchronously waits for the availability of incoming data. + */ + virtual Future<void> waitForData() = 0; + + /** * Sink (send) a Message to the remote host for this Session. * * Async version will keep the buffer alive until the operation completes. diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 315cef28975..e0e9701857b 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -167,6 +167,14 @@ public: return sourceMessageImpl(baton); } + Future<void> waitForData() override { +#ifdef MONGO_CONFIG_SSL + if (_sslSocket) + return asio::async_read(*_sslSocket, asio::null_buffers(), UseFuture{}).ignoreValue(); +#endif + return asio::async_read(_socket, asio::null_buffers(), UseFuture{}).ignoreValue(); + } + Status sinkMessage(Message message) override { ensureSync(); |