Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/connection_pool.cpp            |   3
-rw-r--r--  src/mongo/executor/connection_pool.h              |   4
-rw-r--r--  src/mongo/executor/connection_pool_tl.cpp         |  13
-rw-r--r--  src/mongo/executor/network_interface.h            |  30
-rw-r--r--  src/mongo/executor/network_interface_asio.cpp     |  14
-rw-r--r--  src/mongo/executor/network_interface_asio.h       |  10
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp     |  10
-rw-r--r--  src/mongo/executor/network_interface_mock.h       |  10
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp       | 227
-rw-r--r--  src/mongo/executor/network_interface_tl.h         |  29
-rw-r--r--  src/mongo/executor/task_executor.h                |   7
-rw-r--r--  src/mongo/executor/task_executor_pool.cpp         |   2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp  |  59
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h    |  15
14 files changed, 287 insertions, 146 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7092c460ed3..f8b5da5fb62 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -649,8 +649,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
     _readyPool.clear();
 
     // Log something helpful
-    log() << "Dropping all pooled connections to " << _hostAndPort
-          << " due to failed operation on a connection";
+    log() << "Dropping all pooled connections to " << _hostAndPort << " due to " << status;
 
     // Migrate processing connections to the dropped pool
     for (auto&& x : _processingPool) {
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 10a711261ef..160321bec9a 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -60,10 +60,10 @@ struct ConnectionPoolStats;
 * HostAndPort. See comments on the various Options for how the pool operates.
 */
 class ConnectionPool : public EgressTagCloser {
-    class ConnectionHandleDeleter;
     class SpecificPool;
 
 public:
+    class ConnectionHandleDeleter;
     class ConnectionInterface;
     class DependentTypeFactoryInterface;
     class TimerInterface;
@@ -175,7 +175,7 @@ public:
         ConnectionHandleDeleter() = default;
         ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
 
-        void operator()(ConnectionInterface* connection) {
+        void operator()(ConnectionInterface* connection) const {
             if (_pool && connection)
                 _pool->returnConnection(connection);
         }
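The two connection_pool.h changes above work as a pair: ConnectionHandleDeleter moves into the public section so code outside the pool can name the deleter type (network_interface_tl.h below wraps it), and operator() gains const so the deleter stays callable through const copies, which std::unique_ptr and by-value lambda captures require. A minimal standalone sketch of the idiom, with Pool and Conn as illustrative stand-ins rather than the MongoDB types:

    #include <iostream>
    #include <memory>

    struct Conn {
        int id;
    };

    class Pool {
    public:
        // The deleter hands the connection back to the pool instead of freeing it.
        // operator() is const so a const copy of the deleter (e.g. one captured by
        // value in a lambda) can still be invoked.
        struct Deleter {
            Pool* pool = nullptr;
            void operator()(Conn* c) const {
                if (pool && c)
                    pool->returnConn(c);
            }
        };
        using Handle = std::unique_ptr<Conn, Deleter>;

        Handle get() {
            return Handle(new Conn{42}, Deleter{this});
        }

        void returnConn(Conn* c) {
            std::cout << "returning conn " << c->id << " to the pool\n";
            delete c;  // a real pool would stash the connection for reuse
        }
    };

    int main() {
        Pool pool;
        auto handle = pool.get();
        // handle going out of scope routes the connection back through the pool
    }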
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 3823a9a3e0a..907032c7262 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -50,7 +50,9 @@ struct TimeoutHandler {
 
 void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
     _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
-        if (status == ErrorCodes::CallbackCanceled) {
+        // TODO: verify why we still get broken promises when explicitly calling stop and
+        // shutting down NITL's quickly.
+        if (status == ErrorCodes::CallbackCanceled || status == ErrorCodes::BrokenPromise) {
             return;
         }
 
@@ -125,10 +127,9 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
     });
 
     AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
-        .onError(
-            [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
-                return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
-            })
+        .onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
+            return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
+        })
         .then([this](AsyncDBClient::Handle client) {
             _client = std::move(client);
             return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook);
@@ -186,7 +187,7 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
     _client
         ->runCommandRequest(
             {_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr})
-        .then([this](executor::RemoteCommandResponse response) {
+        .then([](executor::RemoteCommandResponse response) {
             return Future<void>::makeReady(response.status);
         })
         .getAsync([this, handler](Status status) {
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 48428b820e3..b333813df14 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,8 @@
 #include "mongo/base/disallow_copying.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
+#include "mongo/util/future.h"
 
 namespace mongo {
 
@@ -128,13 +130,33 @@ public:
     */
    virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                RemoteCommandRequest& request,
-                                const RemoteCommandCompletionFn& onFinish) = 0;
+                                const RemoteCommandCompletionFn& onFinish,
+                                const transport::BatonHandle& baton = nullptr) = 0;
+
+    Future<TaskExecutor::ResponseStatus> startCommand(
+        const TaskExecutor::CallbackHandle& cbHandle,
+        RemoteCommandRequest& request,
+        const transport::BatonHandle& baton = nullptr) {
+        Promise<TaskExecutor::ResponseStatus> promise;
+        auto future = promise.getFuture();
+
+        auto status =
+            startCommand(cbHandle,
+                         request,
+                         [sp = promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
+                             sp.emplaceValue(rs);
+                         },
+                         baton);
+
+        return future;
+    }
 
    /**
     * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
     * completed.
     */
-    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                               const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -150,7 +172,9 @@ public:
     * Any callbacks invoked from setAlarm must observe onNetworkThread to
     * return true. See that method for why.
     */
-    virtual Status setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+    virtual Status setAlarm(Date_t when,
+                            const stdx::function<void()>& action,
+                            const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * Returns true if called from a thread dedicated to networking. I.e. not a
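The new non-virtual startCommand overload added above is a thin adapter: it turns the callback-based virtual startCommand into a Future by settling a promise from inside the completion callback, sharing the promise because the callback type needs a copyable callable. The same shape, sketched with std::promise/std::future instead of MongoDB's future library (all names here are illustrative):

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>

    using Response = std::string;
    using CompletionFn = std::function<void(const Response&)>;

    // Stand-in for the callback-based virtual interface.
    void startCommandCb(const std::string& cmd, CompletionFn onFinish) {
        onFinish("ok: " + cmd);  // a real implementation completes asynchronously
    }

    // Future-returning adapter, analogous to the overload above. The promise is
    // held in a shared_ptr because std::function requires copyable callables
    // (MongoDB's version uses promise.share() for the same reason).
    std::future<Response> startCommand(const std::string& cmd) {
        auto promise = std::make_shared<std::promise<Response>>();
        auto future = promise->get_future();
        startCommandCb(cmd, [promise](const Response& rs) { promise->set_value(rs); });
        return future;
    }

    int main() {
        auto fut = startCommand("isMaster");
        std::cout << fut.get() << "\n";
    }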
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 9fe1c9da4da..8d74d56656f 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -250,7 +250,8 @@ Status attachMetadataIfNeeded(RemoteCommandRequest& request,
 
 Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                           RemoteCommandRequest& request,
-                                          const RemoteCommandCompletionFn& onFinish) {
+                                          const RemoteCommandCompletionFn& onFinish,
+                                          const transport::BatonHandle& baton) {
     MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
     {
         stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
@@ -422,7 +423,8 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
     return Status::OK();
 }
 
-void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                         const transport::BatonHandle& baton) {
     stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
 
     // If we found a matching cbHandle in _inGetConnection, then
@@ -447,7 +449,9 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH
     }
 }
 
-Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceASIO::setAlarm(Date_t when,
+                                      const stdx::function<void()>& action,
+                                      const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
     }
@@ -462,12 +466,12 @@ Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>&
         return exceptionToStatus();
     }
 
-    alarm->async_wait([alarm, this, action, when](std::error_code ec) {
+    alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) {
         const auto nowValue = now();
         if (nowValue < when) {
             warning() << "ASIO alarm returned early. Expected at: " << when
                       << ", fired at: " << nowValue;
-            const auto status = setAlarm(when, action);
+            const auto status = setAlarm(when, action, baton);
             if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) {
                 fassertFailedWithStatus(40383, status);
             }
Expected at: " << when << ", fired at: " << nowValue; - const auto status = setAlarm(when, action); + const auto status = setAlarm(when, action, baton); if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) { fassertFailedWithStatus(40383, status); } diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 8e464754bfa..8f57e3d08b8 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -130,9 +130,13 @@ public: Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) override; - void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; - Status setAlarm(Date_t when, const stdx::function<void()>& action) override; + const RemoteCommandCompletionFn& onFinish, + const transport::BatonHandle& baton = nullptr) override; + void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, + const transport::BatonHandle& baton = nullptr) override; + Status setAlarm(Date_t when, + const stdx::function<void()>& action, + const transport::BatonHandle& baton = nullptr) override; bool onNetworkThread() override; diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 39e8e4e2995..674dd8bb116 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -113,7 +113,8 @@ std::string NetworkInterfaceMock::getHostName() { Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) { + const RemoteCommandCompletionFn& onFinish, + const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } @@ -145,7 +146,8 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( } } -void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) { +void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, + const transport::BatonHandle& baton) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -175,7 +177,9 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock( } } -Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) { +Status NetworkInterfaceMock::setAlarm(const Date_t when, + const stdx::function<void()>& action, + const transport::BatonHandle& baton) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; } diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 554306397ba..76a9a6462a8 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -108,7 +108,8 @@ public: virtual std::string getHostName(); virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish); + const RemoteCommandCompletionFn& onFinish, + const transport::BatonHandle& baton = nullptr); /** * If the network operation is in the _unscheduled or _processing queues, moves the operation @@ -116,12 +117,15 @@ public: * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is * called after the task has already completed, but its callback has not yet been run. 
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
     auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
         _reactor, _tl, std::move(_onConnectHook));
     _pool = std::make_unique<ConnectionPool>(
-        std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+        std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
     _ioThread = stdx::thread([this] {
         setThreadName(_instanceName);
         LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
     _inShutdown.store(true);
     _reactor->stop();
     _ioThread.join();
+    _pool.reset();
     LOG(2) << "NetworkInterfaceTL shutdown successfully";
 }
 
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
 
 Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                         RemoteCommandRequest& request,
-                                        const RemoteCommandCompletionFn& onFinish) {
+                                        const RemoteCommandCompletionFn& onFinish,
+                                        const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         state->deadline = state->start + state->request.timeout;
     }
 
-    _pool->get(request.target, request.timeout)
-        .tapError([state](Status error) {
-            LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
-                   << error;
-        })
-        .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
-            return _onAcquireConn(state, std::move(conn));
-        })
-        .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
-            // The TransportLayer has, for historical reasons returned SocketException for
-            // network errors, but sharding assumes HostUnreachable on network errors.
-            if (error == ErrorCodes::SocketException) {
-                error = Status(ErrorCodes::HostUnreachable, error.reason());
-            }
-            return error;
-        })
-        .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
-            auto duration = now() - state->start;
-            if (!response.isOK()) {
-                onFinish(RemoteCommandResponse(response.getStatus(), duration));
-            } else {
-                auto rs = std::move(response.getValue());
-                LOG(2) << "Request " << state->request.id << " finished with response: "
-                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-                onFinish(rs);
-            }
+    // Interacting with the connection pool can involve more work than just getting a connection
+    // out. In particular, we can end up having to spin up new connections, and fulfilling promises
+    // for other requesters. Returning connections has the same issue.
+    //
+    // To work around it, we make sure to hop onto the reactor thread before getting a connection,
+    // then make sure to get back to the client thread to do the work (if on a baton). And we
+    // hook up a connection-returning unique_ptr that ensures that however we exit, we always do
+    // the return on the reactor thread.
+    //
+    // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+    auto connFuture = _reactor->execute([this, state, request, baton] {
+        return makeReadyFutureWith(
+                   [this, request] { return _pool->get(request.target, request.timeout); })
+            .tapError([state](Status error) {
+                LOG(2) << "Failed to get connection from pool for request " << state->request.id
+                       << ": " << error;
+            })
+            .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+                auto deleter = conn.get_deleter();
+
+                // TODO: drop out this shared_ptr once we have a unique_function capable future
+                return std::make_shared<CommandState::ConnHandle>(
+                    conn.release(), CommandState::Deleter{deleter, _reactor});
+            });
+    });
+
+    auto remainingWork = [this, state, baton, onFinish](
+        StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+        makeReadyFutureWith(
+            [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+            .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+                // The TransportLayer has, for historical reasons, returned SocketException for
+                // network errors, but sharding assumes HostUnreachable on network errors.
+                if (error == ErrorCodes::SocketException) {
+                    error = Status(ErrorCodes::HostUnreachable, error.reason());
+                }
+                return error;
+            })
+            .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+                auto duration = now() - state->start;
+                if (!response.isOK()) {
+                    onFinish(RemoteCommandResponse(response.getStatus(), duration));
+                } else {
+                    const auto& rs = response.getValue();
+                    LOG(2) << "Request " << state->request.id << " finished with response: "
+                           << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+                    onFinish(rs);
+                }
+            });
+    };
+
+    if (baton) {
+        // If we have a baton, we want to get back to the baton thread immediately after we get a
+        // connection
+        std::move(connFuture).getAsync([
+            baton,
+            rw = std::move(remainingWork)
+        ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+            baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+                std::move(rw)(std::move(swConn));
+            });
         });
+    } else {
+        // otherwise we're happy to run inline
+        std::move(connFuture)
+            .getAsync([rw = std::move(remainingWork)](
+                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+                std::move(rw)(std::move(swConn));
+            });
+    }
+
     return Status::OK();
 }
 
 // This is only called from within a then() callback on a future, so throwing is equivalent to
 // returning a ready Future with a not-OK status.
 Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
-    std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+    std::shared_ptr<CommandState> state,
+    CommandState::ConnHandle conn,
+    const transport::BatonHandle& baton) {
     if (state->done.load()) {
         conn->indicateSuccess();
         uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
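The comment block in startCommand above describes a two-hop dance: pool interaction is forced onto the reactor thread, and, when a baton is present, the remaining work hops back to the client thread. Stripped of the Future plumbing, the control flow reduces to the following sketch, where Executor is an illustrative stand-in for both the reactor and the baton (not the real transport types):

    #include <functional>
    #include <iostream>

    // Minimal stand-in for "something you can schedule work on".
    struct Executor {
        const char* name;
        void schedule(std::function<void()> task) {
            // A real reactor/baton queues the task for its owning thread;
            // here we run it inline and just log where it "ran".
            std::cout << "[" << name << "] ";
            task();
        }
    };

    void startCommand(Executor& reactor, Executor* baton) {
        // Hop 1: acquire the connection on the reactor thread, since pool
        // bookkeeping (spinning up connections, fulfilling other waiters)
        // must not run on an arbitrary client thread.
        reactor.schedule([baton] {
            std::cout << "acquire connection from pool\n";

            auto remainingWork = [] { std::cout << "run command on connection\n"; };

            if (baton) {
                // Hop 2: with a baton, get back to the client thread for the work.
                baton->schedule(remainingWork);
            } else {
                // Otherwise running inline on the reactor is fine.
                remainingWork();
            }
        });
    }

    int main() {
        Executor reactor{"reactor"};
        Executor baton{"baton"};
        startCommand(reactor, &baton);   // two hops
        startCommand(reactor, nullptr);  // stays on the reactor
    }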
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
     }
 
     state->timer = _reactor->makeTimer();
-    state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
-        if (status == ErrorCodes::CallbackCanceled) {
-            invariant(state->done.load());
-            return;
-        }
+    state->timer->waitUntil(state->deadline, baton)
+        .getAsync([client, state, baton](Status status) {
+            if (status == ErrorCodes::CallbackCanceled) {
+                invariant(state->done.load());
+                return;
+            }
 
-        if (state->done.swap(true)) {
-            return;
-        }
+            if (state->done.swap(true)) {
+                return;
+            }
 
-        LOG(2) << "Request " << state->request.id << " timed out"
-               << ", deadline was " << state->deadline << ", op was "
-               << redact(state->request.toString());
-        state->promise.setError(
-            Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+            LOG(2) << "Request " << state->request.id << " timed out"
+                   << ", deadline was " << state->deadline << ", op was "
+                   << redact(state->request.toString());
+            state->promise.setError(
+                Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
 
-        client->cancel();
-    });
+            client->cancel(baton);
+        });
     }
 
-    client->runCommandRequest(state->request)
+    client->runCommandRequest(state->request, baton)
         .then([this, state](RemoteCommandResponse response) {
             if (state->done.load()) {
                 uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
             }
 
-            // TODO Investigate whether this is necessary here.
-            return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
-                if (_metadataHook && response.status.isOK()) {
-                    auto target = state->conn->getHostAndPort().toString();
-                    response.status = _metadataHook->readReplyMetadata(
-                        nullptr, std::move(target), response.metadata);
-                }
+            if (_metadataHook && response.status.isOK()) {
+                auto target = state->conn->getHostAndPort().toString();
+                response.status =
+                    _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+            }
 
-                return std::move(response);
-            });
+            return RemoteCommandResponse(std::move(response));
         })
-        .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+        .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
             _eraseInUseConn(state->cbHandle);
             if (!swr.isOK()) {
                 state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
                 return;
 
             if (state->timer) {
-                state->timer->cancel();
+                state->timer->cancel(baton);
             }
 
-            state->promise.setWith([&] { return std::move(swr); });
+            state->promise.setFromStatusWith(std::move(swr));
         });
 
     return std::move(state->mergedFuture);
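Both the timeout path and the response path above funnel through state->done.swap(true), so exactly one of them settles the promise; the loser observes true and backs out. AtomicWord<bool>::swap is essentially std::atomic::exchange, so the guard can be sketched standalone (a minimal illustration, not the MongoDB types):

    #include <atomic>
    #include <iostream>

    struct CommandState {
        std::atomic<bool> done{false};
    };

    // Returns true for whichever caller gets to settle the result first;
    // every later caller sees true from exchange() and backs out.
    bool tryFinish(CommandState& state, const char* who) {
        if (state.done.exchange(true)) {
            std::cout << who << ": lost the race, backing out\n";
            return false;
        }
        std::cout << who << ": settling the promise\n";
        return true;
    }

    int main() {
        CommandState state;
        tryFinish(state, "response");  // settles
        tryFinish(state, "timeout");   // backs out
    }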
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
     _inProgress.erase(cbHandle);
 }
 
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                       const transport::BatonHandle& baton) {
     stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
     auto it = _inProgress.find(cbHandle);
     if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
                << redact(state->request.toString())});
 
     if (state->conn) {
         auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
-        client->client()->cancel();
+        client->client()->cancel(baton);
     }
 }
 
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+                                    const stdx::function<void()>& action,
+                                    const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
 
     if (when <= now()) {
-        _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        if (baton) {
+            baton->schedule(std::move(action));
+        } else {
+            _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        }
         return Status::OK();
     }
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
         _inProgressAlarms.insert(alarmTimer);
     }
 
-    alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
-        auto alarmTimer = weakTimer.lock();
-        if (!alarmTimer) {
-            return;
-        } else {
-            stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-            _inProgressAlarms.erase(alarmTimer);
-        }
-
-        auto nowVal = now();
-        if (nowVal < when) {
-            warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
-            const auto status = setAlarm(when, std::move(action));
-            if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
-                fassertFailedWithStatus(50785, status);
+    alarmTimer->waitUntil(when, baton)
+        .getAsync([this, weakTimer, action, when, baton](Status status) {
+            auto alarmTimer = weakTimer.lock();
+            if (!alarmTimer) {
+                return;
+            } else {
+                stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+                _inProgressAlarms.erase(alarmTimer);
             }
-            return;
-        }
+            auto nowVal = now();
+            if (nowVal < when) {
+                warning() << "Alarm returned early. Expected at: " << when
+                          << ", fired at: " << nowVal;
+                const auto status = setAlarm(when, std::move(action), baton);
+                if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+                    fassertFailedWithStatus(50785, status);
+                }
-        if (status.isOK()) {
-            _reactor->schedule(transport::Reactor::kPost, std::move(action));
-        } else if (status != ErrorCodes::CallbackCanceled) {
-            warning() << "setAlarm() received an error: " << status;
-        }
-    });
+                return;
+            }
+
+            if (status.isOK()) {
+                if (baton) {
+                    baton->schedule(std::move(action));
+                } else {
+                    _reactor->schedule(transport::Reactor::kPost, std::move(action));
+                }
+            } else if (status != ErrorCodes::CallbackCanceled) {
+                warning() << "setAlarm() received an error: " << status;
+            }
+        });
 
     return Status::OK();
 }
Expected at: " << when << ", fired at: " << nowVal; - const auto status = setAlarm(when, std::move(action)); - if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(50785, status); + alarmTimer->waitUntil(when, baton) + .getAsync([this, weakTimer, action, when, baton](Status status) { + auto alarmTimer = weakTimer.lock(); + if (!alarmTimer) { + return; + } else { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + _inProgressAlarms.erase(alarmTimer); } - return; - } + auto nowVal = now(); + if (nowVal < when) { + warning() << "Alarm returned early. Expected at: " << when + << ", fired at: " << nowVal; + const auto status = setAlarm(when, std::move(action), baton); + if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) { + fassertFailedWithStatus(50785, status); + } - if (status.isOK()) { - _reactor->schedule(transport::Reactor::kPost, std::move(action)); - } else if (status != ErrorCodes::CallbackCanceled) { - warning() << "setAlarm() received an error: " << status; - } - }); + return; + } + + if (status.isOK()) { + if (baton) { + baton->schedule(std::move(action)); + } else { + _reactor->schedule(transport::Reactor::kPost, std::move(action)); + } + } else if (status != ErrorCodes::CallbackCanceled) { + warning() << "setAlarm() received an error: " << status; + } + }); return Status::OK(); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 8c4d4697d93..bb22407756c 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -37,6 +37,7 @@ #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/stdx/thread.h" #include "mongo/stdx/unordered_map.h" +#include "mongo/transport/baton.h" #include "mongo/transport/transport_layer.h" namespace mongo { @@ -49,6 +50,7 @@ public: ServiceContext* ctx, std::unique_ptr<NetworkConnectionHook> onConnectHook, std::unique_ptr<rpc::EgressMetadataHook> metadataHook); + std::string getDiagnosticString() override; void appendConnectionStats(ConnectionPoolStats* stats) const override; std::string getHostName() override; @@ -61,9 +63,14 @@ public: Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish) override; - void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; - Status setAlarm(Date_t when, const stdx::function<void()>& action) override; + const RemoteCommandCompletionFn& onFinish, + const transport::BatonHandle& baton) override; + + void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, + const transport::BatonHandle& baton) override; + Status setAlarm(Date_t when, + const stdx::function<void()>& action, + const transport::BatonHandle& baton) override; bool onNetworkThread() override; @@ -79,7 +86,18 @@ private: Date_t deadline = RemoteCommandRequest::kNoExpirationDate; Date_t start; - ConnectionPool::ConnectionHandle conn; + struct Deleter { + ConnectionPool::ConnectionHandleDeleter returner; + transport::ReactorHandle reactor; + + void operator()(ConnectionPool::ConnectionInterface* ptr) const { + reactor->schedule(transport::Reactor::kDispatch, + [ ret = returner, ptr ] { ret(ptr); }); + } + }; + using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>; + + ConnHandle conn; std::unique_ptr<transport::ReactorTimer> timer; AtomicBool done; @@ -89,7 +107,8 @@ private: void _eraseInUseConn(const TaskExecutor::CallbackHandle& 
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 8d7c9cf72b7..d23069ba3ac 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,6 +41,7 @@
 #include "mongo/platform/hash_namespace.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
 #include "mongo/util/future.h"
 #include "mongo/util/time_support.h"
 
@@ -235,8 +236,10 @@ public:
     * Contract: Implementations should guarantee that callback should be called *after* doing any
     * processing related to the callback.
     */
-    virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
-                                                             const RemoteCommandCallbackFn& cb) = 0;
+    virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) = 0;
 
    /**
     * If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index 71d796a1a2b..24046a4fe75 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -42,7 +42,7 @@ namespace executor {
 
 // If less than or equal to 0, the suggested pool size will be determined by the number of cores. If
 // set to a particular positive value, this will be used as the pool size.
-MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 0);
+MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 1);
 
 size_t TaskExecutorPool::getSuggestedPoolSize() {
     auto poolSize = taskExecutorPoolSize.load();
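Per the comment above the parameter, a value less than or equal to 0 means "derive the pool size from the core count" while a positive value is used verbatim, so moving the default from 0 to 1 pins the task executor pool to a single executor. The selection logic amounts to something like the following sketch (coreCount() is a hypothetical stand-in for the real core-counting helper, and the real implementation may additionally clamp the derived value):

    #include <algorithm>
    #include <thread>

    // Hypothetical stand-in for the platform's core-counting helper.
    static unsigned coreCount() {
        return std::max(1u, std::thread::hardware_concurrency());
    }

    // Mirrors the documented contract: <= 0 means "derive from cores",
    // a positive value is used as-is.
    size_t suggestedPoolSize(int taskExecutorPoolSize) {
        if (taskExecutorPoolSize <= 0) {
            return coreCount();
        }
        return static_cast<size_t>(taskExecutorPoolSize);
    }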
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 202de888b5a..7d42893edb2 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -43,6 +43,7 @@
 #include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/network_interface.h"
 #include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
 #include "mongo/util/concurrency/thread_pool_interface.h"
 #include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
@@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
     MONGO_DISALLOW_COPYING(CallbackState);
 
 public:
-    static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
-        return std::make_shared<CallbackState>(std::move(cb), readyDate);
+    static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+                                               Date_t readyDate,
+                                               const transport::BatonHandle& baton) {
+        return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
     }
 
    /**
     * Do not call directly. Use make.
     */
-    CallbackState(CallbackFn&& cb, Date_t theReadyDate)
-        : callback(std::move(cb)), readyDate(theReadyDate) {}
+    CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+        : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
 
     virtual ~CallbackState() = default;
 
@@ -94,6 +97,7 @@ public:
     bool isNetworkOperation = false;
     AtomicWord<bool> isFinished{false};
     boost::optional<stdx::condition_variable> finishedCondition;
+    transport::BatonHandle baton;
 };
 
 class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -125,7 +129,7 @@ public:
 };
 
 ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
-                                               std::unique_ptr<NetworkInterface> net)
+                                               std::shared_ptr<NetworkInterface> net)
     : _net(std::move(net)), _pool(std::move(pool)) {}
 
 ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
@@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
     if (!event.isValid()) {
         return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
     }
-    auto wq = makeSingletonWorkQueue(work);
+    auto wq = makeSingletonWorkQueue(work, nullptr);
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
     auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
 
 StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
     const CallbackFn& work) {
-    auto wq = makeSingletonWorkQueue(work);
+    auto wq = makeSingletonWorkQueue(work, nullptr);
     WorkQueue temp;
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
     if (when <= now()) {
         return scheduleWork(work);
     }
-    auto wq = makeSingletonWorkQueue(work, when);
+    auto wq = makeSingletonWorkQueue(work, nullptr, when);
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
     if (!cbHandle.isOK()) {
@@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
                 return;
             }
             scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
-        })
+        },
+        nullptr)
         .transitional_ignore();
 
     return cbHandle;
@@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
 } // namespace
 
 StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
-    const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+    const RemoteCommandRequest& request,
+    const RemoteCommandCallbackFn& cb,
+    const transport::BatonHandle& baton) {
     RemoteCommandRequest scheduledRequest = request;
     if (request.timeout == RemoteCommandRequest::kNoTimeout) {
         scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
@@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
 
     // In case the request fails to even get a connection from the pool,
     // we wrap the callback in a method that prepares its input parameters.
-    auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
-        remoteCommandFailedEarly(cbData, cb, scheduledRequest);
-    });
+    auto wq = makeSingletonWorkQueue(
+        [scheduledRequest, cb](const CallbackArgs& cbData) {
+            remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+        },
+        baton);
     wq.front()->isNetworkOperation = true;
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
@@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
                                  : response.status.toString());
             swap(cbState->callback, newCb);
             scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
-        })
+        },
+        baton)
         .transitional_ignore();
     return cbHandle;
 }
@@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
     cbState->canceled.store(1);
     if (cbState->isNetworkOperation) {
         lk.unlock();
-        _net->cancelCommand(cbHandle);
+        _net->cancelCommand(cbHandle, cbState->baton);
         return;
     }
     if (cbState->readyDate != Date_t{}) {
@@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
     return cbHandle;
 }
 
-ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
-                                                                                 Date_t when) {
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
+    CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
     WorkQueue result;
-    result.emplace_front(CallbackState::make(std::move(work), when));
+    result.emplace_front(CallbackState::make(std::move(work), when, baton));
     result.front()->iter = result.begin();
     return result;
 }
@@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
     }
 
     for (const auto& cbState : todo) {
-        const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
-        if (status == ErrorCodes::ShutdownInProgress)
-            break;
-        fassert(28735, status);
+        if (cbState->baton) {
+            cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+        } else {
+            const auto status =
+                _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+            if (status == ErrorCodes::ShutdownInProgress)
+                break;
+            fassert(28735, status);
+        }
     }
     _net->signalWorkAvailable();
 }
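The scheduleIntoPool_inlock change above is the executor-side half of the baton plumbing: callbacks whose CallbackState carries a baton bypass the thread pool and run via baton->schedule(), so they execute on the thread servicing that baton. The dispatch decision reduces to a simple fallback, sketched with illustrative stand-in types:

    #include <functional>
    #include <iostream>
    #include <memory>

    struct Baton {
        void schedule(std::function<void()> task) {
            std::cout << "[baton] ";
            task();
        }
    };

    struct ThreadPool {
        void schedule(std::function<void()> task) {
            std::cout << "[pool] ";
            task();
        }
    };

    struct CallbackState {
        std::shared_ptr<Baton> baton;  // may be null
        std::function<void()> callback;
    };

    // Mirrors the loop above: baton-bound callbacks bypass the thread pool
    // so they run on the thread servicing the baton.
    void scheduleIntoPool(ThreadPool& pool, const CallbackState& cb) {
        if (cb.baton) {
            cb.baton->schedule(cb.callback);
        } else {
            pool.schedule(cb.callback);
        }
    }

    int main() {
        ThreadPool pool;
        CallbackState plain{nullptr, [] { std::cout << "runCallback\n"; }};
        CallbackState batoned{std::make_shared<Baton>(), [] { std::cout << "runCallback\n"; }};
        scheduleIntoPool(pool, plain);    // [pool] runCallback
        scheduleIntoPool(pool, batoned);  // [baton] runCallback
    }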
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 8e81e3a7f07..92b809bc0db 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -36,6 +36,7 @@
 #include "mongo/stdx/list.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/stdx/thread.h"
+#include "mongo/transport/baton.h"
 
 namespace mongo {
 
@@ -58,7 +59,7 @@ public:
     * for network operations.
     */
    ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
-                           std::unique_ptr<NetworkInterface> net);
+                           std::shared_ptr<NetworkInterface> net);
 
    /**
    * Destroys a ThreadPoolTaskExecutor.
@@ -79,8 +80,10 @@ public:
     void waitForEvent(const EventHandle& event) override;
     StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
     StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
-    StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
-                                                     const RemoteCommandCallbackFn& cb) override;
+    StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const RemoteCommandRequest& request,
+        const RemoteCommandCallbackFn& cb,
+        const transport::BatonHandle& baton = nullptr) override;
     void cancel(const CallbackHandle& cbHandle) override;
     void wait(const CallbackHandle& cbHandle) override;
 
@@ -128,7 +131,9 @@ private:
     * executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
     * called outside of _mutex.
     */
-    static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {});
+    static WorkQueue makeSingletonWorkQueue(CallbackFn work,
+                                            const transport::BatonHandle& baton,
+                                            Date_t when = {});
 
    /**
     * Moves the single callback in "wq" to the end of "queue". It is required that "wq" was
@@ -174,7 +179,7 @@ private:
     stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
 
     // The network interface used for remote command execution and waiting.
-    std::unique_ptr<NetworkInterface> _net;
+    std::shared_ptr<NetworkInterface> _net;
 
     // The thread pool that executes scheduled work items.
     std::unique_ptr<ThreadPoolInterface> _pool;