author | Jason Carey <jcarey@argv.me> | 2018-03-21 00:15:35 -0400
---|---|---
committer | Jason Carey <jcarey@argv.me> | 2018-04-27 19:49:28 -0400
commit | 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch) |
tree | 438865c1065d0a96c427b1ed3a89e5163d85699a /src |
parent | 91eaa878c4feeebd9397c49180631fc719238aaf (diff) |
download | mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz |
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to a single connection pool in mongos.

This change introduces a transport layer baton, which improves performance
for a particular transport layer when doing local scatter/gather operations.
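For orientation: a baton here is a small, operation-scoped scheduler that lets the requesting thread run its own networking callbacks in place, rather than handing them off to an executor thread. The sketch below is a hypothetical reconstruction inferred purely from the call sites in this diff (`baton->schedule(...)`, `baton->run(opCtx, deadline)`, `baton->detach()`, and the `transport::BatonHandle` passed around as a defaulted `nullptr`); it is not the actual `mongo/transport/baton.h` interface.

```cpp
// Hypothetical sketch of a baton-style interface, inferred from how the
// handle is used in this diff. The real mongo::transport::Baton may differ.
#include <chrono>
#include <functional>
#include <memory>
#include <optional>

class OperationContext;  // stand-in for mongo's OperationContext

class Baton {
public:
    virtual ~Baton() = default;

    // Enqueue a callback to run the next time a thread pumps this baton.
    // The diff uses this both to wake the baton (markKilled schedules a
    // no-op) and to hop work back onto the client thread.
    virtual void schedule(std::function<void()> func) = 0;

    // Run scheduled callbacks and poll attached sessions on the *calling*
    // thread, optionally until a deadline; false is assumed to mean timeout.
    virtual bool run(OperationContext* opCtx,
                     std::optional<std::chrono::system_clock::time_point> deadline) = 0;

    // Disassociate the baton from its operation, releasing attached sessions.
    virtual void detach() = 0;
};

// The diff passes `const transport::BatonHandle&` defaulted to nullptr,
// which suggests a shared-pointer-like handle.
using BatonHandle = std::shared_ptr<Baton>;
```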
Diffstat (limited to 'src')
50 files changed, 1319 insertions, 337 deletions
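The usage pattern this enables shows up in the `async_requests_sender.cpp` hunks below: when a result is not immediately ready, the client thread pumps the baton instead of parking on a queue or condition variable, so callbacks for its other in-flight requests keep making progress. A minimal sketch of that loop, using the hypothetical interface above (`std::future` stands in for mongo's `Future`; the deadline handling is an assumption):

```cpp
#include <chrono>
#include <future>

// Illustrative only: pump a baton while waiting for a result, modeled on the
// loop in AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort below.
template <typename T>
T pumpBatonUntilReady(const BatonHandle& baton,
                      std::future<T>& future,
                      std::chrono::system_clock::time_point deadline) {
    while (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
        // Run scheduled work and network polling on this thread; a false
        // return (assumed to mean the deadline passed) ends the wait.
        if (!baton->run(nullptr, deadline)) {
            break;
        }
    }
    return future.get();  // may still block briefly if we broke out on timeout
}
```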
```diff
diff --git a/src/mongo/base/checked_cast.h b/src/mongo/base/checked_cast.h
index e434a4c6e12..68b5887a7f6 100644
--- a/src/mongo/base/checked_cast.h
+++ b/src/mongo/base/checked_cast.h
@@ -50,6 +50,11 @@ struct checked_cast_impl<false> {
     static T cast(const U& u) {
         return static_cast<T>(u);
     }
+
+    template <typename T, typename U>
+    static T cast(U& u) {
+        return static_cast<T>(u);
+    }
 };
 
 template <>
@@ -68,6 +73,11 @@ struct checked_cast_impl<true> {
     static T cast(const U& u) {
         return dynamic_cast<T>(u);
     }
+
+    template <typename T, typename U>
+    static T cast(U& u) {
+        return dynamic_cast<T>(u);
+    }
 };
 
 template <typename T, typename U>
@@ -75,6 +85,11 @@ T checked_cast(const U& u) {
     return checked_cast_impl<kDebugBuild>::cast<T>(u);
 };
 
+template <typename T, typename U>
+T checked_cast(U& u) {
+    return checked_cast_impl<kDebugBuild>::cast<T>(u);
+};
+
 /**
  * Similar to static_pointer_cast, but in debug builds uses RTTI to confirm that the cast
  * is legal at runtime.
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index c1499fe394d..44b3d91e521 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
     });
 }
 
-Future<Message> AsyncDBClient::_call(Message request) {
+Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) {
     auto swm = _compressorManager.compressMessage(request);
     if (!swm.isOK()) {
         return swm.getStatus();
@@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) {
     request.header().setId(msgId);
     request.header().setResponseToMsgId(0);
 
-    return _session->asyncSinkMessage(request)
-        .then([this] { return _session->asyncSourceMessage(); })
+    return _session->asyncSinkMessage(request, baton)
+        .then([this, baton] { return _session->asyncSourceMessage(baton); })
         .then([this, msgId](Message response) -> StatusWith<Message> {
             uassert(50787,
                     "ResponseId did not match sent message ID.",
@@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) {
     });
 }
 
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
+                                                   const transport::BatonHandle& baton) {
     invariant(_negotiatedProtocol);
     auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
-    return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> {
-        return rpc::UniqueReply(response, rpc::makeReply(&response));
-    });
+    return _call(std::move(requestMsg), baton)
+        .then([this](Message response) -> Future<rpc::UniqueReply> {
+            return rpc::UniqueReply(response, rpc::makeReply(&response));
+        });
 }
 
 Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
-    executor::RemoteCommandRequest request) {
+    executor::RemoteCommandRequest request, const transport::BatonHandle& baton) {
     auto clkSource = _svcCtx->getPreciseClockSource();
     auto start = clkSource->now();
     auto opMsgRequest = OpMsgRequest::fromDBAndBody(
         std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
-    return runCommand(std::move(opMsgRequest))
+    return runCommand(std::move(opMsgRequest), baton)
         .then([start, clkSource, this](rpc::UniqueReply response) {
             auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
             return executor::RemoteCommandResponse(*response, duration);
@@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
         });
 }
 
-void AsyncDBClient::cancel() {
-    _session->cancelAsyncOperations();
+void AsyncDBClient::cancel(const transport::BatonHandle& baton) {
+    _session->cancelAsyncOperations(baton);
 }
 
 bool AsyncDBClient::isStillConnected() {
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index ee7239a1d03..ec34dca021e 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -36,6 +36,7 @@
 #include "mongo/executor/remote_command_response.h"
 #include "mongo/rpc/protocol.h"
 #include "mongo/rpc/unique_message.h"
+#include "mongo/transport/baton.h"
 #include "mongo/transport/message_compressor_manager.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/future.h"
@@ -57,15 +58,16 @@ public:
                                    transport::ReactorHandle reactor);
 
     Future<executor::RemoteCommandResponse> runCommandRequest(
-        executor::RemoteCommandRequest request);
-    Future<rpc::UniqueReply> runCommand(OpMsgRequest request);
+        executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
+    Future<rpc::UniqueReply> runCommand(OpMsgRequest request,
+                                        const transport::BatonHandle& baton = nullptr);
 
     Future<void> authenticate(const BSONObj& params);
 
     Future<void> initWireVersion(const std::string& appName,
                                  executor::NetworkConnectionHook* const hook);
 
-    void cancel();
+    void cancel(const transport::BatonHandle& baton = nullptr);
 
     bool isStillConnected();
 
@@ -75,7 +77,7 @@ public:
     const HostAndPort& local() const;
 
 private:
-    Future<Message> _call(Message request);
+    Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr);
     BSONObj _buildIsMasterRequest(const std::string& appName);
     void _parseIsMasterResponse(BSONObj request,
                                 const std::unique_ptr<rpc::ReplyInterface>& response);
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index dca50732e93..a3c23c6b05d 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -90,12 +90,14 @@ public:
     TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
         : unittest::TaskExecutorProxy(executor) {}
     virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
-        const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override {
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (scheduleRemoteCommandFailPoint) {
             return Status(ErrorCodes::ShutdownInProgress,
                           "failed to send remote command - shutdown in progress");
         }
-        return getExecutor()->scheduleRemoteCommand(request, cb);
+        return getExecutor()->scheduleRemoteCommand(request, cb, baton);
     }
 
     bool scheduleRemoteCommandFailPoint = false;
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 44e229aaf26..4d195d3cf58 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -895,7 +895,6 @@ StatusWith<NamespaceString> DatabaseImpl::makeUniqueCollectionNamespace(
     }
 
     if (!_uniqueCollectionNamespacePseudoRandom) {
-        Timestamp ts;
         _uniqueCollectionNamespacePseudoRandom =
             std::make_unique<PseudoRandom>(Date_t::now().asInt64());
     }
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 743a4aef4a4..5c4019e18d2 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -767,8 +767,7 @@ auto makeReplicationExecutor(ServiceContext* serviceContext) {
     hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
     return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
         stdx::make_unique<ThreadPool>(tpOptions),
-        executor::makeNetworkInterface(
-            "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)));
+        executor::makeNetworkInterface("Replication", nullptr, std::move(hookList)));
 }
 
 MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SSLManager", "default"))
diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp
index df72066aabf..6d8f2042604 100644
--- a/src/mongo/db/free_mon/free_mon_mongod.cpp
+++ b/src/mongo/db/free_mon/free_mon_mongod.cpp
@@ -245,8 +245,7 @@ auto makeTaskExecutor(ServiceContext* /*serviceContext*/) {
         Client::initThread(threadName.c_str());
     };
     return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
-        std::make_unique<ThreadPool>(tpOptions),
-        executor::makeNetworkInterface("NetworkInterfaceASIO-FreeMon"));
+        std::make_unique<ThreadPool>(tpOptions), executor::makeNetworkInterface("FreeMon"));
 }
 
 void registerCollectors(FreeMonController* controller) {
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index c7cf9514882..50209b8d0ff 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -37,6 +37,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/platform/random.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/transport/baton.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/clock_source.h"
 #include "mongo/util/fail_point_service.h"
@@ -356,6 +357,11 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) {
         invariant(_waitCV);
         _waitCV->notify_all();
     }
+
+    // If we have a baton, we need to wake it up. The baton itself will check for interruption
+    if (_baton) {
+        _baton->schedule([] {});
+    }
 }
 
 void OperationContext::setLogicalSessionId(LogicalSessionId lsid) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 07980165e33..c188f43c103 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -277,6 +277,20 @@ public:
     }
 
     /**
+     * Sets a transport Baton on the operation. This will trigger the Baton on markKilled.
+     */
+    void setBaton(const transport::BatonHandle& baton) {
+        _baton = baton;
+    }
+
+    /**
+     * Retrieves the baton associated with the operation.
+     */
+    const transport::BatonHandle& getBaton() const {
+        return _baton;
+    }
+
+    /**
      * Associates a transaction number with this operation context. May only be called once for the
      * lifetime of the operation and the operation must have a logical session id assigned.
      */
@@ -465,6 +479,9 @@ private:
     // once from OK to some kill code.
     AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK};
 
+    // A transport Baton associated with the operation. The presence of this object implies that a
+    // client thread is doing it's own async networking by blocking on it's own thread.
+    transport::BatonHandle _baton;
 
     // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the
     // operation is currently waiting on inside a call to waitForConditionOrInterrupt...().
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
index a53ec085a20..067928e6bb2 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
@@ -345,8 +345,10 @@ public:
                                           ShouldFailRequestFn shouldFailRequest)
         : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
 
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override {
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (_shouldFailRequest(request)) {
             return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
         }
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index b493279b991..8322d29fdf3 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -347,12 +347,14 @@ public:
                                           ShouldFailRequestFn shouldFailRequest)
         : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
 
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override {
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (_shouldFailRequest(request)) {
             return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
         }
-        return getExecutor()->scheduleRemoteCommand(request, cb);
+        return getExecutor()->scheduleRemoteCommand(request, cb, baton);
     }
 
 private:
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index f0330add560..1238a794c76 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -206,12 +206,14 @@ public:
                                           ShouldFailRequestFn shouldFailRequest)
         : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
 
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override {
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (_shouldFailRequest(request)) {
             return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
         }
-        return getExecutor()->scheduleRemoteCommand(request, cb);
+        return getExecutor()->scheduleRemoteCommand(request, cb, baton);
     }
 
 private:
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 56abaa0c4bd..b6e9f97431e 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -756,12 +756,14 @@ public:
                                           ShouldFailRequestFn shouldFailRequest)
         : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
 
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override {
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (_shouldFailRequest(request)) {
             return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
         }
-        return getExecutor()->scheduleRemoteCommand(request, cb);
+        return getExecutor()->scheduleRemoteCommand(request, cb, baton);
     }
 
 private:
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1fb7bea1adf..d87cf4f73f8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -168,7 +168,7 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) {
     hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(service));
     return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
         makeThreadPool(poolName),
-        executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList)));
+        executor::makeNetworkInterface("RS", nullptr, std::move(hookList)));
 }
 
 /**
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index f6967dcfca2..5fb620d5252 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -601,7 +601,8 @@ TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMak
             : unittest::TaskExecutorProxy(executor) {}
         virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
             const executor::RemoteCommandRequest& request,
-            const RemoteCommandCallbackFn& cb) override {
+            const RemoteCommandCallbackFn& cb,
+            const transport::BatonHandle& baton = nullptr) override {
            // Any error status other than ShutdownInProgress will cause the reporter to fassert.
            return Status(ErrorCodes::ShutdownInProgress,
                          "failed to send remote command - shutdown in progress");
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 4a7ff632e0e..37a11f12fca 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -57,12 +57,14 @@ public:
                                           ShouldFailRequestFn shouldFailRequest)
         : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
 
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override {
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override {
         if (_shouldFailRequest(request)) {
             return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
         }
-        return getExecutor()->scheduleRemoteCommand(request, cb);
+        return getExecutor()->scheduleRemoteCommand(request, cb, baton);
     }
 
 private:
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index 320fb12c77f..a25126baecb 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -58,11 +58,13 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor
 }
 
 StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand(
-    const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+    const executor::RemoteCommandRequest& request,
+    const RemoteCommandCallbackFn& cb,
+    const transport::BatonHandle& baton) {
     if (shouldFailScheduleRemoteCommandRequest(request)) {
         return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
     }
-    return getExecutor()->scheduleRemoteCommand(request, cb);
+    return getExecutor()->scheduleRemoteCommand(request, cb, baton);
 }
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index eb02ad18eb6..3b0ab2bf09f 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -47,8 +47,10 @@ public:
     StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
     StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override;
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override;
 
     // Override to make scheduleWork() fail during testing.
     ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; };
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7092c460ed3..f8b5da5fb62 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -649,8 +649,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
     _readyPool.clear();
 
     // Log something helpful
-    log() << "Dropping all pooled connections to " << _hostAndPort
-          << " due to failed operation on a connection";
+    log() << "Dropping all pooled connections to " << _hostAndPort << " due to " << status;
 
     // Migrate processing connections to the dropped pool
     for (auto&& x : _processingPool) {
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 10a711261ef..160321bec9a 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -60,10 +60,10 @@ struct ConnectionPoolStats;
 * HostAndPort. See comments on the various Options for how the pool operates.
 */
 class ConnectionPool : public EgressTagCloser {
-    class ConnectionHandleDeleter;
     class SpecificPool;
 
 public:
+    class ConnectionHandleDeleter;
     class ConnectionInterface;
     class DependentTypeFactoryInterface;
     class TimerInterface;
@@ -175,7 +175,7 @@ public:
     ConnectionHandleDeleter() = default;
     ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
 
-    void operator()(ConnectionInterface* connection) {
+    void operator()(ConnectionInterface* connection) const {
         if (_pool && connection)
             _pool->returnConnection(connection);
     }
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 3823a9a3e0a..907032c7262 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -50,7 +50,9 @@ struct TimeoutHandler {
 
 void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
     _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
-        if (status == ErrorCodes::CallbackCanceled) {
+        // TODO: verify why we still get broken promises when expliciting call stop and shutting
+        // down NITL's quickly.
+        if (status == ErrorCodes::CallbackCanceled || status == ErrorCodes::BrokenPromise) {
             return;
         }
@@ -125,10 +127,9 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
     });
 
     AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
-        .onError(
-            [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
-                return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
-            })
+        .onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
+            return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
+        })
         .then([this](AsyncDBClient::Handle client) {
             _client = std::move(client);
             return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook);
@@ -186,7 +187,7 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
     _client
         ->runCommandRequest(
             {_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr})
-        .then([this](executor::RemoteCommandResponse response) {
+        .then([](executor::RemoteCommandResponse response) {
             return Future<void>::makeReady(response.status);
         })
         .getAsync([this, handler](Status status) {
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 48428b820e3..b333813df14 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,8 @@
 #include "mongo/base/disallow_copying.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
+#include "mongo/util/future.h"
 
 namespace mongo {
 
@@ -128,13 +130,33 @@ public:
     */
    virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                RemoteCommandRequest& request,
-                                const RemoteCommandCompletionFn& onFinish) = 0;
+                                const RemoteCommandCompletionFn& onFinish,
+                                const transport::BatonHandle& baton = nullptr) = 0;
+
+    Future<TaskExecutor::ResponseStatus> startCommand(
+        const TaskExecutor::CallbackHandle& cbHandle,
+        RemoteCommandRequest& request,
+        const transport::BatonHandle& baton = nullptr) {
+        Promise<TaskExecutor::ResponseStatus> promise;
+        auto future = promise.getFuture();
+
+        auto status =
+            startCommand(cbHandle,
+                         request,
+                         [sp = promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
+                             sp.emplaceValue(rs);
+                         },
+                         baton);
+
+        return future;
+    }
 
    /**
     * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
     * completed.
     */
-    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                               const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -150,7 +172,9 @@ public:
     * Any callbacks invoked from setAlarm must observe onNetworkThread to
     * return true. See that method for why.
     */
-    virtual Status setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+    virtual Status setAlarm(Date_t when,
+                            const stdx::function<void()>& action,
+                            const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * Returns true if called from a thread dedicated to networking. I.e. not a
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 9fe1c9da4da..8d74d56656f 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -250,7 +250,8 @@ Status attachMetadataIfNeeded(RemoteCommandRequest& request,
 
 Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                           RemoteCommandRequest& request,
-                                          const RemoteCommandCompletionFn& onFinish) {
+                                          const RemoteCommandCompletionFn& onFinish,
+                                          const transport::BatonHandle& baton) {
     MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
     {
         stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
@@ -422,7 +423,8 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
     return Status::OK();
 }
 
-void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                         const transport::BatonHandle& baton) {
     stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
 
     // If we found a matching cbHandle in _inGetConnection, then
@@ -447,7 +449,9 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH
     }
 }
 
-Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceASIO::setAlarm(Date_t when,
+                                      const stdx::function<void()>& action,
+                                      const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
     }
@@ -462,12 +466,12 @@ Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>&
         return exceptionToStatus();
     }
 
-    alarm->async_wait([alarm, this, action, when](std::error_code ec) {
+    alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) {
         const auto nowValue = now();
         if (nowValue < when) {
             warning() << "ASIO alarm returned early. Expected at: " << when
                       << ", fired at: " << nowValue;
-            const auto status = setAlarm(when, action);
+            const auto status = setAlarm(when, action, baton);
             if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) {
                 fassertFailedWithStatus(40383, status);
             }
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 8e464754bfa..8f57e3d08b8 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -130,9 +130,13 @@ public:
     Date_t now() override;
     Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                         RemoteCommandRequest& request,
-                        const RemoteCommandCompletionFn& onFinish) override;
-    void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
-    Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+                        const RemoteCommandCompletionFn& onFinish,
+                        const transport::BatonHandle& baton = nullptr) override;
+    void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                       const transport::BatonHandle& baton = nullptr) override;
+    Status setAlarm(Date_t when,
+                    const stdx::function<void()>& action,
+                    const transport::BatonHandle& baton = nullptr) override;
 
     bool onNetworkThread() override;
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 39e8e4e2995..674dd8bb116 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -113,7 +113,8 @@ std::string NetworkInterfaceMock::getHostName() {
 
 Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
                                           RemoteCommandRequest& request,
-                                          const RemoteCommandCompletionFn& onFinish) {
+                                          const RemoteCommandCompletionFn& onFinish,
+                                          const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
     }
@@ -145,7 +146,8 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
     }
 }
 
-void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle,
+                                         const transport::BatonHandle& baton) {
     invariant(!inShutdown());
 
     stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -175,7 +177,9 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
     }
 }
 
-Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when,
+                                      const stdx::function<void()>& action,
+                                      const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
     }
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 554306397ba..76a9a6462a8 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -108,7 +108,8 @@ public:
     virtual std::string getHostName();
     virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                 RemoteCommandRequest& request,
-                                const RemoteCommandCompletionFn& onFinish);
+                                const RemoteCommandCompletionFn& onFinish,
+                                const transport::BatonHandle& baton = nullptr);
 
    /**
     * If the network operation is in the _unscheduled or _processing queues, moves the operation
     * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
     * called after the task has already completed, but its callback has not yet been run.
     */
-    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                               const transport::BatonHandle& baton = nullptr);
 
    /**
     * Not implemented.
     */
-    virtual Status setAlarm(Date_t when, const stdx::function<void()>& action);
+    virtual Status setAlarm(Date_t when,
+                            const stdx::function<void()>& action,
+                            const transport::BatonHandle& baton = nullptr);
 
    virtual bool onNetworkThread();
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
     auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
         _reactor, _tl, std::move(_onConnectHook));
     _pool = std::make_unique<ConnectionPool>(
-        std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+        std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
     _ioThread = stdx::thread([this] {
         setThreadName(_instanceName);
         LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
     _inShutdown.store(true);
     _reactor->stop();
     _ioThread.join();
+    _pool.reset();
     LOG(2) << "NetworkInterfaceTL shutdown successfully";
 }
 
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
 
 Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                         RemoteCommandRequest& request,
-                                        const RemoteCommandCompletionFn& onFinish) {
+                                        const RemoteCommandCompletionFn& onFinish,
+                                        const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         state->deadline = state->start + state->request.timeout;
     }
 
-    _pool->get(request.target, request.timeout)
-        .tapError([state](Status error) {
-            LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
-                   << error;
-        })
-        .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
-            return _onAcquireConn(state, std::move(conn));
-        })
-        .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
-            // The TransportLayer has, for historical reasons returned SocketException for
-            // network errors, but sharding assumes HostUnreachable on network errors.
-            if (error == ErrorCodes::SocketException) {
-                error = Status(ErrorCodes::HostUnreachable, error.reason());
-            }
-            return error;
-        })
-        .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
-            auto duration = now() - state->start;
-            if (!response.isOK()) {
-                onFinish(RemoteCommandResponse(response.getStatus(), duration));
-            } else {
-                auto rs = std::move(response.getValue());
-                LOG(2) << "Request " << state->request.id << " finished with response: "
-                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-                onFinish(rs);
-            }
-        });
+    // Interacting with the connection pool can involve more work than just getting a connection
+    // out. In particular, we can end up having to spin up new connections, and fulfilling promises
+    // for other requesters. Returning connections has the same issue.
+    //
+    // To work around it, we make sure to hop onto the reactor thread before getting a connection,
+    // then making sure to get back to the client thread to do the work (if on a baton). And we
+    // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
+    // return on the reactor thread.
+    //
+    // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+    auto connFuture = _reactor->execute([this, state, request, baton] {
+        return makeReadyFutureWith(
+                   [this, request] { return _pool->get(request.target, request.timeout); })
+            .tapError([state](Status error) {
+                LOG(2) << "Failed to get connection from pool for request " << state->request.id
+                       << ": " << error;
+            })
+            .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+                auto deleter = conn.get_deleter();
+
+                // TODO: drop out this shared_ptr once we have a unique_function capable future
+                return std::make_shared<CommandState::ConnHandle>(
+                    conn.release(), CommandState::Deleter{deleter, _reactor});
+            });
+    });
+
+    auto remainingWork = [this, state, baton, onFinish](
+        StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+        makeReadyFutureWith(
+            [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+            .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+                // The TransportLayer has, for historical reasons returned SocketException for
+                // network errors, but sharding assumes HostUnreachable on network errors.
+                if (error == ErrorCodes::SocketException) {
+                    error = Status(ErrorCodes::HostUnreachable, error.reason());
+                }
+                return error;
+            })
+            .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+                auto duration = now() - state->start;
+                if (!response.isOK()) {
+                    onFinish(RemoteCommandResponse(response.getStatus(), duration));
+                } else {
+                    const auto& rs = response.getValue();
+                    LOG(2) << "Request " << state->request.id << " finished with response: "
+                           << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+                    onFinish(rs);
+                }
+            });
+    };
+
+    if (baton) {
+        // If we have a baton, we want to get back to the baton thread immediately after we get a
+        // connection
+        std::move(connFuture).getAsync([
+            baton,
+            rw = std::move(remainingWork)
+        ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+            baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+                std::move(rw)(std::move(swConn));
+            });
+        });
+    } else {
+        // otherwise we're happy to run inline
+        std::move(connFuture)
+            .getAsync([rw = std::move(remainingWork)](
+                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+                std::move(rw)(std::move(swConn));
+            });
+    }
+
     return Status::OK();
 }
 
 // This is only called from within a then() callback on a future, so throwing is equivalent to
 // returning a ready Future with a not-OK status.
 Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
-    std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+    std::shared_ptr<CommandState> state,
+    CommandState::ConnHandle conn,
+    const transport::BatonHandle& baton) {
     if (state->done.load()) {
         conn->indicateSuccess();
         uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
     }
 
     state->timer = _reactor->makeTimer();
-    state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
-        if (status == ErrorCodes::CallbackCanceled) {
-            invariant(state->done.load());
-            return;
-        }
+    state->timer->waitUntil(state->deadline, baton)
+        .getAsync([client, state, baton](Status status) {
+            if (status == ErrorCodes::CallbackCanceled) {
+                invariant(state->done.load());
+                return;
+            }
 
-        if (state->done.swap(true)) {
-            return;
-        }
+            if (state->done.swap(true)) {
+                return;
+            }
 
-        LOG(2) << "Request " << state->request.id << " timed out"
-               << ", deadline was " << state->deadline << ", op was "
-               << redact(state->request.toString());
-        state->promise.setError(
-            Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+            LOG(2) << "Request " << state->request.id << " timed out"
+                   << ", deadline was " << state->deadline << ", op was "
+                   << redact(state->request.toString());
+            state->promise.setError(
+                Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
 
-        client->cancel();
-    });
+            client->cancel(baton);
+        });
 
-    client->runCommandRequest(state->request)
+    client->runCommandRequest(state->request, baton)
         .then([this, state](RemoteCommandResponse response) {
             if (state->done.load()) {
                 uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
             }
 
-            // TODO Investigate whether this is necessary here.
-            return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
-                if (_metadataHook && response.status.isOK()) {
-                    auto target = state->conn->getHostAndPort().toString();
-                    response.status = _metadataHook->readReplyMetadata(
-                        nullptr, std::move(target), response.metadata);
-                }
+            if (_metadataHook && response.status.isOK()) {
+                auto target = state->conn->getHostAndPort().toString();
+                response.status =
+                    _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+            }
 
-                return std::move(response);
-            });
+            return RemoteCommandResponse(std::move(response));
         })
-        .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+        .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
             _eraseInUseConn(state->cbHandle);
             if (!swr.isOK()) {
                 state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
                 return;
 
             if (state->timer) {
-                state->timer->cancel();
+                state->timer->cancel(baton);
             }
 
-            state->promise.setWith([&] { return std::move(swr); });
+            state->promise.setFromStatusWith(std::move(swr));
         });
 
     return std::move(state->mergedFuture);
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
     _inProgress.erase(cbHandle);
 }
 
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                       const transport::BatonHandle& baton) {
     stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
     auto it = _inProgress.find(cbHandle);
     if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
                                << redact(state->request.toString())});
 
     if (state->conn) {
         auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
-        client->client()->cancel();
+        client->client()->cancel(baton);
     }
 }
 
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+                                    const stdx::function<void()>& action,
+                                    const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
 
     if (when <= now()) {
-        _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        if (baton) {
+            baton->schedule(std::move(action));
+        } else {
+            _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        }
         return Status::OK();
     }
 
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
         _inProgressAlarms.insert(alarmTimer);
     }
 
-    alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
-        auto alarmTimer = weakTimer.lock();
-        if (!alarmTimer) {
-            return;
-        } else {
-            stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-            _inProgressAlarms.erase(alarmTimer);
-        }
-
-        auto nowVal = now();
-        if (nowVal < when) {
-            warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
-            const auto status = setAlarm(when, std::move(action));
-            if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
-                fassertFailedWithStatus(50785, status);
+    alarmTimer->waitUntil(when, baton)
+        .getAsync([this, weakTimer, action, when, baton](Status status) {
+            auto alarmTimer = weakTimer.lock();
+            if (!alarmTimer) {
+                return;
+            } else {
+                stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+                _inProgressAlarms.erase(alarmTimer);
             }
-            return;
-        }
 
+            auto nowVal = now();
+            if (nowVal < when) {
+                warning() << "Alarm returned early. Expected at: " << when
+                          << ", fired at: " << nowVal;
+                const auto status = setAlarm(when, std::move(action), baton);
+                if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+                    fassertFailedWithStatus(50785, status);
+                }
 
-        if (status.isOK()) {
-            _reactor->schedule(transport::Reactor::kPost, std::move(action));
-        } else if (status != ErrorCodes::CallbackCanceled) {
-            warning() << "setAlarm() received an error: " << status;
-        }
-    });
+                return;
+            }
+
+            if (status.isOK()) {
+                if (baton) {
+                    baton->schedule(std::move(action));
+                } else {
+                    _reactor->schedule(transport::Reactor::kPost, std::move(action));
+                }
+            } else if (status != ErrorCodes::CallbackCanceled) {
+                warning() << "setAlarm() received an error: " << status;
+            }
+        });
 
     return Status::OK();
 }
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 8c4d4697d93..bb22407756c 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -37,6 +37,7 @@
 #include "mongo/rpc/metadata/metadata_hook.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/baton.h"
 #include "mongo/transport/transport_layer.h"
 
 namespace mongo {
@@ -49,6 +50,7 @@ public:
                        ServiceContext* ctx,
                        std::unique_ptr<NetworkConnectionHook> onConnectHook,
                        std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+    std::string getDiagnosticString() override;
     void appendConnectionStats(ConnectionPoolStats* stats) const override;
     std::string getHostName() override;
@@ -61,9 +63,14 @@ public:
     Date_t now() override;
     Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                         RemoteCommandRequest& request,
-                        const RemoteCommandCompletionFn& onFinish) override;
-    void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
-    Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+                        const RemoteCommandCompletionFn& onFinish,
+                        const transport::BatonHandle& baton) override;
+
+    void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                       const transport::BatonHandle& baton) override;
+    Status setAlarm(Date_t when,
+                    const stdx::function<void()>& action,
+                    const transport::BatonHandle& baton) override;
 
     bool onNetworkThread() override;
 
@@ -79,7 +86,18 @@ private:
         Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
         Date_t start;
 
-        ConnectionPool::ConnectionHandle conn;
+        struct Deleter {
+            ConnectionPool::ConnectionHandleDeleter returner;
+            transport::ReactorHandle reactor;
+
+            void operator()(ConnectionPool::ConnectionInterface* ptr) const {
+                reactor->schedule(transport::Reactor::kDispatch,
+                                  [ ret = returner, ptr ] { ret(ptr); });
+            }
+        };
+        using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
+
+        ConnHandle conn;
         std::unique_ptr<transport::ReactorTimer> timer;
 
         AtomicBool done;
@@ -89,7 +107,8 @@ private:
     void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
     Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
-                                                 ConnectionPool::ConnectionHandle conn);
+                                                 CommandState::ConnHandle conn,
+                                                 const transport::BatonHandle& baton);
 
     std::string _instanceName;
     ServiceContext* _svcCtx;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 8d7c9cf72b7..d23069ba3ac 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,6 +41,7 @@
 #include "mongo/platform/hash_namespace.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
 #include "mongo/util/future.h"
 #include "mongo/util/time_support.h"
 
@@ -235,8 +236,10 @@ public:
     * Contract: Implementations should guarantee that callback should be called *after* doing any
     * processing related to the callback.
     */
-    virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
-                                                             const RemoteCommandCallbackFn& cb) = 0;
+    virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index 71d796a1a2b..24046a4fe75 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -42,7 +42,7 @@ namespace executor {
 
 // If less than or equal to 0, the suggested pool size will be determined by the number of cores. If
 // set to a particular positive value, this will be used as the pool size.
-MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 0);
+MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 1);
 
 size_t TaskExecutorPool::getSuggestedPoolSize() {
     auto poolSize = taskExecutorPoolSize.load();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 202de888b5a..7d42893edb2 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -43,6 +43,7 @@
 #include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/network_interface.h"
 #include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
 #include "mongo/util/concurrency/thread_pool_interface.h"
 #include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
@@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
     MONGO_DISALLOW_COPYING(CallbackState);
 
 public:
-    static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
-        return std::make_shared<CallbackState>(std::move(cb), readyDate);
+    static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+                                               Date_t readyDate,
+                                               const transport::BatonHandle& baton) {
+        return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
     }
 
     /**
     * Do not call directly. Use make.
     */
-    CallbackState(CallbackFn&& cb, Date_t theReadyDate)
-        : callback(std::move(cb)), readyDate(theReadyDate) {}
+    CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+        : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
 
     virtual ~CallbackState() = default;
 
@@ -94,6 +97,7 @@ public:
     bool isNetworkOperation = false;
     AtomicWord<bool> isFinished{false};
     boost::optional<stdx::condition_variable> finishedCondition;
+    transport::BatonHandle baton;
 };
 
 class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -125,7 +129,7 @@ public:
 };
 
 ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
-                                               std::unique_ptr<NetworkInterface> net)
+                                               std::shared_ptr<NetworkInterface> net)
     : _net(std::move(net)), _pool(std::move(pool)) {}
 
 ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
@@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
     if (!event.isValid()) {
         return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
     }
-    auto wq = makeSingletonWorkQueue(work);
+    auto wq = makeSingletonWorkQueue(work, nullptr);
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
     auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
 
 StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
     const CallbackFn& work) {
-    auto wq = makeSingletonWorkQueue(work);
+    auto wq = makeSingletonWorkQueue(work, nullptr);
     WorkQueue temp;
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
     if (when <= now()) {
         return scheduleWork(work);
     }
-    auto wq = makeSingletonWorkQueue(work, when);
+    auto wq = makeSingletonWorkQueue(work, nullptr, when);
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
     if (!cbHandle.isOK()) {
@@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
                 return;
             }
             scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
-        })
+        },
+        nullptr)
         .transitional_ignore();
 
     return cbHandle;
@@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
 }  // namespace
 
 StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
-    const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+    const RemoteCommandRequest& request,
+    const RemoteCommandCallbackFn& cb,
+    const transport::BatonHandle& baton) {
     RemoteCommandRequest scheduledRequest = request;
     if (request.timeout == RemoteCommandRequest::kNoTimeout) {
         scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
@@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
 
     // In case the request fails to even get a connection from the pool,
     // we wrap the callback in a method that prepares its input parameters.
-    auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
-        remoteCommandFailedEarly(cbData, cb, scheduledRequest);
-    });
+    auto wq = makeSingletonWorkQueue(
+        [scheduledRequest, cb](const CallbackArgs& cbData) {
+            remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+        },
+        baton);
     wq.front()->isNetworkOperation = true;
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
@@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
                            : response.status.toString());
             swap(cbState->callback, newCb);
             scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
-        })
+        },
+        baton)
         .transitional_ignore();
     return cbHandle;
 }
@@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
     cbState->canceled.store(1);
     if (cbState->isNetworkOperation) {
         lk.unlock();
-        _net->cancelCommand(cbHandle);
+        _net->cancelCommand(cbHandle, cbState->baton);
         return;
     }
     if (cbState->readyDate != Date_t{}) {
@@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
     return cbHandle;
 }
 
-ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
-                                                                                 Date_t when) {
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
+    CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
     WorkQueue result;
-    result.emplace_front(CallbackState::make(std::move(work), when));
+    result.emplace_front(CallbackState::make(std::move(work), when, baton));
     result.front()->iter = result.begin();
     return result;
 }
@@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
     }
 
     for (const auto& cbState : todo) {
-        const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
-        if (status == ErrorCodes::ShutdownInProgress)
-            break;
-        fassert(28735, status);
+        if (cbState->baton) {
+            cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+        } else {
+            const auto status =
+                _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+            if (status == ErrorCodes::ShutdownInProgress)
+                break;
+            fassert(28735, status);
+        }
     }
     _net->signalWorkAvailable();
 }
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 8e81e3a7f07..92b809bc0db 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -36,6 +36,7 @@
 #include "mongo/stdx/list.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/stdx/thread.h"
+#include "mongo/transport/baton.h"
 
 namespace mongo {
 
@@ -58,7 +59,7 @@ public:
     * for network operations.
     */
     ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
-                           std::unique_ptr<NetworkInterface> net);
+                           std::shared_ptr<NetworkInterface> net);
 
    /**
    * Destroys a ThreadPoolTaskExecutor.
@@ -79,8 +80,10 @@ public:
    void waitForEvent(const EventHandle& event) override;
    StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
    StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override;
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override;
    void cancel(const CallbackHandle& cbHandle) override;
    void wait(const CallbackHandle& cbHandle) override;
 
@@ -128,7 +131,9 @@ private:
    * executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
    * called outside of _mutex.
    */
-    static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {});
+    static WorkQueue makeSingletonWorkQueue(CallbackFn work,
+                                            const transport::BatonHandle& baton,
+                                            Date_t when = {});
 
    /**
    * Moves the single callback in "wq" to the end of "queue". It is required that "wq" was
@@ -174,7 +179,7 @@ private:
    stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
 
    // The network interface used for remote command execution and waiting.
-    std::unique_ptr<NetworkInterface> _net;
+    std::shared_ptr<NetworkInterface> _net;
 
    // The thread pool that executes scheduled work items.
    std::unique_ptr<ThreadPoolInterface> _pool;
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 5e70f3630ff..d63429ff8e9 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -33,16 +33,22 @@
 #include "mongo/s/async_requests_sender.h"
 
 #include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/server_parameters.h"
 #include "mongo/executor/remote_command_request.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/stdx/memory.h"
+#include "mongo/transport/baton.h"
+#include "mongo/transport/transport_layer.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"
 
 namespace mongo {
+
+MONGO_EXPORT_SERVER_PARAMETER(AsyncRequestsSenderUseBaton, bool, true);
+
 namespace {
 
 // Maximum number of retries for network and replication notMaster errors (per host).
@@ -58,6 +64,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
                                          Shard::RetryPolicy retryPolicy)
     : _opCtx(opCtx),
       _executor(executor),
+      _baton(opCtx),
       _db(dbName.toString()),
       _readPreference(readPreference),
       _retryPolicy(retryPolicy) {
@@ -71,6 +78,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
     // Schedule the requests immediately.
     _scheduleRequests();
 }
+
 AsyncRequestsSender::~AsyncRequestsSender() {
     _cancelPendingRequests();
 
@@ -90,7 +98,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
         // Otherwise, wait for some response to be received.
         if (_interruptStatus.isOK()) {
             try {
-                _handleResponse(_responseQueue.pop(_opCtx));
+                _makeProgress(_opCtx);
             } catch (const AssertionException& ex) {
                 // If the operation is interrupted, we cancel outstanding requests and switch to
                 // waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -99,7 +107,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
                 continue;
             }
         } else {
-            _handleResponse(_responseQueue.pop());
+            _makeProgress(nullptr);
         }
     }
     return *readyResponse;
@@ -130,6 +138,11 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
         _scheduleRequests();
     }
 
+    // If we have baton requests, we want to process those before proceeding
+    if (_batonRequests) {
+        return boost::none;
+    }
+
     // Check if any remote is ready.
     invariant(!_remotes.empty());
     for (auto& remote : _remotes) {
@@ -200,6 +213,12 @@ void AsyncRequestsSender::_scheduleRequests() {
             auto scheduleStatus = _scheduleRequest(i);
             if (!scheduleStatus.isOK()) {
                 remote.swResponse = std::move(scheduleStatus);
+
+                if (_baton) {
+                    _batonRequests++;
+                    _baton->schedule([this] { _batonRequests--; });
+                }
+
                 // Push a noop response to the queue to indicate that a remote is ready for
                 // re-processing due to failure.
                 _responseQueue.push(boost::none);
@@ -214,7 +233,7 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
     invariant(!remote.cbHandle.isValid());
     invariant(!remote.swResponse);
 
-    Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference);
+    Status resolveStatus = remote.resolveShardIdToHostAndPort(this, _readPreference);
     if (!resolveStatus.isOK()) {
         return resolveStatus;
     }
@@ -225,8 +244,14 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
     auto callbackStatus = _executor->scheduleRemoteCommand(
         request,
         [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+            if (_baton) {
+                _batonRequests++;
+                _baton->schedule([this] { _batonRequests--; });
+            }
+
             _responseQueue.push(Job{cbData, remoteIndex});
-        });
+        },
+        _baton);
     if (!callbackStatus.isOK()) {
         return callbackStatus.getStatus();
     }
@@ -235,7 +260,21 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
     return Status::OK();
 }
 
-void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
+    boost::optional<Job> job;
+
+    if (_baton) {
+        // If we're using a baton, we peek the queue, and block on the baton if it's empty
+        if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
+            job = std::move(*tryJob);
+        } else {
+            _baton->run(_opCtx, boost::none);
+        }
+    } else {
+        // Otherwise we block on the queue
+        job = _opCtx ? _responseQueue.pop(_opCtx) : _responseQueue.pop();
+    }
+
     if (!job) {
         return;
     }
@@ -274,14 +313,57 @@ AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
     : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
 
 Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
-    const ReadPreferenceSetting& readPref) {
+    AsyncRequestsSender* ars, const ReadPreferenceSetting& readPref) {
     const auto shard = getShard();
     if (!shard) {
         return Status(ErrorCodes::ShardNotFound,
                       str::stream() << "Could not find shard " << shardId);
     }
 
-    auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
+    auto clock = ars->_opCtx->getServiceContext()->getFastClockSource();
+
+    auto deadline = clock->now() + Seconds(20);
+
+    auto targeter = shard->getTargeter();
+
+    auto findHostStatus = [&] {
+        // If we don't have a baton, just go ahead and block in targeting
+        if (!ars->_baton) {
+            return targeter->findHostWithMaxWait(readPref, Seconds{20});
+        }
+
+        // If we do have a baton, and we can target quickly, just do that
+        {
+            auto findHostStatus = targeter->findHostNoWait(readPref);
+            if (findHostStatus.isOK()) {
+                return findHostStatus;
+            }
+        }
+
+        // If it's going to take a while to target, we spin up a background thread to do our
+        // targeting, while running the baton on the calling thread. This allows us to make forward
+        // progress on previous requests.
+        Promise<HostAndPort> promise;
+        auto future = promise.getFuture();
+
+        ars->_batonRequests++;
+        stdx::thread bgChecker([&] {
+            promise.setWith(
+                [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
+
+            ars->_baton->schedule([ars] { ars->_batonRequests--; });
+        });
+        const auto guard = MakeGuard([&] { bgChecker.join(); });
+
+        while (!future.isReady()) {
+            if (!ars->_baton->run(nullptr, deadline)) {
+                break;
+            }
+        }
+
+        return future.getNoThrow();
+    }();
+
     if (!findHostStatus.isOK()) {
         return findHostStatus.getStatus();
     }
@@ -296,4 +378,17 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
     return grid.shardRegistry()->getShardNoReload(shardId);
 }
 
+AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx)
+    : _baton(AsyncRequestsSenderUseBaton.load()
+                 ? (opCtx->getServiceContext()->getTransportLayer()
+                        ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx)
+                        : nullptr)
+                 : nullptr) {}
+
+AsyncRequestsSender::BatonDetacher::~BatonDetacher() {
+    if (_baton) {
+        _baton->detach();
+    }
+}
+
 }  // namespace mongo
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 2371ee988bb..dfb053c977e 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -171,7 +171,8 @@ private:
        /**
        * Given a read preference, selects a host on which the command should be run.
        */
-        Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
+        Status resolveShardIdToHostAndPort(AsyncRequestsSender* ars,
+                                           const ReadPreferenceSetting& readPref);
 
        /**
        * Returns the Shard object associated with this remote.
@@ -203,7 +204,7 @@ private:
    };
 
    /**
-    * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors
+    * Job for _makeProgress. We use a producer consumer queue to coordinate with TaskExecutors
     * off thread, and this wraps up the arguments for that call.
     */
    struct Job {
@@ -212,6 +213,38 @@ private:
    };
 
    /**
+    * We have to make sure to detach the baton if we throw in construction.
```
We also need a baton
+ * that lives longer than this type (because it can end up in callbacks that won't actually
+ * modify it).
+ *
+ * TODO: work out actual lifetime semantics for a baton. For now, leaving this as a wart in ARS
+ */
+ class BatonDetacher {
+ public:
+ explicit BatonDetacher(OperationContext* opCtx);
+ ~BatonDetacher();
+
+ transport::Baton& operator*() const {
+ return *_baton;
+ }
+
+ transport::Baton* operator->() const noexcept {
+ return _baton.get();
+ }
+
+ operator transport::BatonHandle() const {
+ return _baton;
+ }
+
+ explicit operator bool() const noexcept {
+ return static_cast<bool>(_baton);
+ }
+
+ private:
+ transport::BatonHandle _baton;
+ };
+
+ /**
* Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
*/
void _cancelPendingRequests();
@@ -246,17 +279,19 @@ private:
Status _scheduleRequest(size_t remoteIndex);

/**
- * The callback for a remote command.
+ * Waits for forward progress in gathering responses from a remote.
*
- * If the job is not set, we've failed targeting and calling this function is a noop.
+ * If the opCtx is non-null, use it while waiting on completion.
*
* Stores the response or error in the remote.
*/
- void _handleResponse(boost::optional<Job> job);
+ void _makeProgress(OperationContext* opCtx);

OperationContext* _opCtx;
executor::TaskExecutor* _executor;

+ BatonDetacher _baton;
+ size_t _batonRequests = 0;

// The metadata obj to pass along with the command remote. Used to indicate that the command is
// ok to run on secondaries.
diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
index e64860f2513..1582cb70291 100644
--- a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
@@ -42,40 +42,7 @@ namespace mongo {
namespace rpc {

void ShardingEgressMetadataHookForMongos::_saveGLEStats(const BSONObj& metadata,
- StringData hostString) {
- if (!haveClient()) {
- // Client will be present only when write commands are used.
- return;
- }
-
- auto swShardingMetadata = rpc::ShardingMetadata::readFromMetadata(metadata);
- if (swShardingMetadata.getStatus() == ErrorCodes::NoSuchKey) {
- return;
- } else if (!swShardingMetadata.isOK()) {
- warning() << "Got invalid sharding metadata " << redact(swShardingMetadata.getStatus())
- << " metadata object was '" << redact(metadata) << "'";
- return;
- }
-
- auto shardConn = ConnectionString::parse(hostString.toString());
-
- // If we got the reply from this host, we expect that its 'hostString' must be valid.
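Because BatonDetacher converts implicitly to transport::BatonHandle and contextually to bool, the sender can branch on it and pass it straight into scheduleRemoteCommand. A toy version of the same RAII shape (simplified stand-in types and a hypothetical call site), showing the detach-on-destruction guarantee:

#include <iostream>
#include <memory>

struct Baton {
    void detach() { std::cout << "detached\n"; }
};
using BatonHandle = std::shared_ptr<Baton>;

class BatonDetacher {
public:
    explicit BatonDetacher(BatonHandle b) : _baton(std::move(b)) {}
    ~BatonDetacher() {
        if (_baton)
            _baton->detach();  // runs on every exit path of the owner
    }

    operator BatonHandle() const {  // lets the wrapper be passed where a handle is expected
        return _baton;
    }
    explicit operator bool() const noexcept {
        return static_cast<bool>(_baton);
    }

private:
    BatonHandle _baton;
};

void scheduleRemoteCommand(const BatonHandle& baton) { /* hypothetical sink */ }

int main() {
    BatonDetacher baton(std::make_shared<Baton>());
    if (baton)
        scheduleRemoteCommand(baton);  // implicit conversion to BatonHandle
}   // ~BatonDetacher runs here and detaches exactly once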
- if (!shardConn.isOK()) { - severe() << "got bad host string in saveGLEStats: " << hostString; - } - invariant(shardConn.getStatus()); - - auto shardingMetadata = std::move(swShardingMetadata.getValue()); - - auto& clientInfo = cc(); - LOG(4) << "saveGLEStats lastOpTime:" << shardingMetadata.getLastOpTime() - << " electionId:" << shardingMetadata.getLastElectionId(); - - ClusterLastErrorInfo::get(clientInfo) - ->addHostOpTime( - shardConn.getValue(), - HostOpTime(shardingMetadata.getLastOpTime(), shardingMetadata.getLastElectionId())); -} + StringData hostString) {} repl::OpTime ShardingEgressMetadataHookForMongos::_getConfigServerOpTime() { return Grid::get(_serviceContext)->configOpTime(); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 1bb4215fa2c..7b595826256 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -130,11 +130,11 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( const auto poolSize = taskExecutorPoolSize.value_or(TaskExecutorPool::getSuggestedPoolSize()); for (size_t i = 0; i < poolSize; ++i) { - auto exec = makeShardingTaskExecutor(executor::makeNetworkInterface( - "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), - stdx::make_unique<ShardingNetworkConnectionHook>(), - metadataHookBuilder(), - connPoolOptions)); + auto exec = makeShardingTaskExecutor( + executor::makeNetworkInterface("TaskExecutorPool-" + std::to_string(i), + stdx::make_unique<ShardingNetworkConnectionHook>(), + metadataHookBuilder(), + connPoolOptions)); executors.emplace_back(std::move(exec)); } @@ -219,7 +219,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, } auto network = - executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry", + executor::makeNetworkInterface("ShardRegistry", stdx::make_unique<ShardingNetworkConnectionHook>(), hookBuilder(), connPoolOptions); diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index ee15fdf3a6b..e7401272645 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -111,11 +111,13 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt( } StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( - const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const transport::BatonHandle& baton) { // schedule the user's callback if there is not opCtx if (!request.opCtx) { - return _executor->scheduleRemoteCommand(request, cb); + return _executor->scheduleRemoteCommand(request, cb, baton); } boost::optional<RemoteCommandRequest> newRequest; @@ -201,7 +203,7 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom } }; - return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb); + return _executor->scheduleRemoteCommand(newRequest ? 
*newRequest : request, shardingCb, baton);
}

void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) {
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 4c2571c684a..92a78169549 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -67,8 +67,10 @@ public:
Date_t deadline) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle) override;
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
new file mode 100644
index 00000000000..59c2d7c06ab
--- /dev/null
+++ b/src/mongo/transport/baton.h
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/stdx/functional.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace transport {
+
+class TransportLayer;
+class Session;
+class ReactorTimer;
+
+/**
+ * A Baton is basically a networking reactor, with limited functionality and no forward progress
+ * guarantees. Rather than asynchronously running tasks through one, the baton records the intent
+ * of those tasks and defers waiting and execution to a later call to run().
+ *
+ * Batons provide a mechanism to allow consumers of a transport layer to execute IO themselves,
+ * rather than having this occur on another thread.
This can improve performance by minimizing + * context switches, as well as improving the readability of stack traces by grounding async + * execution on top of a regular client call stack. + */ +class Baton { +public: + virtual ~Baton() = default; + + /** + * Detaches a baton from an associated opCtx. + */ + virtual void detach() = 0; + + /** + * Executes a callback on the baton via schedule. Returns a future which will execute on the + * baton runner. + */ + template <typename Callback> + Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { + Promise<FutureContinuationResult<Callback>> promise; + auto future = promise.getFuture(); + + schedule([ cb = std::forward<Callback>(cb), sp = promise.share() ]() mutable { + sp.setWith(std::move(cb)); + }); + + return future; + } + + /** + * Executes a callback on the baton. + */ + virtual void schedule(stdx::function<void()> func) = 0; + + /** + * Adds a session, returning a future which activates on read/write-ability of the session. + */ + enum class Type { + In, + Out, + }; + virtual Future<void> addSession(Session& session, Type type) = 0; + + /** + * Adds a timer, returning a future which activates after a duration. + */ + virtual Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) = 0; + + /** + * Adds a timer, returning a future which activates after a deadline. + */ + virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0; + + /** + * Cancels waiting on a session. + * + * Returns true if the session was in the baton to be cancelled. + */ + virtual bool cancelSession(Session& session) = 0; + + /** + * Cancels waiting on a timer + * + * Returns true if the timer was in the baton to be cancelled. + */ + virtual bool cancelTimer(const ReactorTimer& timer) = 0; + + /** + * Runs the baton. This blocks, waiting for networking events or timeouts, and fulfills + * promises and executes scheduled work. + * + * Returns false if the optional deadline has passed + */ + virtual bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) = 0; +}; + +using BatonHandle = std::shared_ptr<Baton>; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h new file mode 100644 index 00000000000..65d894dbea7 --- /dev/null +++ b/src/mongo/transport/baton_asio_linux.h @@ -0,0 +1,424 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. 
If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> +#include <set> +#include <vector> + +#include <poll.h> +#include <sys/eventfd.h> + +#include "mongo/base/checked_cast.h" +#include "mongo/db/operation_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/transport/baton.h" +#include "mongo/transport/session_asio.h" +#include "mongo/util/errno_util.h" +#include "mongo/util/future.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace transport { + +/** + * TransportLayerASIO Baton implementation for linux. + * + * We implement our networking reactor on top of poll + eventfd for wakeups + */ +class TransportLayerASIO::BatonASIO : public Baton { + + /** + * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the + * counter portion, just the notify/wakeup + */ + struct EventFDHolder { + EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) { + if (fd < 0) { + severe() << "error in eventfd: " << errnoWithDescription(errno); + fassertFailed(50833); + } + } + + ~EventFDHolder() { + ::close(fd); + } + + // Writes to the underlying eventfd + void notify() { + while (true) { + if (eventfd_write(fd, 1) == 0) { + break; + } + + invariant(errno == EINTR); + } + } + + void wait() { + while (true) { + // If we have activity on the eventfd, pull the count out + uint64_t u; + if (::eventfd_read(fd, &u) == 0) { + break; + } + + invariant(errno == EINTR); + } + } + + const int fd; + }; + +public: + BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {} + + ~BatonASIO() { + invariant(!_opCtx); + invariant(_sessions.empty()); + invariant(_scheduled.empty()); + invariant(_timers.empty()); + } + + void detach() override { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_sessions.empty()); + invariant(_scheduled.empty()); + invariant(_timers.empty()); + } + + { + stdx::lock_guard<Client> lk(*_opCtx->getClient()); + invariant(_opCtx->getBaton().get() == this); + _opCtx->setBaton(nullptr); + } + + _opCtx = nullptr; + } + + Future<void> addSession(Session& session, Type type) override { + auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + + Promise<void> promise; + auto out = promise.getFuture(); + + _safeExecute([ fd, type, sp = promise.share(), this ] { + _sessions[fd] = TransportSession{type, sp}; + }); + + return out; + } + + Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override { + return waitUntil(timer, Date_t::now() + timeout); + } + + Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { + Promise<void> promise; + auto out = promise.getFuture(); + + _safeExecute([ timerPtr = &timer, expiration, sp = promise.share(), this ] { + auto pair = _timers.insert({ + timerPtr, expiration, sp, + }); + invariant(pair.second); + _timersById[pair.first->id] = pair.first; + }); + + return out; + } + + bool cancelSession(Session& session) override { + const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle(); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (_sessions.find(fd) == _sessions.end()) { + return false; + } + + // TODO: There's an ABA issue here with fds where 
between the check above and the erase below, we could
+ // have removed the fd, then opened and added a new socket with the same fd. We need to
+ // solve it by using session ids for handles.
+ _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); });
+
+ return true;
+ }
+
+ bool cancelTimer(const ReactorTimer& timer) override {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (_timersById.find(&timer) == _timersById.end()) {
+ return false;
+ }
+
+ // TODO: Same ABA issue as above, but for pointers.
+ _safeExecute(std::move(lk), [ timerPtr = &timer, this ] {
+ auto iter = _timersById.find(timerPtr);
+
+ if (iter != _timersById.end()) {
+ _timers.erase(iter->second);
+ _timersById.erase(iter);
+ }
+ });
+
+ return true;
+ }
+
+ void schedule(stdx::function<void()> func) override {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ _scheduled.push_back(std::move(func));
+
+ if (_inPoll) {
+ _efd.notify();
+ }
+ }
+
+ bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override {
+ std::vector<SharedPromise<void>> toFulfill;
+
+ // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
+ const auto guard = MakeGuard([&] {
+ for (auto& promise : toFulfill) {
+ promise.emplaceValue();
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_scheduled.size()) {
+ decltype(_scheduled) toRun;
+ {
+ using std::swap;
+ swap(_scheduled, toRun);
+ }
+
+ lk.unlock();
+ for (auto& job : toRun) {
+ job();
+ }
+ lk.lock();
+ }
+ });
+
+ bool eventfdFired = false;
+
+ // Note that it's important to check for interrupt without the lock, because markKilled
+ // calls schedule, which will deadlock if we're holding the lock when calling this.
+ if (opCtx) {
+ opCtx->checkForInterrupt();
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (opCtx) {
+ invariant(opCtx == _opCtx);
+ }
+
+ auto now = Date_t::now();
+
+ // If our deadline has passed, return that we've already failed
+ if (deadline && *deadline <= now) {
+ return false;
+ }
+
+ // If anything was scheduled, run it now. No need to poll
+ if (_scheduled.size()) {
+ return true;
+ }
+
+ boost::optional<Milliseconds> timeout;
+
+ // If we have a timer, poll no longer than that
+ if (_timers.size()) {
+ timeout = _timers.begin()->expiration - now;
+ }
+
+ if (deadline) {
+ auto deadlineTimeout = *deadline - now;
+
+ // If we didn't have a timer with a deadline, or our deadline is sooner than that
+ // timer
+ if (!timeout || (deadlineTimeout < *timeout)) {
+ timeout = deadlineTimeout;
+ }
+ }
+
+ std::vector<decltype(_sessions)::iterator> sessions;
+ sessions.reserve(_sessions.size());
+ std::vector<pollfd> pollSet;
+ pollSet.reserve(_sessions.size() + 1);
+
+ pollSet.push_back(pollfd{_efd.fd, POLLIN, 0});
+
+ for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
+ pollSet.push_back(
+ pollfd{iter->first,
+ static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT),
+ 0});
+ sessions.push_back(iter);
+ }
+
+ int rval = 0;
+ // If we don't have a timeout, or we have a timeout that's unexpired, run poll.
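EventFDHolder plus the _inPoll check in schedule() is the classic self-wakeup trick: the poll set always contains one extra fd (the eventfd), so a thread that queues work while the pumping thread is blocked in poll can interrupt it immediately. A condensed, Linux-only sketch of just that mechanism (single-threaded for brevity, error handling trimmed, not the baton's full loop):

#include <cstdint>
#include <iostream>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int main() {
    int efd = ::eventfd(0, EFD_CLOEXEC);

    // What schedule() does when _inPoll is true, normally from another thread:
    ::eventfd_write(efd, 1);

    pollfd fds[] = {{efd, POLLIN, 0}};
    int rval = ::poll(fds, 1, /*timeout ms*/ 1000);  // returns at once: efd is readable

    if (rval > 0 && (fds[0].revents & POLLIN)) {
        uint64_t count = 0;
        ::eventfd_read(efd, &count);  // drain the counter so the next poll blocks again
        std::cout << "woken, pending notifications: " << count << "\n";
    }
    ::close(efd);
}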
+ if (!timeout || (*timeout > Milliseconds(0))) { + _inPoll = true; + lk.unlock(); + rval = + ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count()); + + const auto pollGuard = MakeGuard([&] { + lk.lock(); + _inPoll = false; + }); + + // If poll failed, it better be in EINTR + if (rval < 0 && errno != EINTR) { + severe() << "error in poll: " << errnoWithDescription(errno); + fassertFailed(50834); + } + + // Note that it's important to check for interrupt without the lock, because markKilled + // calls schedule, which will deadlock if we're holding the lock when calling this. + if (opCtx) { + opCtx->checkForInterrupt(); + } + } + + now = Date_t::now(); + + // If our deadline passed while in poll, we've failed + if (deadline && now > *deadline) { + return false; + } + + // Fire expired timers + for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { + toFulfill.push_back(std::move(iter->promise)); + _timersById.erase(iter->id); + iter = _timers.erase(iter); + } + + // If poll found some activity + if (rval > 0) { + size_t remaining = rval; + + auto pollIter = pollSet.begin(); + + if (pollIter->revents) { + _efd.wait(); + eventfdFired = true; + + remaining--; + } + + ++pollIter; + for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining; + ++sessionIter, ++pollIter) { + if (pollIter->revents) { + toFulfill.push_back(std::move((*sessionIter)->second.promise)); + _sessions.erase(*sessionIter); + + remaining--; + } + } + + invariant(remaining == 0); + } + + // If we got here, we should have done something + invariant(toFulfill.size() || _scheduled.size() || eventfdFired); + + return true; + } + +private: + struct Timer { + const ReactorTimer* id; + Date_t expiration; + SharedPromise<void> promise; + + struct LessThan { + bool operator()(const Timer& lhs, const Timer& rhs) const { + return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id); + } + }; + }; + + struct TransportSession { + Type type; + SharedPromise<void> promise; + }; + + template <typename Callback> + void _safeExecute(Callback&& cb) { + return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb)); + } + + /** + * Safely executes method on the reactor. If we're in poll, we schedule a task, then write to + * the eventfd. If not, we run inline. + */ + template <typename Callback> + void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) { + if (_inPoll) { + _scheduled.push_back([cb, this] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + cb(); + }); + + _efd.notify(); + } else { + cb(); + } + } + + stdx::mutex _mutex; + + OperationContext* _opCtx; + + bool _inPoll = false; + + EventFDHolder _efd; + + // This map stores the sessions we need to poll on. We unwind it into a pollset for every + // blocking call to run + stdx::unordered_map<int, TransportSession> _sessions; + + // The set is used to find the next timer which will fire. The unordered_map looks up the + // timers so we can remove them in O(1) + std::set<Timer, Timer::LessThan> _timers; + stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById; + + // For tasks that come in via schedule. 
Or that were deferred because we were in poll + std::vector<std::function<void()>> _scheduled; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index 60e6a4e33e2..0620650ce35 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -83,7 +83,7 @@ public: return Message(); // Subclasses can do something different. } - Future<Message> asyncSourceMessage() override { + Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override { return Future<Message>::makeReady(sourceMessage()); } @@ -99,11 +99,12 @@ public: return Status::OK(); } - Future<void> asyncSinkMessage(Message message) override { + Future<void> asyncSinkMessage(Message message, + const transport::BatonHandle& handle = nullptr) override { return Future<void>::makeReady(sinkMessage(message)); } - void cancelAsyncOperations() override {} + void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {} void setTimeout(boost::optional<Milliseconds>) override {} diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index bfe45a6cbe2..304c8f11da4 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -44,6 +44,8 @@ namespace transport { class TransportLayer; class Session; +class Baton; +using BatonHandle = std::shared_ptr<Baton>; using SessionHandle = std::shared_ptr<Session>; using ConstSessionHandle = std::shared_ptr<const Session>; @@ -103,7 +105,7 @@ public: * Source (receive) a new Message from the remote host for this Session. */ virtual StatusWith<Message> sourceMessage() = 0; - virtual Future<Message> asyncSourceMessage() = 0; + virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0; /** * Sink (send) a Message to the remote host for this Session. @@ -111,14 +113,15 @@ public: * Async version will keep the buffer alive until the operation completes. */ virtual Status sinkMessage(Message message) = 0; - virtual Future<void> asyncSinkMessage(Message message) = 0; + virtual Future<void> asyncSinkMessage(Message message, + const transport::BatonHandle& handle = nullptr) = 0; /** * Cancel any outstanding async operations. There is no way to cancel synchronous calls. * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already * completed. 
*/ - virtual void cancelAsyncOperations() = 0; + virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0; /** * This should only be used to detect when the remote host has disappeared without diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 994b0941b04..459cab76676 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -34,6 +34,7 @@ #include "mongo/config.h" #include "mongo/db/stats/counters.h" #include "mongo/transport/asio_utils.h" +#include "mongo/transport/baton.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/util/net/sock.h" #ifdef MONGO_CONFIG_SSL @@ -58,6 +59,14 @@ auto futurize(const std::error_code& ec, SuccessValue&& successValue) { return Result::makeReady(successValue); } +Future<void> futurize(const std::error_code& ec) { + using Result = Future<void>; + if (MONGO_unlikely(ec)) { + return Result::makeReady(errorCodeToStatus(ec)); + } + return Result::makeReady(); +} + using GenericSocket = asio::generic::stream_protocol::socket; class TransportLayerASIO::ASIOSession final : public Session { @@ -117,34 +126,35 @@ public: return sourceMessageImpl().getNoThrow(); } - Future<Message> asyncSourceMessage() override { + Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override { ensureAsync(); - return sourceMessageImpl(); + return sourceMessageImpl(baton); } Status sinkMessage(Message message) override { ensureSync(); return write(asio::buffer(message.buf(), message.size())) - .then([&message](size_t size) { - invariant(size == size_t(message.size())); - networkCounter.hitPhysicalOut(message.size()); - }) + .then([&message] { networkCounter.hitPhysicalOut(message.size()); }) .getNoThrow(); } - Future<void> asyncSinkMessage(Message message) override { + Future<void> asyncSinkMessage(Message message, + const transport::BatonHandle& baton = nullptr) override { ensureAsync(); - return write(asio::buffer(message.buf(), message.size())) - .then([message /*keep the buffer alive*/](size_t size) { - invariant(size == size_t(message.size())); + return write(asio::buffer(message.buf(), message.size()), baton) + .then([message /*keep the buffer alive*/]() { networkCounter.hitPhysicalOut(message.size()); }); } - void cancelAsyncOperations() override { + void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override { LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote; - getSocket().cancel(); + if (baton) { + baton->cancelSession(*this); + } else { + getSocket().cancel(); + } } void setTimeout(boost::optional<Milliseconds> timeout) override { @@ -186,6 +196,7 @@ public: protected: friend class TransportLayerASIO; + friend TransportLayerASIO::BatonASIO; #ifdef MONGO_CONFIG_SSL Future<void> handshakeSSLForEgress(const HostAndPort& target) { @@ -307,19 +318,17 @@ private: return _socket; } - Future<Message> sourceMessageImpl() { + Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) { static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value); auto headerBuffer = SharedBuffer::allocate(kHeaderSize); auto ptr = headerBuffer.get(); - return read(asio::buffer(ptr, kHeaderSize)) - .then([ headerBuffer = std::move(headerBuffer), this ](size_t size) mutable { - if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), size))) { - return sendHTTPResponse(); + return read(asio::buffer(ptr, kHeaderSize), baton) + .then([ headerBuffer = std::move(headerBuffer), this, 
baton ]() mutable { + if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), kHeaderSize))) { + return sendHTTPResponse(baton); } - invariant(size == kHeaderSize); - const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength()); if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) { StringBuilder sb; @@ -331,7 +340,7 @@ private: return Future<Message>::makeReady(Status(ErrorCodes::ProtocolError, str)); } - if (msgLen == size) { + if (msgLen == kHeaderSize) { // This probably isn't a real case since all (current) messages have bodies. networkCounter.hitPhysicalIn(msgLen); return Future<Message>::makeReady(Message(std::move(headerBuffer))); @@ -341,8 +350,8 @@ private: memcpy(buffer.get(), headerBuffer.get(), kHeaderSize); MsgData::View msgView(buffer.get()); - return read(asio::buffer(msgView.data(), msgView.dataLen())) - .then([ buffer = std::move(buffer), msgLen ](size_t size) mutable { + return read(asio::buffer(msgView.data(), msgView.dataLen()), baton) + .then([ buffer = std::move(buffer), msgLen ]() mutable { networkCounter.hitPhysicalIn(msgLen); return Message(std::move(buffer)); }); @@ -350,55 +359,59 @@ private: } template <typename MutableBufferSequence> - Future<size_t> read(const MutableBufferSequence& buffers) { + Future<void> read(const MutableBufferSequence& buffers, + const transport::BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL if (_sslSocket) { - return opportunisticRead(*_sslSocket, buffers); + return opportunisticRead(*_sslSocket, buffers, baton); } else if (!_ranHandshake) { invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value)); - return opportunisticRead(_socket, buffers) - .then([this, buffers](size_t size) mutable { + return opportunisticRead(_socket, buffers, baton) + .then([this, buffers]() mutable { _ranHandshake = true; return maybeHandshakeSSLForIngress(buffers); }) - .then([this, buffers](bool needsRead) mutable { + .then([this, buffers, baton](bool needsRead) mutable { if (needsRead) { - return read(buffers); + return read(buffers, baton); } else { - return Future<size_t>::makeReady(asio::buffer_size(buffers)); + return Future<void>::makeReady(); } }); } #endif - return opportunisticRead(_socket, buffers); + return opportunisticRead(_socket, buffers, baton); } template <typename ConstBufferSequence> - Future<size_t> write(const ConstBufferSequence& buffers) { + Future<void> write(const ConstBufferSequence& buffers, + const transport::BatonHandle& baton = nullptr) { #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { #ifdef __linux__ // We do some trickery in asio (see moreToSend), which appears to work well on linux, // but fails on other platforms. - return opportunisticWrite(*_sslSocket, buffers); + return opportunisticWrite(*_sslSocket, buffers, baton); #else if (_blockingMode == Async) { // Opportunistic writes are broken for async egress SSL (switching between blocking // and non-blocking mode corrupts the TLS exchange). 
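sourceMessageImpl above is a two-phase framed read: pull a fixed-size header, decode the total message length from it, then read the remainder into a buffer that already holds the header. A stand-alone sketch of that framing, using a fake in-memory session instead of a socket; the little-endian length layout is assumed as on the wire, and the bounds checks from the real code are omitted:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

constexpr size_t kHeaderSize = 16;  // messageLength, requestID, responseTo, opCode: 4 x int32

// Stand-in for the socket: hands out bytes from a prebuilt wire image.
struct FakeSession {
    std::vector<uint8_t> wire;
    size_t pos = 0;
    void read(uint8_t* out, size_t n) {
        std::memcpy(out, wire.data() + pos, n);
        pos += n;
    }
};

int main() {
    // Build one frame: header claiming 20 bytes total, then 4 payload bytes.
    std::vector<uint8_t> frame(kHeaderSize + 4, 0);
    const int32_t msgLen = static_cast<int32_t>(frame.size());
    std::memcpy(frame.data(), &msgLen, sizeof(msgLen));

    FakeSession session{frame};

    // Phase 1: read just the header, as the first read in sourceMessageImpl does.
    std::vector<uint8_t> buffer(kHeaderSize);
    session.read(buffer.data(), kHeaderSize);

    int32_t len = 0;
    std::memcpy(&len, buffer.data(), sizeof(len));
    std::cout << "header says " << len << " bytes total\n";

    // Phase 2: grow the buffer, keep the header in place, read the body.
    buffer.resize(static_cast<size_t>(len));
    session.read(buffer.data() + kHeaderSize, buffer.size() - kHeaderSize);
    std::cout << "read body of " << buffer.size() - kHeaderSize << " bytes\n";
}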
- return asio::async_write(*_sslSocket, buffers, UseFuture{}); + return asio::async_write(*_sslSocket, buffers, UseFuture{}).ignoreValue(); } else { - return opportunisticWrite(*_sslSocket, buffers); + return opportunisticWrite(*_sslSocket, buffers, baton); } #endif } #endif - return opportunisticWrite(_socket, buffers); + return opportunisticWrite(_socket, buffers, baton); } template <typename Stream, typename MutableBufferSequence> - Future<size_t> opportunisticRead(Stream& stream, const MutableBufferSequence& buffers) { + Future<void> opportunisticRead(Stream& stream, + const MutableBufferSequence& buffers, + const transport::BatonHandle& baton = nullptr) { std::error_code ec; auto size = asio::read(stream, buffers, ec); if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) && @@ -410,13 +423,17 @@ private: if (size > 0) { asyncBuffers += size; } - return asio::async_read(stream, asyncBuffers, UseFuture{}) - .then([size](size_t asyncSize) { - // Add back in the size read opportunistically. - return size + asyncSize; - }); + + if (baton) { + return baton->addSession(*this, Baton::Type::In) + .then([&stream, asyncBuffers, baton, this] { + return opportunisticRead(stream, asyncBuffers, baton); + }); + } + + return asio::async_read(stream, asyncBuffers, UseFuture{}).ignoreValue(); } else { - return futurize(ec, size); + return futurize(ec); } } @@ -430,24 +447,21 @@ private: * needs to keep sending). */ template <typename ConstBufferSequence> - boost::optional<Future<size_t>> moreToSend(GenericSocket& socket, - const ConstBufferSequence& buffers, - size_t size) { + boost::optional<Future<void>> moreToSend(GenericSocket& socket, + const ConstBufferSequence& buffers, + const transport::BatonHandle& baton) { return boost::none; } #ifdef MONGO_CONFIG_SSL template <typename ConstBufferSequence> - boost::optional<Future<size_t>> moreToSend(asio::ssl::stream<GenericSocket>& socket, - const ConstBufferSequence& buffers, - size_t sizeFromBefore) { + boost::optional<Future<void>> moreToSend(asio::ssl::stream<GenericSocket>& socket, + const ConstBufferSequence& buffers, + const BatonHandle& baton) { if (_sslSocket->getCoreOutputBuffer().size()) { - return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer()) - .then([this, &socket, buffers, sizeFromBefore](size_t) { - return opportunisticWrite(socket, buffers) - .then([sizeFromBefore](size_t justWritten) { - return justWritten + sizeFromBefore; - }); + return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer(), baton) + .then([this, &socket, buffers, baton] { + return opportunisticWrite(socket, buffers, baton); }); } @@ -456,7 +470,9 @@ private: #endif template <typename Stream, typename ConstBufferSequence> - Future<size_t> opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers) { + Future<void> opportunisticWrite(Stream& stream, + const ConstBufferSequence& buffers, + const transport::BatonHandle& baton = nullptr) { std::error_code ec; auto size = asio::write(stream, buffers, ec); if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) && @@ -470,17 +486,20 @@ private: asyncBuffers += size; } - if (auto more = moreToSend(stream, asyncBuffers, size)) { + if (auto more = moreToSend(stream, asyncBuffers, baton)) { return std::move(*more); } - return asio::async_write(stream, asyncBuffers, UseFuture{}) - .then([size](size_t asyncSize) { - // Add back in the size written opportunistically. 
- return size + asyncSize;
- });
+ if (baton) {
+ return baton->addSession(*this, Baton::Type::Out)
+ .then([&stream, asyncBuffers, baton, this] {
+ return opportunisticWrite(stream, asyncBuffers, baton);
+ });
+ }
+
+ return asio::async_write(stream, asyncBuffers, UseFuture{}).ignoreValue();
} else {
- return futurize(ec, size);
+ return futurize(ec);
}
}
@@ -568,7 +587,7 @@ private:
// Called from read() to send an HTTP response back to a client that's trying to use HTTP
// over a native MongoDB port. This returns a Future<Message> to match its only caller, but it
// always contains an error, so it could really return Future<Anything>
- Future<Message> sendHTTPResponse() {
+ Future<Message> sendHTTPResponse(const BatonHandle& baton = nullptr) {
constexpr auto userMsg = "It looks like you are trying to access MongoDB over HTTP"
" on the native driver port.\r\n"_sd;
@@ -580,17 +599,17 @@ private:
<< userMsg.size() << "\r\n\r\n"
<< userMsg;

- return write(asio::buffer(httpResp.data(), httpResp.size()))
+ return write(asio::buffer(httpResp.data(), httpResp.size()), baton)
.onError(
[](const Status& status) {
- return StatusWith<size_t>(
+ return Status(
ErrorCodes::ProtocolError,
str::stream()
<< "Client sent an HTTP request over a native MongoDB connection, "
"but there was an error sending a response: "
<< status.toString());
})
- .then([](size_t size) {
+ .then([] {
return StatusWith<Message>(
ErrorCodes::ProtocolError,
"Client sent an HTTP request over a native MongoDB connection");
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 492fbf82ea0..ab36f7954a3 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -38,6 +38,9 @@
#include "mongo/util/time_support.h"

namespace mongo {
+
+class OperationContext;
+
namespace transport {

enum ConnectSSLMode { kGlobalSSLMode, kEnableSSL, kDisableSSL };
@@ -103,6 +106,10 @@ public:
enum WhichReactor { kIngress, kEgress, kNewReactor };
virtual ReactorHandle getReactor(WhichReactor which) = 0;

+ virtual BatonHandle makeBaton(OperationContext* opCtx) {
+ return nullptr;
+ }
+
protected:
TransportLayer() = default;
};
@@ -124,15 +131,15 @@ public:
*
* If no future is outstanding, then this is a noop.
*/
- virtual void cancel() = 0;
+ virtual void cancel(const BatonHandle& baton = nullptr) = 0;

/*
* Returns a future that will be filled with Status::OK after the timeout has elapsed.
*
* Calling this implicitly calls cancel().
*/
- virtual Future<void> waitFor(Milliseconds timeout) = 0;
- virtual Future<void> waitUntil(Date_t timeout) = 0;
+ virtual Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) = 0;
+ virtual Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) = 0;
};

class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 508e70f39e1..c194073d000 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -56,6 +56,9 @@
#endif

// session_asio.h has some header dependencies that require it to be the last header.
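A pattern worth noting in the transport_layer.h hunk above: every widened virtual (cancel, waitFor, waitUntil, plus makeBaton with its non-abstract null default) takes a trailing const BatonHandle& baton = nullptr, so existing call sites compile unchanged while baton-aware paths thread the handle through. A toy illustration of that evolution with stand-in types; note that default arguments on virtuals bind to the static type, which is fine here because callers go through the interface:

#include <iostream>
#include <memory>

struct Baton {};
using BatonHandle = std::shared_ptr<Baton>;

struct Timer {  // stand-in interface, not mongo::transport::ReactorTimer
    virtual ~Timer() = default;
    // New trailing parameter with a default: old callers need no edits.
    virtual void waitFor(int timeoutMs, const BatonHandle& baton = nullptr) = 0;
};

struct TimerImpl : Timer {
    void waitFor(int timeoutMs, const BatonHandle& baton) override {
        if (baton)
            std::cout << "arm via the baton's poll set (" << timeoutMs << "ms)\n";
        else
            std::cout << "arm via the reactor (" << timeoutMs << "ms)\n";
    }
};

int main() {
    std::unique_ptr<Timer> timer = std::make_unique<TimerImpl>();
    timer->waitFor(100);                             // pre-baton call site
    timer->waitFor(100, std::make_shared<Baton>());  // baton-aware call site
}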
+#ifdef __linux__ +#include "mongo/transport/baton_asio_linux.h" +#endif #include "mongo/transport/session_asio.h" namespace mongo { @@ -72,7 +75,7 @@ public: cancel(); } - void cancel() override { + void cancel(const BatonHandle& baton = nullptr) override { auto promise = [&] { stdx::lock_guard<stdx::mutex> lk(_timerState->mutex); _timerState->generation++; @@ -86,18 +89,40 @@ public: promise.setError({ErrorCodes::CallbackCanceled, "Timer was canceled"}); }); } - _timerState->timer.cancel(); + + if (!(baton && baton->cancelTimer(*this))) { + _timerState->timer.cancel(); + } } - Future<void> waitFor(Milliseconds timeout) override { - return _asyncWait([&] { _timerState->timer.expires_after(timeout.toSystemDuration()); }); + Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) override { + if (baton) { + return _asyncWait([&] { return baton->waitFor(*this, timeout); }, baton); + } else { + return _asyncWait( + [&] { _timerState->timer.expires_after(timeout.toSystemDuration()); }); + } } - Future<void> waitUntil(Date_t expiration) override { - return _asyncWait([&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); }); + Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override { + if (baton) { + return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton); + } else { + return _asyncWait( + [&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); }); + } } private: + std::pair<Future<void>, uint64_t> _getFuture() { + stdx::lock_guard<stdx::mutex> lk(_timerState->mutex); + auto id = ++_timerState->generation; + invariant(!_timerState->finalPromise); + _timerState->finalPromise = std::make_unique<Promise<void>>(); + auto future = _timerState->finalPromise->getFuture(); + return std::make_pair(std::move(future), id); + } + template <typename ArmTimerCb> Future<void> _asyncWait(ArmTimerCb&& armTimer) { try { @@ -105,14 +130,7 @@ private: Future<void> ret; uint64_t id; - std::tie(ret, id) = [&] { - stdx::lock_guard<stdx::mutex> lk(_timerState->mutex); - auto id = ++_timerState->generation; - invariant(!_timerState->finalPromise); - _timerState->finalPromise = std::make_unique<Promise<void>>(); - auto future = _timerState->finalPromise->getFuture(); - return std::make_pair(std::move(future), id); - }(); + std::tie(ret, id) = _getFuture(); armTimer(); _timerState->timer.async_wait( @@ -137,6 +155,32 @@ private: } } + template <typename ArmTimerCb> + Future<void> _asyncWait(ArmTimerCb&& armTimer, const BatonHandle& baton) { + cancel(baton); + + Future<void> ret; + uint64_t id; + std::tie(ret, id) = _getFuture(); + + armTimer().getAsync([ id, state = _timerState ](Status status) mutable { + stdx::unique_lock<stdx::mutex> lk(state->mutex); + if (id != state->generation) { + return; + } + auto promise = std::move(state->finalPromise); + lk.unlock(); + + if (status.isOK()) { + promise->emplaceValue(); + } else { + promise->setError(status); + } + }); + + return ret; + } + // The timer itself and its state are stored in this struct managed by a shared_ptr so we can // extend the lifetime of the timer until all callbacks to timer.async_wait have run. 
struct TimerState { @@ -729,5 +773,21 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const { } #endif +BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) { +#ifdef __linux__ + auto baton = std::make_shared<BatonASIO>(opCtx); + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + invariant(!opCtx->getBaton()); + opCtx->setBaton(baton); + } + + return std::move(baton); +#else + return nullptr; +#endif +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 85ead5799b3..2e774210c55 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -125,7 +125,10 @@ public: return _listenerPort; } + BatonHandle makeBaton(OperationContext* opCtx) override; + private: + class BatonASIO; class ASIOSession; class ASIOReactor; diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index d90386829e2..18df98e99cd 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -88,6 +88,13 @@ public: static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); + BatonHandle makeBaton(OperationContext* opCtx) override { + stdx::lock_guard<stdx::mutex> lk(_tlsMutex); + // TODO: figure out what to do about managers with more than one transport layer. + invariant(_tls.size() == 1); + return _tls[0]->makeBaton(opCtx); + } + private: template <typename Callable> void _foreach(Callable&& cb) const; diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index 03a9c90246e..b58d75a394e 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -99,8 +99,10 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWo } StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { - return _executor->scheduleRemoteCommand(request, cb); + const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const transport::BatonHandle& baton) { + return _executor->scheduleRemoteCommand(request, cb, baton); } void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) { diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index fdcbc9c71d4..3c2bb935692 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -66,7 +66,9 @@ public: virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; virtual StatusWith<CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override; + const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const transport::BatonHandle& baton = nullptr) override; virtual void cancel(const CallbackHandle& cbHandle) override; virtual void wait(const CallbackHandle& cbHandle) override; virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index 2485b170463..2b4d2c9c4f4 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -537,6 +537,11 @@ 
public:
setImpl([&] { sharedState->setError(std::move(status)); });
}
+
+ // TODO rename to not XXXWith and handle void
+ void setFromStatusWith(StatusWith<T> sw) noexcept {
+ setImpl([&] { sharedState->setFromStatusWith(std::move(sw)); });
+ }
+
/**
* Get a copyable SharedPromise that can be used to complete this Promise's Future.
*
@@ -1008,6 +1013,15 @@ public:
[](Func && func, const Status& status) noexcept { call(func, status); });
}
+
+ /**
+ * Ignores the return value of a future, transforming it down into a Future<void>.
+ *
+ * This only ignores values, not errors. Those still propagate until handled by an onError
+ * handler.
+ *
+ * Equivalent to then([](auto&&){});
+ */
+ Future<void> ignoreValue() && noexcept;
+
private:
template <typename T2>
friend class Future;
@@ -1203,6 +1217,10 @@ public:
return std::move(inner).tapAll(std::forward<Func>(func));
}
+
+ Future<void> ignoreValue() && noexcept {
+ return std::move(*this);
+ }
+
private:
template <typename T>
friend class Future;
@@ -1225,6 +1243,11 @@ private:
Future<FakeVoid> inner;
};
+
+template <typename T>
+Future<void> Future<T>::ignoreValue() && noexcept {
+    return std::move(*this).then([](auto&&) {});
+}
+
/**
* Makes a ready Future with the return value of a nullary function. This has the same semantics as
* Promise::setWith, and has the same reasons to prefer it over Future<T>::makeReady(). Also, it
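Finally, the new Future<T>::ignoreValue() shown above is the adapter that lets the session code collapse Future<size_t> writes into Future<void>, as in asio::async_write(...).ignoreValue() earlier in this patch. A toy future with just enough machinery to show the semantics (values dropped, errors kept); this is not MongoDB's future.h:

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>

template <typename T>
struct Future;

template <>
struct Future<void> {
    bool ok;
    std::string error;
};

template <typename T>
struct Future {
    bool ok;
    T value;
    std::string error;

    // Equivalent in spirit to then([](auto&&) {}): discard the value, keep the error.
    Future<void> ignoreValue() && {
        return Future<void>{ok, error};
    }
};

int main() {
    Future<std::size_t> wrote{true, 512, {}};
    Future<void> done = std::move(wrote).ignoreValue();
    std::cout << (done.ok ? std::string("sink completed") : done.error) << "\n";
}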