diff options
Diffstat (limited to 'src/mongo')
44 files changed, 785 insertions, 347 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index b0be7a0303d..8d01f077791 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -276,6 +276,7 @@ error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. error_code("IndexBuildAborted", 276) error_code("AlarmAlreadyFulfilled", 277) error_code("UnsatisfiableCommitQuorum", 278) +error_code("ClientDisconnect", 279) # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 520e69606bc..1a2c01ed335 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -193,7 +193,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, }); } -Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { +Future<Message> AsyncDBClient::_call(Message request, const BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); @@ -219,8 +219,7 @@ Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHand }); } -Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, - const transport::BatonHandle& baton) { +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); return _call(std::move(requestMsg), baton) @@ -230,7 +229,7 @@ Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, } Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( - executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { + executor::RemoteCommandRequest request, const BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( @@ -246,7 +245,7 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -void AsyncDBClient::cancel(const transport::BatonHandle& baton) { +void AsyncDBClient::cancel(const BatonHandle& baton) { _session->cancelAsyncOperations(baton); } diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index 356f4e0f0ca..e7a40dcc73f 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -62,9 +62,8 @@ public: Milliseconds timeout); Future<executor::RemoteCommandResponse> runCommandRequest( - executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr); - Future<rpc::UniqueReply> runCommand(OpMsgRequest request, - const transport::BatonHandle& baton = nullptr); + executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr); + Future<rpc::UniqueReply> runCommand(OpMsgRequest request, const BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); @@ -73,7 +72,7 @@ public: Future<void> initWireVersion(const std::string& appName, executor::NetworkConnectionHook* const hook); - void cancel(const transport::BatonHandle& baton = nullptr); + void cancel(const BatonHandle& baton = nullptr); bool isStillConnected(); @@ -83,7 +82,7 @@ public: const HostAndPort& local() const; private: - Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr); + Future<Message> _call(Message request, const BatonHandle& baton = nullptr); BSONObj _buildIsMasterRequest(const std::string& appName, executor::NetworkConnectionHook* hook); void _parseIsMasterResponse(BSONObj request, diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index 503a7c4a42e..81ecd179dcc 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -94,7 +94,7 @@ public: virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + const BatonHandle& baton = nullptr) override { if (scheduleRemoteCommandFailPoint) { return Status(ErrorCodes::ShutdownInProgress, "failed to send remote command - shutdown in progress"); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b135cb559a2..a8fe9910b39 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -527,13 +527,27 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/multi_key_path_tracker', '$BUILD_DIR/mongo/db/storage/write_unit_of_work', - '$BUILD_DIR/mongo/transport/transport_layer_common', '$BUILD_DIR/mongo/util/clock_sources', '$BUILD_DIR/mongo/util/concurrency/spin_lock', '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/net/network', '$BUILD_DIR/mongo/util/periodic_runner', ], + LIBDEPS_PRIVATE=[ + 'default_baton', + ], +) + +env.Library( + target='default_baton', + source=[ + 'default_baton.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/util/clock_sources', + '$BUILD_DIR/mongo/util/concurrency/spin_lock', + ], ) env.CppUnitTest( diff --git a/src/mongo/db/baton.h b/src/mongo/db/baton.h new file mode 100644 index 00000000000..6e1d1319d2b --- /dev/null +++ b/src/mongo/db/baton.h @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/util/functional.h" +#include "mongo/util/future.h" +#include "mongo/util/time_support.h" +#include "mongo/util/waitable.h" + +namespace mongo { + +class OperationContext; + +namespace transport { + +class NetworkingBaton; + +} // namespace transport + +/** + * A Baton is lightweight executor, with parallel forward progress guarantees. Rather than + * asynchronously running tasks through one, the baton records the intent of those tasks and defers + * waiting and execution to a later call to run(). + * + * Note: This occurs automatically when opCtx waiting on a condition variable. + */ +class Baton : public Waitable, public std::enable_shared_from_this<Baton> { +public: + virtual ~Baton() = default; + + /** + * Detaches a baton from an associated opCtx. + * + * This invokes all callbacks currently inside the baton (from the detaching thread) with a void + * opCtx. Also, any calls to schedule after this point will immediately invoke their callback + * with a null opCtx. + */ + void detach() noexcept { + // We make this anchor so that deleting the shared_ptr inside opCtx doesn't remove the last + // reference to this type until we return from detach. + const auto anchor = shared_from_this(); + + detachImpl(); + } + + /** + * Schedules a callback to run on the baton. + * + * The function will be invoked in one of 3 ways: + * 1. With an opCtx inside run() (I.e. opCtx->waitForConditionOrInterrupt) + * 2. Without an opCtx inside detach() + * 3. Without an opCtx inside schedule() after detach() + * + * Note that the latter two cases are indistinguishable, so the callback should be safe to run + * inline if passed a nullptr. Examples of such work are logging, simple cleanup and + * rescheduling the task on another executor. + */ + virtual void schedule(unique_function<void(OperationContext*)> func) noexcept = 0; + + /** + * Returns a networking view of the baton, if this baton supports networking functionality + */ + virtual transport::NetworkingBaton* networking() noexcept { + return nullptr; + } + + /** + * Marks the baton to wake up on client socket disconnect + */ + virtual void markKillOnClientDisconnect() noexcept = 0; + +private: + virtual void detachImpl() noexcept = 0; +}; + +using BatonHandle = std::shared_ptr<Baton>; + +} // namespace mongo diff --git a/src/mongo/db/default_baton.cpp b/src/mongo/db/default_baton.cpp new file mode 100644 index 00000000000..9855957135a --- /dev/null +++ b/src/mongo/db/default_baton.cpp @@ -0,0 +1,154 @@ + +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/db/default_baton.h" + +#include "mongo/db/operation_context.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +DefaultBaton::DefaultBaton(OperationContext* opCtx) : _opCtx(opCtx) {} + +DefaultBaton::~DefaultBaton() { + invariant(!_opCtx); + invariant(_scheduled.empty()); +} + +void DefaultBaton::markKillOnClientDisconnect() noexcept { + if (_opCtx->getClient() && _opCtx->getClient()->session()) { + _hasIngressSocket = true; + } +} + +void DefaultBaton::detachImpl() noexcept { + decltype(_scheduled) scheduled; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); + } + + _opCtx = nullptr; + _hasIngressSocket = false; + + using std::swap; + swap(_scheduled, scheduled); + } + + for (auto& job : scheduled) { + job(nullptr); + } +} + +void DefaultBaton::schedule(unique_function<void(OperationContext*)> func) noexcept { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_opCtx) { + lk.unlock(); + func(nullptr); + + return; + } + + _scheduled.push_back(std::move(func)); + + if (_sleeping && !_notified) { + _notified = true; + _cv.notify_one(); + } +} + +void DefaultBaton::notify() noexcept { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _notified = true; + _cv.notify_one(); +} + +Waitable::TimeoutState DefaultBaton::run_until(ClockSource* clkSource, + Date_t oldDeadline) noexcept { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks + const auto guard = makeGuard([&] { + // While we have scheduled work, keep running jobs + while (_scheduled.size()) { + auto toRun = std::exchange(_scheduled, {}); + + lk.unlock(); + for (auto& job : toRun) { + job(_opCtx); + } + lk.lock(); + } + }); + + // If anything was scheduled, run it now. + if (_scheduled.size()) { + return Waitable::TimeoutState::NoTimeout; + } + + auto newDeadline = oldDeadline; + + // If we have an ingress socket, sleep no more than 1 second (so we poll for closure in the + // outside opCtx waitForConditionOrInterruptUntil implementation) + if (_hasIngressSocket) { + newDeadline = std::min(oldDeadline, clkSource->now() + Seconds(1)); + } + + // we mark sleeping, so that we receive notifications + _sleeping = true; + + // while we're not notified + auto notified = + clkSource->waitForConditionUntil(_cv, lk, newDeadline, [&] { return _notified; }); + + _sleeping = false; + _notified = false; + + // If we've been notified, or we haven't timed out yet, return as if by spurious wakeup. + // Otherwise call it a timeout. + return (notified || (clkSource->now() < oldDeadline)) ? Waitable::TimeoutState::NoTimeout + : Waitable::TimeoutState::Timeout; +} + +void DefaultBaton::run(ClockSource* clkSource) noexcept { + run_until(clkSource, Date_t::max()); +} + +} // namespace mongo diff --git a/src/mongo/db/default_baton.h b/src/mongo/db/default_baton.h new file mode 100644 index 00000000000..6311f397e95 --- /dev/null +++ b/src/mongo/db/default_baton.h @@ -0,0 +1,78 @@ + +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/baton.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/functional.h" + +namespace mongo { + +class OperationContext; + +/** + * The most basic Baton implementation. + */ +class DefaultBaton : public Baton { +public: + explicit DefaultBaton(OperationContext* opCtx); + + ~DefaultBaton(); + + void markKillOnClientDisconnect() noexcept override; + + void schedule(unique_function<void(OperationContext*)> func) noexcept override; + + void notify() noexcept override; + + Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t oldDeadline) noexcept override; + + void run(ClockSource* clkSource) noexcept override; + +private: + void detachImpl() noexcept override; + + stdx::mutex _mutex; + stdx::condition_variable _cv; + bool _notified = false; + bool _sleeping = false; + + OperationContext* _opCtx; + + bool _hasIngressSocket = false; + + std::vector<unique_function<void(OperationContext*)>> _scheduled; +}; + +} // namespace mongo diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 11ca8b8f641..319b803288c 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -77,6 +77,8 @@ MONGO_FAIL_POINT_DEFINE(maxTimeNeverTimeOut); // inclusive. MONGO_FAIL_POINT_DEFINE(checkForInterruptFail); +const auto kNoWaiterThread = stdx::thread::id(); + } // namespace OperationContext::OperationContext(Client* client, unsigned int opId) @@ -261,6 +263,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser stdx::lock_guard<Client> clientLock(*getClient()); invariant(!_waitMutex); invariant(!_waitCV); + invariant(_waitThread == kNoWaiterThread); invariant(0 == _numKillers); // This interrupt check must be done while holding the client lock, so as not to race with a @@ -271,6 +274,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser } _waitMutex = m.mutex(); _waitCV = &cv; + _waitThread = stdx::this_thread::get_id(); } // If the maxTimeNeverTimeOut failpoint is set, behave as though the operation's deadline does @@ -302,6 +306,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser if (0 == _numKillers) { _waitMutex = nullptr; _waitCV = nullptr; + _waitThread = kNoWaiterThread; return true; } return false; @@ -328,7 +333,20 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser void OperationContext::markKilled(ErrorCodes::Error killCode) { invariant(killCode != ErrorCodes::OK); stdx::unique_lock<stdx::mutex> lkWaitMutex; - if (_waitMutex) { + + // If we have a _waitMutex, it means this opCtx is currently blocked in + // waitForConditionOrInterrupt. + // + // From there, we also know which thread is actually doing that waiting (it's recorded in + // _waitThread). If that thread isn't our thread, it's necessary to do the regular numKillers + // song and dance mentioned in waitForConditionOrInterrupt. + // + // If it is our thread, we know that we're currently inside that call to + // waitForConditionOrInterrupt and are being invoked by a callback run from Baton->run. And + // that means we don't need to deal with the waitMutex and waitCV (because we're running + // callbacks, which means run is returning, which means we'll be checking _killCode in the near + // future). + if (_waitMutex && stdx::this_thread::get_id() != _waitThread) { invariant(++_numKillers > 0); getClient()->unlock(); ON_BLOCK_EXIT([this] { diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 389797c2e1c..f376a98e048 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -184,14 +184,14 @@ public: /** * Sets a transport Baton on the operation. This will trigger the Baton on markKilled. */ - void setBaton(const transport::BatonHandle& baton) { + void setBaton(const BatonHandle& baton) { _baton = baton; } /** * Retrieves the baton associated with the operation. */ - const transport::BatonHandle& getBaton() const { + const BatonHandle& getBaton() const { return _baton; } @@ -436,13 +436,17 @@ private: // A transport Baton associated with the operation. The presence of this object implies that a // client thread is doing it's own async networking by blocking on it's own thread. - transport::BatonHandle _baton; + BatonHandle _baton; // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the // operation is currently waiting on inside a call to waitForConditionOrInterrupt...(). + // + // _waitThread is the calling thread's thread id. + // // All access guarded by the Client's lock. stdx::mutex* _waitMutex = nullptr; stdx::condition_variable* _waitCV = nullptr; + stdx::thread::id _waitThread; // If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled // actively attempting to kill the operation. If this value is non-zero, the operation is inside diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp index 330f7c1e199..463c990052a 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp @@ -372,10 +372,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 466bd80a4ce..a8ca6efb7c7 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -507,10 +507,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index af688a3b26f..07f7bc7bfff 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -233,10 +233,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 7463f93ab7c..02cdcd1118d 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -772,10 +772,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 7ce5768cf62..97e8e8d707a 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -604,7 +604,7 @@ TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMak virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + const BatonHandle& baton = nullptr) override { // Any error status other than ShutdownInProgress will cause the reporter to fassert. return Status(ErrorCodes::ShutdownInProgress, "failed to send remote command - shutdown in progress"); diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 7d00daff9e7..6084dc12041 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -59,10 +59,9 @@ public: ShouldFailRequestFn shouldFailRequest) : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override { + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { if (_shouldFailRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index 727014a5cc2..6c994dc719d 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -61,7 +61,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (shouldFailScheduleRemoteCommandRequest(request)) { return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); } diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index e5e92e0f0e1..ffceae96d23 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -49,10 +49,9 @@ public: StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; // Override to make scheduleWork() fail during testing. ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; }; diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index f2f72ac6ed0..4b1683edab0 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -38,6 +38,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/locker_noop.h" +#include "mongo/db/default_baton.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/recovery_unit_noop.h" @@ -153,7 +154,6 @@ void onCreate(T* object, const ObserversContainer& observers) { onCreate(object, observers.cbegin(), observers.cend()); } - } // namespace ServiceContext::UniqueClient ServiceContext::makeClient(std::string desc, @@ -406,4 +406,16 @@ void ServiceContext::ServiceContextDeleter::operator()(ServiceContext* service) delete service; } +BatonHandle ServiceContext::makeBaton(OperationContext* opCtx) const { + auto baton = std::make_shared<DefaultBaton>(opCtx); + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + invariant(!opCtx->getBaton()); + opCtx->setBaton(baton); + } + + return baton; +} + } // namespace mongo diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 31ad873085a..548efe1cd88 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -511,6 +511,11 @@ public: */ void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec); + /** + * Creates a delayed execution baton with basic functionality + */ + BatonHandle makeBaton(OperationContext* opCtx) const; + private: class ClientObserverHolder { public: diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 62012162bf6..34685392f1e 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -144,12 +144,11 @@ public: virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; - Future<TaskExecutor::ResponseStatus> startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - const transport::BatonHandle& baton = nullptr) { + Future<TaskExecutor::ResponseStatus> startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + const BatonHandle& baton = nullptr) { auto pf = makePromiseFuture<TaskExecutor::ResponseStatus>(); auto status = startCommand( @@ -171,7 +170,7 @@ public: * completed. */ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * Sets an alarm, which schedules "action" to run no sooner than "when". @@ -187,9 +186,7 @@ public: * Any callbacks invoked from setAlarm must observe onNetworkThread to * return true. See that method for why. */ - virtual Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) = 0; + virtual Status setAlarm(Date_t when, unique_function<void()> action) = 0; /** * Returns true if called from a thread dedicated to networking. I.e. not a diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index f700f6c211e..641f1095530 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -108,7 +108,7 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } @@ -140,8 +140,7 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( } } -void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { +void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const BatonHandle& baton) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -171,9 +170,7 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } } -Status NetworkInterfaceMock::setAlarm(const Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceMock::setAlarm(const Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 937bed41525..c0f1693a660 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -114,7 +114,7 @@ public: Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * If the network operation is in the _unscheduled or _processing queues, moves the operation @@ -123,14 +123,12 @@ public: * called after the task has already completed, but its callback has not yet been run. */ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) override; + const BatonHandle& baton = nullptr) override; /** * Not implemented. */ - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton = nullptr) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 512b3fee58b..5ca24991e8c 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -173,7 +173,7 @@ Date_t NetworkInterfaceTL::now() { Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } @@ -280,14 +280,19 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa if (baton) { // If we have a baton, we want to get back to the baton thread immediately after we get a // connection - std::move(connFuture).getAsync([ - baton, - rw = std::move(remainingWork) - ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { - baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable { - std::move(rw)(std::move(swConn)); + std::move(connFuture) + .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ]( + StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { + baton->schedule([ rw = std::move(rw), + swConn = std::move(swConn) ](OperationContext * opCtx) mutable { + if (opCtx) { + std::move(rw)(std::move(swConn)); + } else { + std::move(rw)(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, failing operation")); + } + }); }); - }); } else { // otherwise we're happy to run inline std::move(connFuture) @@ -306,7 +311,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) { conn->indicateSuccess(); return future; @@ -416,7 +421,7 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH } void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); auto it = _inProgress.find(cbHandle); if (it == _inProgress.end()) { @@ -445,19 +450,13 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan } } -Status NetworkInterfaceTL::setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) { +Status NetworkInterfaceTL::setAlarm(Date_t when, unique_function<void()> action) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } if (when <= now()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } + _reactor->schedule(transport::Reactor::kPost, std::move(action)); return Status::OK(); } @@ -469,39 +468,34 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, _inProgressAlarms.insert(alarmTimer); } - alarmTimer->waitUntil(when, baton) - .getAsync( - [ this, weakTimer, action = std::move(action), when, baton ](Status status) mutable { - auto alarmTimer = weakTimer.lock(); - if (!alarmTimer) { - return; - } else { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgressAlarms.erase(alarmTimer); - } - - auto nowVal = now(); - if (nowVal < when) { - warning() << "Alarm returned early. Expected at: " << when - << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action), baton); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); - } + alarmTimer->waitUntil(when, nullptr) + .getAsync([ this, weakTimer, action = std::move(action), when ](Status status) mutable { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); + } - return; + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action)); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); } - if (status.isOK()) { - if (baton) { - baton->schedule(std::move(action)); - } else { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + return; + } + + if (status.isOK()) { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 0b0c99cd03e..5cd5dd6115b 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -68,13 +68,11 @@ public: Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, RemoteCommandCompletionFn&& onFinish, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) override; - Status setAlarm(Date_t when, - unique_function<void()> action, - const transport::BatonHandle& baton) override; + const BatonHandle& baton) override; + Status setAlarm(Date_t when, unique_function<void()> action) override; bool onNetworkThread() override; @@ -117,7 +115,7 @@ private: Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state, Future<RemoteCommandResponse> future, CommandState::ConnHandle conn, - const transport::BatonHandle& baton); + const BatonHandle& baton); std::string _instanceName; ServiceContext* _svcCtx; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 1e3051e17b4..37c2669f306 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -239,7 +239,7 @@ public: virtual StatusWith<CallbackHandle> scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) = 0; + const BatonHandle& baton = nullptr) = 0; /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 1af20aa8692..4800de47db1 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -67,14 +67,14 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState public: static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return std::make_shared<CallbackState>(std::move(cb), readyDate, baton); } /** * Do not call directly. Use make. */ - CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton) + CallbackState(CallbackFn&& cb, Date_t theReadyDate, const BatonHandle& baton) : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {} virtual ~CallbackState() = default; @@ -102,7 +102,7 @@ public: bool isNetworkOperation = false; AtomicWord<bool> isFinished{false}; boost::optional<stdx::condition_variable> finishedCondition; - transport::BatonHandle baton; + BatonHandle baton; }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -350,21 +350,23 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( return cbHandle; } lk.unlock(); - _net->setAlarm(when, - [this, cbHandle] { - auto cbState = - checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); - if (cbState->canceled.load()) { - return; - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (cbState->canceled.load()) { - return; - } - scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); - }, - nullptr) - .transitional_ignore(); + + auto status = _net->setAlarm(when, [this, cbHandle] { + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); + if (cbState->canceled.load()) { + return; + } + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (cbState->canceled.load()) { + return; + } + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); + }); + + if (!status.isOK()) { + cancel(cbHandle.getValue()); + return status; + } return cbHandle; } @@ -406,7 +408,7 @@ const auto initialSyncPauseCmds = StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { if (MONGO_FAIL_POINT(initialSyncFuzzerSynchronizationPoint1)) { // We are only going to pause on these failpoints if the command issued is for the @@ -537,7 +539,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback } ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue( - CallbackFn work, const transport::BatonHandle& baton, Date_t when) { + CallbackFn work, const BatonHandle& baton, Date_t when) { WorkQueue result; result.emplace_front(CallbackState::make(std::move(work), when, baton)); result.front()->iter = result.begin(); @@ -592,7 +594,17 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, for (const auto& cbState : todo) { if (cbState->baton) { - cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); }); + cbState->baton->schedule([this, cbState](OperationContext* opCtx) { + if (opCtx) { + runCallback(std::move(cbState)); + return; + } + + cbState->canceled.store(1); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); + }); } else { const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 1558eaf7f01..1832a021d2c 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -74,7 +74,7 @@ public: void startup() override; void shutdown() override; void join() override; - void appendDiagnosticBSON(BSONObjBuilder* b) const; + void appendDiagnosticBSON(BSONObjBuilder* b) const override; Date_t now() override; StatusWith<EventHandle> makeEvent() override; void signalEvent(const EventHandle& event) override; @@ -85,10 +85,9 @@ public: void waitForEvent(const EventHandle& event) override; StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; @@ -138,7 +137,7 @@ private: * called outside of _mutex. */ static WorkQueue makeSingletonWorkQueue(CallbackFn work, - const transport::BatonHandle& baton, + const BatonHandle& baton, Date_t when = {}); /** diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 4646d371db8..a9bdcc43cb0 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -227,15 +227,15 @@ private: explicit BatonDetacher(OperationContext* opCtx); ~BatonDetacher(); - transport::Baton& operator*() const { + Baton& operator*() const { return *_baton; } - transport::Baton* operator->() const noexcept { + Baton* operator->() const noexcept { return _baton.get(); } - operator transport::BatonHandle() const { + operator BatonHandle() const { return _baton; } @@ -244,7 +244,7 @@ private: } private: - transport::BatonHandle _baton; + BatonHandle _baton; }; /** diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index d11de192011..8189b308c33 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -115,7 +115,7 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(Da StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { // schedule the user's callback if there is not opCtx if (!request.opCtx) { diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index 3a0df5fb57e..4f43a70b5a0 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -69,10 +69,9 @@ public: Date_t deadline) override; StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - StatusWith<CallbackHandle> scheduleRemoteCommand( - const RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 43032ccb1b7..12b2cf44899 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -14,6 +14,9 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/service_context', + ], ) env.Library( diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index 9dbf570ede2..de981ef6284 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -1,6 +1,6 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -32,9 +32,11 @@ #include <memory> +#include "mongo/db/baton.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" +#include "mongo/util/out_of_line_executor.h" #include "mongo/util/time_support.h" #include "mongo/util/waitable.h" @@ -49,44 +51,17 @@ class Session; class ReactorTimer; /** - * A Baton is basically a networking reactor, with limited functionality and no forward progress - * guarantees. Rather than asynchronously running tasks through one, the baton records the intent - * of those tasks and defers waiting and execution to a later call to run(); + * A NetworkingBaton is basically a networking reactor, with limited functionality and parallel + * forward progress guarantees. Rather than asynchronously running tasks through one, the baton + * records the intent of those tasks and defers waiting and execution to a later call to run(); * - * Baton's provide a mechanism to allow consumers of a transport layer to execute IO themselves, - * rather than having this occur on another thread. This can improve performance by minimizing - * context switches, as well as improving the readability of stack traces by grounding async - * execution on top of a regular client call stack. + * NetworkingBaton's provide a mechanism to allow consumers of a transport layer to execute IO + * themselves, rather than having this occur on another thread. This can improve performance by + * minimizing context switches, as well as improving the readability of stack traces by grounding + * async execution on top of a regular client call stack. */ -class Baton : public Waitable { +class NetworkingBaton : public Baton { public: - virtual ~Baton() = default; - - /** - * Detaches a baton from an associated opCtx. - */ - virtual void detach() = 0; - - /** - * Executes a callback on the baton via schedule. Returns a future which will execute on the - * baton runner. - */ - template <typename Callback> - Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { - auto pf = makePromiseFuture<FutureContinuationResult<Callback>>(); - - schedule([ cb = std::forward<Callback>(cb), p = std::move(pf.promise) ]() mutable { - p.setWith(std::move(cb)); - }); - - return std::move(pf.future); - } - - /** - * Executes a callback on the baton. - */ - virtual void schedule(unique_function<void()> func) = 0; - /** * Adds a session, returning a future which activates on read/write-ability of the session. */ @@ -94,29 +69,31 @@ public: In, Out, }; - virtual Future<void> addSession(Session& session, Type type) = 0; + virtual Future<void> addSession(Session& session, Type type) noexcept = 0; /** * Adds a timer, returning a future which activates after a deadline. */ - virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0; + virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept = 0; /** * Cancels waiting on a session. * * Returns true if the session was in the baton to be cancelled. */ - virtual bool cancelSession(Session& session) = 0; + virtual bool cancelSession(Session& session) noexcept = 0; /** * Cancels waiting on a timer * * Returns true if the timer was in the baton to be cancelled. */ - virtual bool cancelTimer(const ReactorTimer& timer) = 0; -}; + virtual bool cancelTimer(const ReactorTimer& timer) noexcept = 0; -using BatonHandle = std::shared_ptr<Baton>; + NetworkingBaton* networking() noexcept final { + return this; + } +}; } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 63bfe67da3f..55570768ba7 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -30,8 +30,8 @@ #pragma once +#include <map> #include <memory> -#include <set> #include <vector> #include <poll.h> @@ -55,7 +55,7 @@ namespace transport { * * We implement our networking reactor on top of poll + eventfd for wakeups */ -class TransportLayerASIO::BatonASIO : public Baton { +class TransportLayerASIO::BatonASIO : public NetworkingBaton { /** * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for * ::poll). @@ -117,6 +117,8 @@ class TransportLayerASIO::BatonASIO : public Baton { } const int fd; + + static const Client::Decoration<EventFDHolder> getForClient; }; public: @@ -129,75 +131,69 @@ public: invariant(_timers.empty()); } - void detach() override { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_sessions.empty()); - invariant(_scheduled.empty()); - invariant(_timers.empty()); - } + void markKillOnClientDisconnect() noexcept override { + if (_opCtx->getClient() && _opCtx->getClient()->session()) { + addSessionImpl(*(_opCtx->getClient()->session()), POLLRDHUP).getAsync([this](Status s) { + if (!s.isOK()) { + return; + } - { - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - invariant(_opCtx->getBaton().get() == this); - _opCtx->setBaton(nullptr); + _opCtx->markKilled(ErrorCodes::ClientDisconnect); + }); } + } - _opCtx = nullptr; + Future<void> addSession(Session& session, Type type) noexcept override { + return addSessionImpl(session, type == Type::In ? POLLIN : POLLOUT); } - Future<void> addSession(Session& session, Type type) override { - auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) noexcept override { auto pf = makePromiseFuture<void>(); + auto id = timer.id(); - _safeExecute([ fd, type, promise = std::move(pf.promise), this ]() mutable { - _sessions[fd] = TransportSession{type, std::move(promise)}; - }); + stdx::unique_lock<stdx::mutex> lk(_mutex); - return std::move(pf.future); - } + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot waitUntil on timer"); + } - Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { - auto pf = makePromiseFuture<void>(); - _safeExecute( - [ timerPtr = &timer, expiration, promise = std::move(pf.promise), this ]() mutable { - auto pair = _timers.insert({ - timerPtr, expiration, std::move(promise), - }); - invariant(pair.second); - _timersById[pair.first->id] = pair.first; - }); + _safeExecute(std::move(lk), + [ id, expiration, promise = std::move(pf.promise), this ]() mutable { + auto iter = _timers.emplace(std::piecewise_construct, + std::forward_as_tuple(expiration), + std::forward_as_tuple(id, std::move(promise))); + _timersById[id] = iter; + }); return std::move(pf.future); } - bool cancelSession(Session& session) override { - const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + bool cancelSession(Session& session) noexcept override { + const auto id = session.id(); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_sessions.find(fd) == _sessions.end()) { + if (_sessions.find(id) == _sessions.end()) { return false; } - // TODO: There's an ABA issue here with fds where between previously and before we could - // have removed the fd, then opened and added a new socket with the same fd. We need to - // solve it via using session id's for handles. - _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); }); + _safeExecute(std::move(lk), [id, this] { _sessions.erase(id); }); return true; } - bool cancelTimer(const ReactorTimer& timer) override { + bool cancelTimer(const ReactorTimer& timer) noexcept override { + const auto id = timer.id(); + stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_timersById.find(&timer) == _timersById.end()) { + if (_timersById.find(id) == _timersById.end()) { return false; } - // TODO: Same ABA issue as above, but for pointers. - _safeExecute(std::move(lk), [ timerPtr = &timer, this ] { - auto iter = _timersById.find(timerPtr); + _safeExecute(std::move(lk), [id, this] { + auto iter = _timersById.find(id); if (iter != _timersById.end()) { _timers.erase(iter->second); @@ -208,18 +204,24 @@ public: return true; } - void schedule(unique_function<void()> func) override { + void schedule(unique_function<void(OperationContext*)> func) noexcept override { stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_opCtx) { + func(nullptr); + + return; + } + _scheduled.push_back(std::move(func)); if (_inPoll) { - _efd.notify(); + efd().notify(); } } void notify() noexcept override { - _efd.notify(); + efd().notify(); } /** @@ -263,7 +265,7 @@ public: lk.unlock(); for (auto& job : toRun) { - job(); + job(_opCtx); } lk.lock(); } @@ -280,21 +282,19 @@ public: // If we have a timer, poll no longer than that if (_timers.size()) { - deadline = _timers.begin()->expiration; + deadline = _timers.begin()->first; } std::vector<decltype(_sessions)::iterator> sessions; sessions.reserve(_sessions.size()); std::vector<pollfd> pollSet; + pollSet.reserve(_sessions.size() + 1); - pollSet.push_back(pollfd{_efd.fd, POLLIN, 0}); + pollSet.push_back(pollfd{efd().fd, POLLIN, 0}); for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) { - pollSet.push_back( - pollfd{iter->first, - static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT), - 0}); + pollSet.push_back(pollfd{iter->second.fd, iter->second.type, 0}); sessions.push_back(iter); } @@ -330,9 +330,9 @@ public: now = clkSource->now(); // Fire expired timers - for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { - toFulfill.push_back(std::move(iter->promise)); - _timersById.erase(iter->id); + for (auto iter = _timers.begin(); iter != _timers.end() && iter->first < now;) { + toFulfill.push_back(std::move(iter->second.promise)); + _timersById.erase(iter->second.id); iter = _timers.erase(iter); } @@ -343,12 +343,13 @@ public: auto pollIter = pollSet.begin(); if (pollIter->revents) { - _efd.wait(); + efd().wait(); remaining--; } ++pollIter; + for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; ++sessionIter, ++pollIter) { if (pollIter->revents) { @@ -366,28 +367,75 @@ public: } private: - struct Timer { - const ReactorTimer* id; - Date_t expiration; - mutable Promise<void> promise; // Needs to be mutable to move from it while in std::set. + Future<void> addSessionImpl(Session& session, short type) noexcept { + auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + auto id = session.id(); + auto pf = makePromiseFuture<void>(); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_opCtx) { + return Status(ErrorCodes::ShutdownInProgress, "baton is detached, cannot addSession"); + } - struct LessThan { - bool operator()(const Timer& lhs, const Timer& rhs) const { - return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id); + _safeExecute(std::move(lk), + [ id, fd, type, promise = std::move(pf.promise), this ]() mutable { + _sessions[id] = TransportSession{fd, type, std::move(promise)}; + }); + + return std::move(pf.future); + } + + void detachImpl() noexcept override { + decltype(_sessions) sessions; + decltype(_scheduled) scheduled; + decltype(_timers) timers; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); } - }; + + _opCtx = nullptr; + + using std::swap; + swap(_sessions, sessions); + swap(_scheduled, scheduled); + swap(_timers, timers); + } + + for (auto& job : scheduled) { + job(nullptr); + } + + for (auto& session : sessions) { + session.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, cannot wait for socket")); + } + + for (auto& pair : timers) { + pair.second.promise.setError(Status(ErrorCodes::ShutdownInProgress, + "baton is detached, completing timer early")); + } + } + + struct Timer { + Timer(size_t id, Promise<void> promise) : id(id), promise(std::move(promise)) {} + + size_t id; + Promise<void> promise; // Needs to be mutable to move from it while in std::set. }; struct TransportSession { - Type type; + int fd; + short type; Promise<void> promise; }; - template <typename Callback> - void _safeExecute(Callback&& cb) { - return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb)); - } - /** * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to * the eventfd. If not, we run inline. @@ -395,37 +443,44 @@ private: template <typename Callback> void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { if (_inPoll) { - _scheduled.push_back([ cb = std::forward<Callback>(cb), this ]() mutable { - stdx::lock_guard<stdx::mutex> lk(_mutex); - cb(); - }); + _scheduled.push_back( + [ cb = std::forward<Callback>(cb), this ](OperationContext*) mutable { + stdx::lock_guard<stdx::mutex> lk(_mutex); + cb(); + }); - _efd.notify(); + efd().notify(); } else { cb(); } } + EventFDHolder& efd() { + return EventFDHolder::getForClient(_opCtx->getClient()); + } + stdx::mutex _mutex; OperationContext* _opCtx; bool _inPoll = false; - EventFDHolder _efd; - // This map stores the sessions we need to poll on. We unwind it into a pollset for every // blocking call to run - stdx::unordered_map<int, TransportSession> _sessions; + stdx::unordered_map<SessionId, TransportSession> _sessions; // The set is used to find the next timer which will fire. The unordered_map looks up the // timers so we can remove them in O(1) - std::set<Timer, Timer::LessThan> _timers; - stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById; + std::multimap<Date_t, Timer> _timers; + stdx::unordered_map<size_t, decltype(_timers)::const_iterator> _timersById; // For tasks that come in via schedule. Or that were deferred because we were in poll - std::vector<unique_function<void()>> _scheduled; + std::vector<unique_function<void(OperationContext*)>> _scheduled; }; +const Client::Decoration<TransportLayerASIO::BatonASIO::EventFDHolder> + TransportLayerASIO::BatonASIO::EventFDHolder::getForClient = + Client::declareDecoration<TransportLayerASIO::BatonASIO::EventFDHolder>(); + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index a5c4b78de82..4e350887446 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -85,7 +85,7 @@ public: return Message(); // Subclasses can do something different. } - Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override { return Future<Message>::makeReady(sourceMessage()); } @@ -101,12 +101,11 @@ public: return Status::OK(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override { return Future<void>::makeReady(sinkMessage(message)); } - void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {} + void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {} void setTimeout(boost::optional<Milliseconds>) override {} diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 4fa709ed849..28349273362 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -33,6 +33,7 @@ #include <memory> #include "mongo/base/disallow_copying.h" +#include "mongo/db/baton.h" #include "mongo/platform/atomic_word.h" #include "mongo/rpc/message.h" #include "mongo/transport/session_id.h" @@ -46,8 +47,6 @@ namespace transport { class TransportLayer; class Session; -class Baton; -using BatonHandle = std::shared_ptr<Baton>; using SessionHandle = std::shared_ptr<Session>; using ConstSessionHandle = std::shared_ptr<const Session>; @@ -107,7 +106,7 @@ public: * Source (receive) a new Message from the remote host for this Session. */ virtual StatusWith<Message> sourceMessage() = 0; - virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; /** * Sink (send) a Message to the remote host for this Session. @@ -115,15 +114,14 @@ public: * Async version will keep the buffer alive until the operation completes. */ virtual Status sinkMessage(Message message) = 0; - virtual Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& handle = nullptr) = 0; + virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0; /** * Cancel any outstanding async operations. There is no way to cancel synchronous calls. * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already * completed. */ - virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0; + virtual void cancelAsyncOperations(const BatonHandle& handle = nullptr) = 0; /** * This should only be used to detect when the remote host has disappeared without @@ -155,14 +153,14 @@ public: * * The 'kPending' tag is only for new sessions; callers should not set it directly. */ - virtual void setTags(TagMask tagsToSet); + void setTags(TagMask tagsToSet); /** * Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the * 'kPending' tag is set, indicating that no tags have yet been specified for the session, this * function also clears that tag as part of the same atomic operation. */ - virtual void unsetTags(TagMask tagsToUnset); + void unsetTags(TagMask tagsToUnset); /** * Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call @@ -175,9 +173,9 @@ public: * of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never * try to set it. */ - virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); + void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc); - virtual TagMask getTags() const; + TagMask getTags() const; protected: Session(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index b74b29af1d9..49f56357686 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -133,7 +133,7 @@ public: return sourceMessageImpl().getNoThrow(); } - Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override { ensureAsync(); return sourceMessageImpl(baton); } @@ -150,8 +150,7 @@ public: .getNoThrow(); } - Future<void> asyncSinkMessage(Message message, - const transport::BatonHandle& baton = nullptr) override { + Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override { ensureAsync(); return write(asio::buffer(message.buf(), message.size()), baton) .then([this, message /*keep the buffer alive*/]() { @@ -161,10 +160,10 @@ public: }); } - void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override { + void cancelAsyncOperations(const BatonHandle& baton = nullptr) override { LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote; - if (baton) { - baton->cancelSession(*this); + if (baton && baton->networking()) { + baton->networking()->cancelSession(*this); } else { getSocket().cancel(); } @@ -342,7 +341,7 @@ private: return _socket; } - Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) { + Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) { static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); auto headerBuffer = SharedBuffer::allocate(kHeaderSize); @@ -387,8 +386,7 @@ private: } template <typename MutableBufferSequence> - Future<void> read(const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL if (_sslSocket) { return opportunisticRead(*_sslSocket, buffers, baton); @@ -413,8 +411,7 @@ private: } template <typename ConstBufferSequence> - Future<void> write(const ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { @@ -439,7 +436,7 @@ private: template <typename Stream, typename MutableBufferSequence> Future<void> opportunisticRead(Stream& stream, const MutableBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; size_t size; @@ -469,8 +466,9 @@ private: asyncBuffers += size; } - if (baton) { - return baton->addSession(*this, Baton::Type::In) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::In) .then([&stream, asyncBuffers, baton, this] { return opportunisticRead(stream, asyncBuffers, baton); }); @@ -494,7 +492,7 @@ private: template <typename ConstBufferSequence> boost::optional<Future<void>> moreToSend(GenericSocket& socket, const ConstBufferSequence& buffers, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return boost::none; } @@ -517,7 +515,7 @@ private: template <typename Stream, typename ConstBufferSequence> Future<void> opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers, - const transport::BatonHandle& baton = nullptr) { + const BatonHandle& baton = nullptr) { std::error_code ec; std::size_t size; @@ -552,8 +550,9 @@ private: return std::move(*more); } - if (baton) { - return baton->addSession(*this, Baton::Type::Out) + if (baton && baton->networking()) { + return baton->networking() + ->addSession(*this, NetworkingBaton::Type::Out) .then([&stream, asyncBuffers, baton, this] { return opportunisticWrite(stream, asyncBuffers, baton); }); diff --git a/src/mongo/transport/transport_layer.cpp b/src/mongo/transport/transport_layer.cpp index 442cf78b15c..e2b997add0c 100644 --- a/src/mongo/transport/transport_layer.cpp +++ b/src/mongo/transport/transport_layer.cpp @@ -31,11 +31,20 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" +#include "mongo/db/operation_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/baton.h" #include "mongo/transport/transport_layer.h" namespace mongo { namespace transport { +namespace { + +AtomicWord<uint64_t> reactorTimerIdCounter(0); + +} // namespace + const Status TransportLayer::SessionUnknownStatus = Status(ErrorCodes::TransportSessionUnknown, "TransportLayer does not own the Session."); @@ -48,5 +57,11 @@ const Status TransportLayer::TicketSessionUnknownStatus = Status( const Status TransportLayer::TicketSessionClosedStatus = Status( ErrorCodes::TransportSessionClosed, "Operation attempted on a closed transport Session."); +ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {} + +BatonHandle TransportLayer::makeDefaultBaton(OperationContext* opCtx) { + return opCtx->getServiceContext()->makeBaton(opCtx); +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index ca1453cb3a4..0b736ba600f 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -109,17 +109,21 @@ public: enum WhichReactor { kIngress, kEgress, kNewReactor }; virtual ReactorHandle getReactor(WhichReactor which) = 0; - virtual BatonHandle makeBaton(OperationContext* opCtx) { - return nullptr; + virtual BatonHandle makeBaton(OperationContext* opCtx) const { + return makeDefaultBaton(opCtx); } protected: TransportLayer() = default; + +private: + static BatonHandle makeDefaultBaton(OperationContext* opCtx); }; class ReactorTimer { public: - ReactorTimer() = default; + ReactorTimer(); + ReactorTimer(const ReactorTimer&) = delete; ReactorTimer& operator=(const ReactorTimer&) = delete; @@ -128,6 +132,10 @@ public: */ virtual ~ReactorTimer() = default; + size_t id() const { + return _id; + } + /* * Cancel any outstanding future from waitFor/waitUntil. The future will be filled with an * ErrorCodes::CallbackCancelled status. @@ -142,6 +150,9 @@ public: * Calling this implicitly calls cancel(). */ virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0; + +private: + const size_t _id; }; class Reactor { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 06604606d0c..6f0c02d17fb 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -56,9 +56,11 @@ #endif // session_asio.h has some header dependencies that require it to be the last header. + #ifdef __linux__ #include "mongo/transport/baton_asio_linux.h" #endif + #include "mongo/transport/session_asio.h" namespace mongo { @@ -79,7 +81,7 @@ public: void cancel(const BatonHandle& baton = nullptr) override { // If we have a baton try to cancel that. - if (baton && baton->cancelTimer(*this)) { + if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) { LOG(2) << "Canceled via baton, skipping asio cancel."; return; } @@ -90,8 +92,9 @@ public: Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override { - if (baton) { - return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton); + if (baton && baton->networking()) { + return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); }, + baton); } else { return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); }); } @@ -875,8 +878,8 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const { } #endif -BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { #ifdef __linux__ +BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) const { auto baton = std::make_shared<BatonASIO>(opCtx); { @@ -885,11 +888,9 @@ BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { opCtx->setBaton(baton); } - return std::move(baton); -#else - return nullptr; -#endif + return baton; } +#endif } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 77815024374..b5619bcf361 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -136,7 +136,9 @@ public: return _listenerPort; } - BatonHandle makeBaton(OperationContext* opCtx) override; +#ifdef __linux__ + BatonHandle makeBaton(OperationContext* opCtx) const override; +#endif private: class BatonASIO; diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 60cfb58a41e..29ad6457be6 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -90,7 +90,7 @@ public: static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); - BatonHandle makeBaton(OperationContext* opCtx) override { + BatonHandle makeBaton(OperationContext* opCtx) const override { stdx::lock_guard<stdx::mutex> lk(_tlsMutex); // TODO: figure out what to do about managers with more than one transport layer. invariant(_tls.size() == 1); diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 2cf2886afa0..1e66347da96 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -103,7 +103,7 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWo StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand( const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton) { + const BatonHandle& baton) { return _executor->scheduleRemoteCommand(request, cb, baton); } diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index e03e2a6955e..26d83b2bece 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -47,33 +47,32 @@ public: * Does not own target executor. */ TaskExecutorProxy(executor::TaskExecutor* executor); - virtual ~TaskExecutorProxy(); + ~TaskExecutorProxy(); executor::TaskExecutor* getExecutor() const; void setExecutor(executor::TaskExecutor* executor); - virtual void startup() override; - virtual void shutdown() override; - virtual void join() override; - virtual void appendDiagnosticBSON(BSONObjBuilder* builder) const override; - virtual Date_t now() override; - virtual StatusWith<EventHandle> makeEvent() override; - virtual void signalEvent(const EventHandle& event) override; - virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; - virtual void waitForEvent(const EventHandle& event) override; - virtual StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, - const EventHandle& event, - Date_t deadline) override; - virtual StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; - virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; - virtual StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const transport::BatonHandle& baton = nullptr) override; - virtual void cancel(const CallbackHandle& cbHandle) override; - virtual void wait(const CallbackHandle& cbHandle, - Interruptible* interruptible = Interruptible::notInterruptible()) override; - virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; + void startup() override; + void shutdown() override; + void join() override; + void appendDiagnosticBSON(BSONObjBuilder* builder) const override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline) override; + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle, + Interruptible* interruptible = Interruptible::notInterruptible()) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; private: // Not owned by us. |