diff options
author | Jason Carey <jcarey@argv.me> | 2019-01-23 13:18:49 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-02-05 22:41:49 -0500 |
commit | a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81 (patch) | |
tree | 1adc2fdb36e6c8babaab134d53f84de3020c2404 /src | |
parent | 5fd66f15797c45c9bab7b59f9e55e0a2f7ad5cd0 (diff) | |
download | mongo-a23cdb1bd0f8fbe9cd79db08a24b8a89dc54ff81.tar.gz |
SERVER-39146 Refactor Baton
Refactor the baton into regular and networking batons while also
cleaning up the basic baton implementation.
Diffstat (limited to 'src')
44 files changed, 785 insertions, 347 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index b0be7a0303d..8d01f077791 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -276,6 +276,7 @@ error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. error_code("IndexBuildAborted", 276) error_code("AlarmAlreadyFulfilled", 277) error_code("UnsatisfiableCommitQuorum", 278) +error_code("ClientDisconnect", 279) # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 520e69606bc..1a2c01ed335 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -193,7 +193,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, }); } -Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { +Future<Message> AsyncDBClient::_call(Message request, const BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); @@ -219,8 +219,7 @@ Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHand }); } -Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, - const transport::BatonHandle& baton) { +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); return _call(std::move(requestMsg), baton) @@ -230,7 +229,7 @@ Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, } Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( - executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { + executor::RemoteCommandRequest request, const BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = 
clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( @@ -246,7 +245,7 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -void AsyncDBClient::cancel(const transport::BatonHandle& baton) { +void AsyncDBClient::cancel(const BatonHandle& baton) { _session->cancelAsyncOperations(baton); } diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index 356f4e0f0ca..e7a40dcc73f 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -62,9 +62,8 @@ public: Milliseconds timeout); Future<executor::RemoteCommandResponse> runCommandRequest( - executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr); - Future<rpc::UniqueReply> runCommand(OpMsgRequest request, - const transport::BatonHandle& baton = nullptr); + executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr); + Future<rpc::UniqueReply> runCommand(OpMsgRequest request, const BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); @@ -73,7 +72,7 @@ public: Future<void> initWireVersion(const std::string& appName, executor::NetworkConnectionHook* const hook); - void cancel(const transport::BatonHandle& baton = nullptr); + void cancel(const BatonHandle& baton = nullptr); bool isStillConnected(); @@ -83,7 +82,7 @@ public: const HostAndPort& local() const; private: - Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr); + Future<Message> _call(Message request, const BatonHandle& baton = nullptr); BSONObj _buildIsMasterRequest(const std::string& appName, executor::NetworkConnectionHook* hook); void _parseIsMasterResponse(BSONObj request, diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index 503a7c4a42e..81ecd179dcc 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp 
@@ -94,7 +94,7 @@ public: virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + const BatonHandle& baton = nullptr) override { if (scheduleRemoteCommandFailPoint) { return Status(ErrorCodes::ShutdownInProgress, "failed to send remote command - shutdown in progress"); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b135cb559a2..a8fe9910b39 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -527,13 +527,27 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/multi_key_path_tracker', '$BUILD_DIR/mongo/db/storage/write_unit_of_work', - '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/clock_sources', '$BUILD_DIR/mongo/util/concurrency/spin_lock', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/network', '$BUILD_DIR/mongo/util/periodic_runner', ], + LIBDEPS_PRIVATE=[ + 'default_baton', + ], +) + +env.Library( + target='default_baton', + source=[ + 'default_baton.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/util/clock_sources', + '$BUILD_DIR/mongo/util/concurrency/spin_lock', + ], ) env.CppUnitTest( diff --git a/src/mongo/db/baton.h b/src/mongo/db/baton.h new file mode 100644 index 00000000000..6e1d1319d2b --- /dev/null +++ b/src/mongo/db/baton.h @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/util/functional.h" +#include "mongo/util/future.h" +#include "mongo/util/time_support.h" +#include "mongo/util/waitable.h" + +namespace mongo { + +class OperationContext; + +namespace transport { + +class NetworkingBaton; + +} // namespace transport + +/** + * A Baton is a lightweight executor, with parallel forward progress guarantees. Rather than + * asynchronously running tasks through one, the baton records the intent of those tasks and defers + * waiting and execution to a later call to run(). + * + * Note: This occurs automatically when an opCtx is waiting on a condition variable. + */ +class Baton : public Waitable, public std::enable_shared_from_this<Baton> { +public: + virtual ~Baton() = default; + + /** + * Detaches a baton from an associated opCtx. + * + * This invokes all callbacks currently inside the baton (from the detaching thread) with a null + * 
Also, any calls to schedule after this point will immediately invoke their callback + * with a null opCtx. + */ + void detach() noexcept { + // We make this anchor so that deleting the shared_ptr inside opCtx doesn't remove the last + // reference to this type until we return from detach. + const auto anchor = shared_from_this(); + + detachImpl(); + } + + /** + * Schedules a callback to run on the baton. + * + * The function will be invoked in one of 3 ways: + * 1. With an opCtx inside run() (I.e. opCtx->waitForConditionOrInterrupt) + * 2. Without an opCtx inside detach() + * 3. Without an opCtx inside schedule() after detach() + * + * Note that the latter two cases are indistinguishable, so the callback should be safe to run + * inline if passed a nullptr. Examples of such work are logging, simple cleanup and + * rescheduling the task on another executor. + */ + virtual void schedule(unique_function<void(OperationContext*)> func) noexcept = 0; + + /** + * Returns a networking view of the baton, if this baton supports networking functionality + */ + virtual transport::NetworkingBaton* networking() noexcept { + return nullptr; + } + + /** + * Marks the baton to wake up on client socket disconnect + */ + virtual void markKillOnClientDisconnect() noexcept = 0; + +private: + virtual void detachImpl() noexcept = 0; +}; + +using BatonHandle = std::shared_ptr<Baton>; + +} // namespace mongo diff --git a/src/mongo/db/default_baton.cpp b/src/mongo/db/default_baton.cpp new file mode 100644 index 00000000000..9855957135a --- /dev/null +++ b/src/mongo/db/default_baton.cpp @@ -0,0 +1,154 @@ + +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/db/default_baton.h" + +#include "mongo/db/operation_context.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +DefaultBaton::DefaultBaton(OperationContext* opCtx) : _opCtx(opCtx) {} + +DefaultBaton::~DefaultBaton() { + invariant(!_opCtx); + invariant(_scheduled.empty()); +} + +void DefaultBaton::markKillOnClientDisconnect() noexcept { + if (_opCtx->getClient() && _opCtx->getClient()->session()) { + _hasIngressSocket = true; + } +} + +void DefaultBaton::detachImpl() noexcept { + decltype(_scheduled) scheduled; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); + } + + _opCtx = nullptr; + _hasIngressSocket = false; + + using std::swap; + swap(_scheduled, scheduled); + } + + for (auto& job : scheduled) { + job(nullptr); + } +} + +void DefaultBaton::schedule(unique_function<void(OperationContext*)> func) noexcept { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_opCtx) { + lk.unlock(); + func(nullptr); + + return; + } + + _scheduled.push_back(std::move(func)); + + if (_sleeping && !_notified) { + _notified = true; + _cv.notify_one(); + } +} + +void DefaultBaton::notify() noexcept { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _notified = true; + _cv.notify_one(); +} + +Waitable::TimeoutState DefaultBaton::run_until(ClockSource* clkSource, + Date_t oldDeadline) noexcept { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks + const auto guard = makeGuard([&] { + // While we have scheduled work, keep running jobs + while (_scheduled.size()) { + auto toRun = std::exchange(_scheduled, {}); + + lk.unlock(); + for (auto& job : toRun) { + job(_opCtx); + } + lk.lock(); + } + }); + + // If anything was 
scheduled, run it now. + if (_scheduled.size()) { + return Waitable::TimeoutState::NoTimeout; + } + + auto newDeadline = oldDeadline; + + // If we have an ingress socket, sleep no more than 1 second (so we poll for closure in the + // outside opCtx waitForConditionOrInterruptUntil implementation) + if (_hasIngressSocket) { + newDeadline = std::min(oldDeadline, clkSource->now() + Seconds(1)); + } + + // we mark sleeping, so that we receive notifications + _sleeping = true; + + // while we're not notified + auto notified = + clkSource->waitForConditionUntil(_cv, lk, newDeadline, [&] { return _notified; }); + + _sleeping = false; + _notified = false; + + // If we've been notified, or we haven't timed out yet, return as if by spurious wakeup. + // Otherwise call it a timeout. + return (notified || (clkSource->now() < oldDeadline)) ? Waitable::TimeoutState::NoTimeout + : Waitable::TimeoutState::Timeout; +} + +void DefaultBaton::run(ClockSource* clkSource) noexcept { + run_until(clkSource, Date_t::max()); +} + +} // namespace mongo diff --git a/src/mongo/db/default_baton.h b/src/mongo/db/default_baton.h new file mode 100644 index 00000000000..6311f397e95 --- /dev/null +++ b/src/mongo/db/default_baton.h @@ -0,0 +1,78 @@ + +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/baton.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/functional.h" + +namespace mongo { + +class OperationContext; + +/** + * The most basic Baton implementation. 
+ */ +class DefaultBaton : public Baton { +public: + explicit DefaultBaton(OperationContext* opCtx); + + ~DefaultBaton(); + + void markKillOnClientDisconnect() noexcept override; + + void schedule(unique_function<void(OperationContext*)> func) noexcept override; + + void notify() noexcept override; + + Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t oldDeadline) noexcept override; + + void run(ClockSource* clkSource) noexcept override; + +private: + void detachImpl() noexcept override; + + stdx::mutex _mutex; + stdx::condition_variable _cv; + bool _notified = false; + bool _sleeping = false; + + OperationContext* _opCtx; + + bool _hasIngressSocket = false; + + std::vector<unique_function<void(OperationContext*)>> _scheduled; +}; + +} // namespace mongo diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 11ca8b8f641..319b803288c 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -77,6 +77,8 @@ MONGO_FAIL_POINT_DEFINE(maxTimeNeverTimeOut); // inclusive. 
MONGO_FAIL_POINT_DEFINE(checkForInterruptFail); +const auto kNoWaiterThread = stdx::thread::id(); + } // namespace OperationContext::OperationContext(Client* client, unsigned int opId) @@ -261,6 +263,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser stdx::lock_guard<Client> clientLock(*getClient()); invariant(!_waitMutex); invariant(!_waitCV); + invariant(_waitThread == kNoWaiterThread); invariant(0 == _numKillers); // This interrupt check must be done while holding the client lock, so as not to race with a @@ -271,6 +274,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser } _waitMutex = m.mutex(); _waitCV = &cv; + _waitThread = stdx::this_thread::get_id(); } // If the maxTimeNeverTimeOut failpoint is set, behave as though the operation's deadline does @@ -302,6 +306,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser if (0 == _numKillers) { _waitMutex = nullptr; _waitCV = nullptr; + _waitThread = kNoWaiterThread; return true; } return false; @@ -328,7 +333,20 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser void OperationContext::markKilled(ErrorCodes::Error killCode) { invariant(killCode != ErrorCodes::OK); stdx::unique_lock<stdx::mutex> lkWaitMutex; - if (_waitMutex) { + + // If we have a _waitMutex, it means this opCtx is currently blocked in + // waitForConditionOrInterrupt. + // + // From there, we also know which thread is actually doing that waiting (it's recorded in + // _waitThread). If that thread isn't our thread, it's necessary to do the regular numKillers + // song and dance mentioned in waitForConditionOrInterrupt. + // + // If it is our thread, we know that we're currently inside that call to + // waitForConditionOrInterrupt and are being invoked by a callback run from Baton->run. 
And + // that means we don't need to deal with the waitMutex and waitCV (because we're running + // callbacks, which means run is returning, which means we'll be checking _killCode in the near + // future). + if (_waitMutex && stdx::this_thread::get_id() != _waitThread) { invariant(++_numKillers > 0); getClient()->unlock(); ON_BLOCK_EXIT([this] { diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 389797c2e1c..f376a98e048 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -184,14 +184,14 @@ public: /** * Sets a transport Baton on the operation. This will trigger the Baton on markKilled. */ - void setBaton(const transport::BatonHandle& baton) { + void setBaton(const BatonHandle& baton) { _baton = baton; } /** * Retrieves the baton associated with the operation. */ - const transport::BatonHandle& getBaton() const { + const BatonHandle& getBaton() const { return _baton; } @@ -436,13 +436,17 @@ private: // A transport Baton associated with the operation. The presence of this object implies that a // client thread is doing its own async networking by blocking on its own thread. - transport::BatonHandle _baton; + BatonHandle _baton; // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the // operation is currently waiting on inside a call to waitForConditionOrInterrupt...(). + // + // _waitThread is the calling thread's thread id. + // // All access guarded by the Client's lock. stdx::mutex* _waitMutex = nullptr; stdx::condition_variable* _waitCV = nullptr; + stdx::thread::id _waitThread; // If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled // actively attempting to kill the operation. 
If this value is non-zero, the operation is inside diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp index 330f7c1e199..463c990052a 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp @@ -372,10 +372,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 466bd80a4ce..a8ca6efb7c7 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -507,10 +507,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index af688a3b26f..07f7bc7bfff 100644 
--- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -233,10 +233,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 7463f93ab7c..02cdcd1118d 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -772,10 +772,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 7ce5768cf62..97e8e8d707a 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -604,7 +604,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMak virtual 
StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + const BatonHandle& baton = nullptr) override { // Any error status other than ShutdownInProgress will cause the reporter to fassert. return Status(ErrorCodes::ShutdownInProgress, "failed to send remote command - shutdown in progress"); diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 7d00daff9e7..6084dc12041 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -59,10 +59,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index 727014a5cc2..6c994dc719d 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -61,7 +61,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (shouldFailScheduleRemoteCommandRequest(request)) { return 
Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index e5e92e0f0e1..ffceae96d23 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -49,10 +49,9 @@ public: StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; // Override to make scheduleWork() fail during testing. ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; }; diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index f2f72ac6ed0..4b1683edab0 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -38,6 +38,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/locker_noop.h" +#include "mongo/db/default_baton.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/recovery_unit_noop.h" @@ -153,7 +154,6 @@ void onCreate(T* object, const ObserversContainer& observers) { onCreate(object, observers.cbegin(), observers.cend()); } - } // namespace ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, @@ -406,4 +406,16 @@ void ServiceContext::ServiceContextDeleter::operator()(ServiceContext* service) delete service; } +BatonHandle ServiceContext::makeBaton(OperationContext* opCtx) const { + auto baton = std::make_shared<DefaultBaton>(opCtx); + + { + stdx::lock_guard<Client> 
lk(*opCtx->getClient()); + invariant(!opCtx->getBaton()); + opCtx->setBaton(baton); + } + + return baton; +} + } // namespace mongo diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 31ad873085a..548efe1cd88 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -511,6 +511,11 @@ public: */ void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec); + /** + * Creates a delayed execution baton with basic functionality + */ + BatonHandle makeBaton(OperationContext* opCtx) const; + private: class ClientObserverHolder { public: diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 62012162bf6..34685392f1e 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -144,12 +144,11 @@ public: virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; - Future<TaskExecutor::ResponseStatus> startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - const transport::BatonHandle& baton = nullptr) { + Future<TaskExecutor::ResponseStatus> startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + const BatonHandle& baton = nullptr) { auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>(); auto status = startCommand( @@ -171,7 +170,7 @@ public: * completed. */ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * Sets an alarm, which schedules "action" to run no sooner than "when". @@ -187,9 +186,7 @@ public: * Any callbacks invoked from setAlarm must observe onNetworkThread to * return true. See that method for why. 
*/ - virtual Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) = 0; + virtual Status setAlarm(Date_t when, unique_function<void()> action) = 0; /** * Returns true if called from a thread dedicated to networking. I.e. not a diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index f700f6c211e..641f1095530 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -108,7 +108,7 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } @@ -140,8 +140,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( } } -void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { +void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -171,9 +170,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } } -Status NetworkInterfaceMock::setAlarm(const Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceMock::setAlarm(const Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 937bed41525..c0f1693a660 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -114,7 +114,7 @@ public: Status startCommand(const 
TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * If the network operation is in the _unscheduled or _processing queues, moves the operation @@ -123,14 +123,12 @@ public: * called after the task has already completed, but its callback has not yet been run. */ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * Not implemented. */ - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 512b3fee58b..5ca24991e8c 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -173,7 +173,7 @@ Date_t NetworkInterfaceTL::now() { Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } @@ -280,14 +280,19 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (baton) { // If we have a baton, we want to get back to the baton thread immediately after we get a // connection - std::move(connFuture).getAsync([ - baton, - rw = std::move(remainingWork) - ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { - baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable { - std::move(rw)(std::move(swConn)); + std::move(connFuture) 
+ .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ]( + StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + baton->schedule([ rw = std::move(rw), + swConn = std::move(swConn) ](OperationContext * opCtx) mutable { + if (opCtx) { + std::move(rw)(std::move(swConn)); + } else { + std::move(rw)(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, failing operation")); + } + }); }); - }); } else { // otherwise we're happy to run inline std::move(connFuture) @@ -306,7 +311,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) { conn->indicateSuccess(); return future; @@ -416,7 +421,7 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH } void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); auto it = _inProgress.find(cbHandle); if (it == _inProgress.end()) { @@ -445,19 +450,13 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } } -Status NetworkInterfaceTL::setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } if (when <= now()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } + _reactor->schedule(transport::Reactor::kPost, std::move(action)); return Status::OK(); } @@ -469,39 +468,34 @@ Status 
NetworkInterfaceTL::setAlarm(Date_t when, _inProgressAlarms.insert(alarmTimer); } - alarmTimer->waitUntil(when, baton) - .getAsync( - [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable { - auto alarmTimer = weakTimer.lock(); - if (!alarmTimer) { - return; - } else { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgressAlarms.erase(alarmTimer); - } - - auto nowVal = now(); - if (nowVal < when) { - warning() << "Alarm returned early. Expected at: " << when - << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action), baton); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); - } + alarmTimer->waitUntil(when, nullptr) + .getAsync([ this, weakTimer, action = std::move(action), when ](Status status) mutable { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); + } - return; + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. 
Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action)); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); } - if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + return; + } + + if (status.isOK()) { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 0b0c99cd03e..5cd5dd6115b 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -68,13 +68,11 @@ public: Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) override; - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; @@ -117,7 +115,7 @@ private: Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton); + const BatonHandle& baton); std::string _instanceName; ServiceContext* _svcCtx; diff --git a/src/mongo/executor/task_executor.h 
b/src/mongo/executor/task_executor.h index 1e3051e17b4..37c2669f306 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -239,7 +239,7 @@ public: virtual StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 1af20aa8692..4800de47db1 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -67,14 +67,14 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState public: static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return std::make_shared<CallbackState>(std::move(cb), readyDate, baton); } /** * Do not call directly. Use make. 
*/ - CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton) + CallbackState(CallbackFn&& cb, Date_t theReadyDate, const BatonHandle& baton) : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {} virtual ~CallbackState() = default; @@ -102,7 +102,7 @@ public: bool isNetworkOperation = false; AtomicWord<bool> isFinished{false}; boost::optional<stdx::condition_variable> finishedCondition; - transport::BatonHandle baton; + BatonHandle baton; }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -350,21 +350,23 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( return cbHandle; } lk.unlock(); - _net->setAlarm(when, - [this, cbHandle] { - auto cbState = - checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); - if (cbState->canceled.load()) { - return; - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (cbState->canceled.load()) { - return; - } - scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); - }, - nullptr) - .transitional_ignore(); + + auto status = _net->setAlarm(when, [this, cbHandle] { + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); + if (cbState->canceled.load()) { + return; + } + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (cbState->canceled.load()) { + return; + } + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); + }); + + if (!status.isOK()) { + cancel(cbHandle.getValue()); + return status; + } return cbHandle; } @@ -406,7 +408,7 @@ const auto initialSyncPauseCmds = StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(initialSyncFuzzerSynchronizationPoint1)) { // We are only going to pause on these failpoints if the command issued is for the @@ 
-537,7 +539,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback } ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue( - CallbackFn work, const transport::BatonHandle& baton, Date_t when) { + CallbackFn work, const BatonHandle& baton, Date_t when) { WorkQueue result; result.emplace_front(CallbackState::make(std::move(work), when, baton)); result.front()->iter = result.begin(); @@ -592,7 +594,17 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, for (const auto& cbState : todo) { if (cbState->baton) { - cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); }); + cbState->baton->schedule([this, cbState](OperationContext* opCtx) { + if (opCtx) { + runCallback(std::move(cbState)); + return; + } + + cbState->canceled.store(1); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); + }); } else { const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 1558eaf7f01..1832a021d2c 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -74,7 +74,7 @@ public: void startup() override; void shutdown() override; void join() override; - void appendDiagnosticBSON(BSONObjBuilder* b) const; + void appendDiagnosticBSON(BSONObjBuilder* b) const override; Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; @@ -85,10 +85,9 @@ public: void waitForEvent(const EventHandle& event) override; StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const 
RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; @@ -138,7 +137,7 @@ private: * called outside of _mutex. */ static WorkQueue makeSingletonWorkQueue(CallbackFn work, - const transport::BatonHandle& baton, + const BatonHandle& baton, Date_t when = {}); /** diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 4646d371db8..a9bdcc43cb0 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -227,15 +227,15 @@ private: explicit BatonDetacher(OperationContext* opCtx); ~BatonDetacher(); - transport::Baton& operator*() const { + Baton& operator*() const { return *_baton; } - transport::Baton* operator->() const noexcept { + Baton* operator->() const noexcept { return _baton.get(); } - operator transport::BatonHandle() const { + operator BatonHandle() const { return _baton; } @@ -244,7 +244,7 @@ private: } private: - transport::BatonHandle _baton; + BatonHandle _baton; }; /** diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index d11de192011..8189b308c33 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -115,7 +115,7 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Da StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { // schedule the user's callback if there is not opCtx if 
(!request.opCtx) { diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index 3a0df5fb57e..4f43a70b5a0 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -69,10 +69,9 @@ public: Date_t deadline) override; StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 43032ccb1b7..12b2cf44899 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -14,6 +14,9 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/service_context', + ], ) env.Library( diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index 9dbf570ede2..de981ef6284 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -1,6 +1,6 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. 
* * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -32,9 +32,11 @@ #include <memory> +#include "mongo/db/baton.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" +#include "mongo/util/out_of_line_executor.h" #include "mongo/util/time_support.h" #include "mongo/util/waitable.h" @@ -49,44 +51,17 @@ class Session; class ReactorTimer; /** - * A Baton is basically a networking reactor, with limited functionality and no forward progress - * guarantees. Rather than asynchronously running tasks through one, the baton records the intent - * of those tasks and defers waiting and execution to a later call to run(); + * A NetworkingBaton is basically a networking reactor, with limited functionality and parallel + * forward progress guarantees. Rather than asynchronously running tasks through one, the baton + * records the intent of those tasks and defers waiting and execution to a later call to run(); * - * Baton's provide a mechanism to allow consumers of a transport layer to execute IO themselves, - * rather than having this occur on another thread. This can improve performance by minimizing - * context switches, as well as improving the readability of stack traces by grounding async - * execution on top of a regular client call stack. + * NetworkingBaton's provide a mechanism to allow consumers of a transport layer to execute IO + * themselves, rather than having this occur on another thread. This can improve performance by + * minimizing context switches, as well as improving the readability of stack traces by grounding + * async execution on top of a regular client call stack. */ -class Baton : public Waitable { +class NetworkingBaton : public Baton { public: - virtual ~Baton() = default; - - /** - * Detaches a baton from an associated opCtx. 
- */ - virtual void detach() = 0; - - /** - * Executes a callback on the baton via schedule. Returns a future which will execute on the - * baton runner. - */ - template <typename Callback> - Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { - auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - - schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { - p.setWith(std::move(cb)); - }); - - return std::move(pf.future); - } - - /** - * Executes a callback on the baton. - */ - virtual void schedule(unique_function<void()> func) = 0; - /** * Adds a session, returning a future which activates on read/write-ability of the session. */ @@ -94,29 +69,31 @@ public: In, Out, }; - virtual Future<void> addSession(Session& session, Type type) = 0; + virtual Future<void> addSession(Session& session, Type type) noexcept = 0; /** * Adds a timer, returning a future which activates after a deadline. */ - virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0; + virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept = 0; /** * Cancels waiting on a session. * * Returns true if the session was in the baton to be cancelled. */ - virtual bool cancelSession(Session& session) = 0; + virtual bool cancelSession(Session& session) noexcept = 0; /** * Cancels waiting on a timer * * Returns true if the timer was in the baton to be cancelled. 
*/ - virtual bool cancelTimer(const ReactorTimer& timer) = 0; -}; + virtual bool cancelTimer(const ReactorTimer& timer) noexcept = 0; -using BatonHandle = std::shared_ptr<Baton>; + NetworkingBaton* networking() noexcept final { + return this; + } +}; } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 63bfe67da3f..55570768ba7 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -30,8 +30,8 @@ #pragma once +#include <map> #include <memory> -#include <set> #include <vector> #include <poll.h> @@ -55,7 +55,7 @@ namespace transport { * * We implement our networking reactor on top of poll + eventfd for wakeups */ -class TransportLayerASIO::BatonASIO : public Baton { +class TransportLayerASIO::BatonASIO : public NetworkingBaton { /** * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for * ::poll). @@ -117,6 +117,8 @@ class TransportLayerASIO::BatonASIO : public Baton { } const int fd; + + static const Client::Decoration<EventFDHolder> getForClient; }; public: @@ -129,75 +131,69 @@ public: invariant(_timers.empty()); } - void detach() override { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_sessions.empty()); - invariant(_scheduled.empty()); - invariant(_timers.empty()); - } + void markKillOnClientDisconnect() noexcept override { + if (_opCtx->getClient() && _opCtx->getClient()->session()) { + addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) { + if (!s.isOK()) { + return; + } - { - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - invariant(_opCtx->getBaton().get() == this); - _opCtx->setBaton(nullptr); + _opCtx->markKilled(ErrorCodes::ClientDisconnect); + }); } + } - _opCtx = nullptr; + Future<void> addSession(Session& session, Type type) noexcept override { + return addSessionImpl(session, type == Type::In ? 
POLLIN : POLLOUT); } - Future<void> addSession(Session& session, Type type) override { - auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override { auto pf = makePromiseFuture<void>(); + auto id = timer.id(); - _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable { - _sessions[fd] = TransportSession{type, std::move(promise)}; - }); + stdx::unique_lock<stdx::mutex> lk(_mutex); - return std::move(pf.future); - } + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot waitUntil on timer"); + } - Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { - auto pf = makePromiseFuture<void>(); - _safeExecute( - [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable { - auto pair = _timers.insert({ - timerPtr, expiration, std::move(promise), - }); - invariant(pair.second); - _timersById[pair.first->id] = pair.first; - }); + _safeExecute(std::move(lk), + [ id, expiration, promise = std::move(pf.promise), this ]() mutable { + auto iter = _timers.emplace(std::piecewise_construct, + std::forward_as_tuple(expiration), + std::forward_as_tuple(id, std::move(promise))); + _timersById[id] = iter; + }); return std::move(pf.future); } - bool cancelSession(Session& session) override { - const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + bool cancelSession(Session& session) noexcept override { + const auto id = session.id(); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_sessions.find(fd) == _sessions.end()) { + if (_sessions.find(id) == _sessions.end()) { return false; } - // TODO: There's an ABA issue here with fds where between previously and before we could - // have removed the fd, then opened and added a new socket with the same fd. We need to - // solve it via using session id's for handles. 
- _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); }); + _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); }); return true; } - bool cancelTimer(const ReactorTimer& timer) override { + bool cancelTimer(const ReactorTimer& timer) noexcept override { + const auto id = timer.id(); + stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_timersById.find(&timer) == _timersById.end()) { + if (_timersById.find(id) == _timersById.end()) { return false; } - // TODO: Same ABA issue as above, but for pointers. - _safeExecute(std::move(lk), [ timerPtr = &timer, this ] { - auto iter = _timersById.find(timerPtr); + _safeExecute(std::move(lk), [id, this] { + auto iter = _timersById.find(id); if (iter != _timersById.end()) { _timers.erase(iter->second); @@ -208,18 +204,24 @@ public: return true; } - void schedule(unique_function<void()> func) override { + void schedule(unique_function<void(OperationContext*)> func) noexcept override { stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_opCtx) { + func(nullptr); + + return; + } + _scheduled.push_back(std::move(func)); if (_inPoll) { - _efd.notify(); + efd().notify(); } } void notify() noexcept override { - _efd.notify(); + efd().notify(); } /** @@ -263,7 +265,7 @@ public: lk.unlock(); for (auto& job : toRun) { - job(); + job(_opCtx); } lk.lock(); } @@ -280,21 +282,19 @@ public: // If we have a timer, poll no longer than that if (_timers.size()) { - deadline = _timers.begin()->expiration; + deadline = _timers.begin()->first; } std::vector<decltype(_sessions)::iterator> sessions; sessions.reserve(_sessions.size()); std::vector<pollfd> pollSet; + pollSet.reserve(_sessions.size() + 1); - pollSet.push_back(pollfd{_efd.fd, POLLIN, 0}); + pollSet.push_back(pollfd{efd().fd, POLLIN, 0}); for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { - pollSet.push_back( - pollfd{iter->first, - static_cast<short>(iter->second.type == Type::In ? 
POLLIN : POLLOUT), - 0}); + pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0}); sessions.push_back(iter); } @@ -330,9 +330,9 @@ public: now = clkSource->now(); // Fire expired timers - for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { - toFulfill.push_back(std::move(iter->promise)); - _timersById.erase(iter->id); + for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) { + toFulfill.push_back(std::move(iter->second.promise)); + _timersById.erase(iter->second.id); iter = _timers.erase(iter); } @@ -343,12 +343,13 @@ public: auto pollIter = pollSet.begin(); if (pollIter->revents) { - _efd.wait(); + efd().wait(); remaining--; } ++pollIter; + for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; ++sessionIter, ++pollIter) { if (pollIter->revents) { @@ -366,28 +367,75 @@ public: } private: - struct Timer { - const ReactorTimer* id; - Date_t expiration; - mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set. 
+ Future<void> addSessionImpl(Session& session, short type) noexcept { + auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + auto id = session.id(); + auto pf = makePromiseFuture<void>(); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession"); + } - struct LessThan { - bool operator()(const Timer& lhs, const Timer& rhs) const { - return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id); + _safeExecute(std::move(lk), + [ id, fd, type, promise = std::move(pf.promise), this ]() mutable { + _sessions[id] = TransportSession{fd, type, std::move(promise)}; + }); + + return std::move(pf.future); + } + + void detachImpl() noexcept override { + decltype(_sessions) sessions; + decltype(_scheduled) scheduled; + decltype(_timers) timers; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); } - }; + + _opCtx = nullptr; + + using std::swap; + swap(_sessions, sessions); + swap(_scheduled, scheduled); + swap(_timers, timers); + } + + for (auto& job : scheduled) { + job(nullptr); + } + + for (auto& session : sessions) { + session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot wait for socket")); + } + + for (auto& pair : timers) { + pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, completing timer early")); + } + } + + struct Timer { + Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {} + + size_t id; + Promise<void> promise; // Needs to be mutable to move from it while in std::set. 
}; struct TransportSession { - Type type; + int fd; + short type; Promise<void> promise; }; - template <typename Callback> - void _safeExecute(Callback&& cb) { - return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb)); - } - /** * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to * the eventfd. If not, we run inline. @@ -395,37 +443,44 @@ private: template <typename Callback> void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { if (_inPoll) { - _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable { - stdx::lock_guard<stdx::mutex> lk(_mutex); - cb(); - }); + _scheduled.push_back( + [ cb = std::forward<Callback>(cb), this ](OperationContext*) mutable { + stdx::lock_guard<stdx::mutex> lk(_mutex); + cb(); + }); - _efd.notify(); + efd().notify(); } else { cb(); } } + EventFDHolder& efd() { + return EventFDHolder::getForClient(_opCtx->getClient()); + } + stdx::mutex _mutex; OperationContext* _opCtx; bool _inPoll = false; - EventFDHolder _efd; - // This map stores the sessions we need to poll on. We unwind it into a pollset for every // blocking call to run - stdx::unordered_map<int, TransportSession> _sessions; + stdx::unordered_map<SessionId, TransportSession> _sessions; // The set is used to find the next timer which will fire. The unordered_map looks up the // timers so we can remove them in O(1) - std::set<Timer, Timer::LessThan> _timers; - stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById; + std::multimap<Date_t, Timer> _timers; + stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById; // For tasks that come in via schedule. 
Or that were deferred because we were in poll - std::vector<unique_function<void()>> _scheduled; + std::vector<unique_function<void(OperationContext*)>> _scheduled; }; +const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder> + TransportLayerASIO::BatonASIO::EventFDHolder::getForClient = + Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>(); + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index a5c4b78de82..4e350887446 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -85,7 +85,7 @@ public: return Message(); // Subclasses can do something different. } - Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override { return Future<Message>::makeReady(sourceMessage()); } @@ -101,12 +101,11 @@ public: return Status::OK(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override { return Future<void>::makeReady(sinkMessage(message)); } - void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {} + void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {} void setTimeout(boost::optional<Milliseconds>) override {} diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 4fa709ed849..28349273362 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -33,6 +33,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/db/baton.h" #include "mongo/platform/atomic_word.h" #include "mongo/rpc/message.h" #include "mongo/transport/session_id.h" @@ -46,8 +47,6 @@ namespace transport { class TransportLayer; class Session; -class Baton; -using 
BatonHandle = std::shared_ptr<Baton>; using SessionHandle = std::shared_ptr<Session>; using ConstSessionHandle = std::shared_ptr<const Session>; @@ -107,7 +106,7 @@ public: * Source (receive) a new Message from the remote host for this Session. */ virtual StatusWith<Message> sourceMessage() = 0; - virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; /** * Sink (send) a Message to the remote host for this Session. @@ -115,15 +114,14 @@ public: * Async version will keep the buffer alive until the operation completes. */ virtual Status sinkMessage(Message message) = 0; - virtual Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0; /** * Cancel any outstanding async operations. There is no way to cancel synchronous calls. * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already * completed. */ - virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0; + virtual void cancelAsyncOperations(const BatonHandle& handle = nullptr) = 0; /** * This should only be used to detect when the remote host has disappeared without @@ -155,14 +153,14 @@ public: * * The 'kPending' tag is only for new sessions; callers should not set it directly. */ - virtual void setTags(TagMask tagsToSet); + void setTags(TagMask tagsToSet); /** * Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the * 'kPending' tag is set, indicating that no tags have yet been specified for the session, this * function also clears that tag as part of the same atomic operation. 
*/ - virtual void unsetTags(TagMask tagsToUnset); + void unsetTags(TagMask tagsToUnset); /** * Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call @@ -175,9 +173,9 @@ public: * of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never * try to set it. */ - virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); + void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); - virtual TagMask getTags() const; + TagMask getTags() const; protected: Session(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index b74b29af1d9..49f56357686 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -133,7 +133,7 @@ public: return sourceMessageImpl().getNoThrow(); } - Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override { ensureAsync(); return sourceMessageImpl(baton); } @@ -150,8 +150,7 @@ public: .getNoThrow(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& baton = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override { ensureAsync(); return write(asio::buffer(message.buf(), message.size()), baton) .then([this, message /*keep the buffer alive*/]() { @@ -161,10 +160,10 @@ public: }); } - void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override { + void cancelAsyncOperations(const BatonHandle& baton = nullptr) override { LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote; - if (baton) { - baton->cancelSession(*this); + if (baton && baton->networking()) { + baton->networking()->cancelSession(*this); } else { getSocket().cancel(); } @@ -342,7 +341,7 @@ private: return _socket; } - Future<Message> sourceMessageImpl(const 
transport::BatonHandle& baton = nullptr) { + Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) { static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); auto headerBuffer = SharedBuffer::allocate(kHeaderSize); @@ -387,8 +386,7 @@ private: } template <typename MutableBufferSequence> - Future<void> read(const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL if (_sslSocket) { return opportunisticRead(*_sslSocket, buffers, baton); @@ -413,8 +411,7 @@ private: } template <typename ConstBufferSequence> - Future<void> write(const ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { @@ -439,7 +436,7 @@ private: template <typename Stream, typename MutableBufferSequence> Future<void> opportunisticRead(Stream& stream, const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; size_t size; @@ -469,8 +466,9 @@ private: asyncBuffers += size; } - if (baton) { - return baton->addSession(*this, Baton::Type::In) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::In) .then([&stream, asyncBuffers, baton, this] { return opportunisticRead(stream, asyncBuffers, baton); }); @@ -494,7 +492,7 @@ private: template <typename ConstBufferSequence> boost::optional<Future<void>> moreToSend(GenericSocket& socket, const ConstBufferSequence& buffers, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return boost::none; } @@ -517,7 +515,7 @@ private: template <typename Stream, typename ConstBufferSequence> Future<void> opportunisticWrite(Stream& stream, const 
ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; std::size_t size; @@ -552,8 +550,9 @@ private: return std::move(*more); } - if (baton) { - return baton->addSession(*this, Baton::Type::Out) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::Out) .then([&stream, asyncBuffers, baton, this] { return opportunisticWrite(stream, asyncBuffers, baton); }); diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp index 442cf78b15c..e2b997add0c 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/transport_layer.cpp @@ -31,11 +31,20 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" +#include "mongo/db/operation_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/baton.h" #include "mongo/transport/transport_layer.h" namespace mongo { namespace transport { +namespace { + +AtomicWord<uint64_t> reactorTimerIdCounter(0); + +} // namespace + const Status TransportLayer::SessionUnknownStatus = Status(ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Session."); @@ -48,5 +57,11 @@ const Status TransportLayer::TicketSessionUnknownStatus = Status( const Status TransportLayer::TicketSessionClosedStatus = Status( ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session."); +ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {} + +BatonHandle TransportLayer::makeDefaultBaton(OperationContext* opCtx) { + return opCtx->getServiceContext()->makeBaton(opCtx); +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index ca1453cb3a4..0b736ba600f 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -109,17 +109,21 @@ public: enum 
WhichReactor { kIngress, kEgress, kNewReactor }; virtual ReactorHandle getReactor(WhichReactor which) = 0; - virtual BatonHandle makeBaton(OperationContext* opCtx) { - return nullptr; + virtual BatonHandle makeBaton(OperationContext* opCtx) const { + return makeDefaultBaton(opCtx); } protected: TransportLayer() = default; + +private: + static BatonHandle makeDefaultBaton(OperationContext* opCtx); }; class ReactorTimer { public: - ReactorTimer() = default; + ReactorTimer(); + ReactorTimer(const ReactorTimer&) = delete; ReactorTimer& operator=(const ReactorTimer&) = delete; @@ -128,6 +132,10 @@ public: */ virtual ~ReactorTimer() = default; + size_t id() const { + return _id; + } + /* * Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an * ErrorCodes::CallbackCancelled status. @@ -142,6 +150,9 @@ public: * Calling this implicitly calls cancel(). */ virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0; + +private: + const size_t _id; }; class Reactor { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 06604606d0c..6f0c02d17fb 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -56,9 +56,11 @@ #endif // session_asio.h has some header dependencies that require it to be the last header. + #ifdef __linux__ #include "mongo/transport/baton_asio_linux.h" #endif + #include "mongo/transport/session_asio.h" namespace mongo { @@ -79,7 +81,7 @@ public: void cancel(const BatonHandle& baton = nullptr) override { // If we have a baton try to cancel that. 
- if (baton && baton->cancelTimer(*this)) { + if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) { LOG(2) << "Canceled via baton, skipping asio cancel."; return; } @@ -90,8 +92,9 @@ public: Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override { - if (baton) { - return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton); + if (baton && baton->networking()) { + return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); }, + baton); } else { return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); }); } @@ -875,8 +878,8 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const { } #endif -BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { #ifdef __linux__ +BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) const { auto baton = std::make_shared<BatonASIO>(opCtx); { @@ -885,11 +888,9 @@ BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { opCtx->setBaton(baton); } - return std::move(baton); -#else - return nullptr; -#endif + return baton; } +#endif } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 77815024374..b5619bcf361 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -136,7 +136,9 @@ public: return _listenerPort; } - BatonHandle makeBaton(OperationContext* opCtx) override; +#ifdef __linux__ + BatonHandle makeBaton(OperationContext* opCtx) const override; +#endif private: class BatonASIO; diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 60cfb58a41e..29ad6457be6 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -90,7 +90,7 @@ public: static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); 
- BatonHandle makeBaton(OperationContext* opCtx) override { + BatonHandle makeBaton(OperationContext* opCtx) const override { stdx::lock_guard<stdx::mutex> lk(_tlsMutex); // TODO: figure out what to do about managers with more than one transport layer. invariant(_tls.size() == 1); diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 2cf2886afa0..1e66347da96 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -103,7 +103,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWo StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return _executor->scheduleRemoteCommand(request, cb, baton); } diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index e03e2a6955e..26d83b2bece 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -47,33 +47,32 @@ public: * Does not own target executor. 
*/ TaskExecutorProxy(executor::TaskExecutor* executor); - virtual ~TaskExecutorProxy(); + ~TaskExecutorProxy(); executor::TaskExecutor* getExecutor() const; void setExecutor(executor::TaskExecutor* executor); - virtual void startup() override; - virtual void shutdown() override; - virtual void join() override; - virtual void appendDiagnosticBSON(BSONObjBuilder* builder) const override; - virtual Date_t now() override; - virtual StatusWith<EventHandle> makeEvent() override; - virtual void signalEvent(const EventHandle& event) override; - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; - virtual void waitForEvent(const EventHandle& event) override; - virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, - const EventHandle& event, - Date_t deadline) override; - virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - virtual StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; - virtual void cancel(const CallbackHandle& cbHandle) override; - virtual void wait(const CallbackHandle& cbHandle, - Interruptible* interruptible = Interruptible::notInterruptible()) override; - virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; + void startup() override; + void shutdown() override; + void join() override; + void appendDiagnosticBSON(BSONObjBuilder* builder) const override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const 
EventHandle& event, + Date_t deadline) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle, + Interruptible* interruptible = Interruptible::notInterruptible()) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; private: // Not owned by us. |