Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/connection_pool.cpp           |   3
-rw-r--r--  src/mongo/executor/connection_pool.h              |   4
-rw-r--r--  src/mongo/executor/connection_pool_tl.cpp         |  13
-rw-r--r--  src/mongo/executor/network_interface.h            |  30
-rw-r--r--  src/mongo/executor/network_interface_asio.cpp     |  14
-rw-r--r--  src/mongo/executor/network_interface_asio.h       |  10
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp     |  10
-rw-r--r--  src/mongo/executor/network_interface_mock.h       |  10
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp       | 227
-rw-r--r--  src/mongo/executor/network_interface_tl.h         |  29
-rw-r--r--  src/mongo/executor/task_executor.h                |   7
-rw-r--r--  src/mongo/executor/task_executor_pool.cpp         |   2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp  |  59
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h    |  15
14 files changed, 287 insertions(+), 146 deletions(-)
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7092c460ed3..f8b5da5fb62 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -649,8 +649,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
_readyPool.clear();
// Log something helpful
- log() << "Dropping all pooled connections to " << _hostAndPort
- << " due to failed operation on a connection";
+ log() << "Dropping all pooled connections to " << _hostAndPort << " due to " << status;
// Migrate processing connections to the dropped pool
for (auto&& x : _processingPool) {
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 10a711261ef..160321bec9a 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -60,10 +60,10 @@ struct ConnectionPoolStats;
* HostAndPort. See comments on the various Options for how the pool operates.
*/
class ConnectionPool : public EgressTagCloser {
- class ConnectionHandleDeleter;
class SpecificPool;
public:
+ class ConnectionHandleDeleter;
class ConnectionInterface;
class DependentTypeFactoryInterface;
class TimerInterface;
@@ -175,7 +175,7 @@ public:
ConnectionHandleDeleter() = default;
ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
- void operator()(ConnectionInterface* connection) {
+ void operator()(ConnectionInterface* connection) const {
if (_pool && connection)
_pool->returnConnection(connection);
}
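Making ConnectionHandleDeleter public and giving operator() a const qualifier matters for the executor-aware connection handle introduced later in this diff: network_interface_tl.h captures a copy of the deleter inside a non-mutable lambda, and a non-mutable lambda can only invoke a const call operator on its by-value captures. A minimal standalone sketch of that constraint (FakePool, FakeConn, and HandleDeleter are illustrative stand-ins, not the MongoDB types):

    #include <functional>
    #include <iostream>

    struct FakeConn {
        int id;
    };

    struct FakePool {
        void returnConnection(FakeConn* conn) {
            std::cout << "returning connection " << conn->id << "\n";
            delete conn;
        }
    };

    class HandleDeleter {
    public:
        HandleDeleter() = default;
        explicit HandleDeleter(FakePool* pool) : _pool(pool) {}

        // Must be const: a non-mutable lambda that captures a HandleDeleter by value
        // sees its copy as const when calling it, as the reactor-scheduling deleter
        // in network_interface_tl.h does.
        void operator()(FakeConn* conn) const {
            if (_pool && conn)
                _pool->returnConnection(conn);
        }

    private:
        FakePool* _pool = nullptr;
    };

    int main() {
        FakePool pool;
        auto* conn = new FakeConn{42};
        HandleDeleter ret(&pool);

        // Captured by value; the lambda is not 'mutable', so 'ret' is const inside it.
        std::function<void()> deferredReturn = [ret, conn] { ret(conn); };
        deferredReturn();
        return 0;
    }

Dropping the const qualifier from operator() makes the lambda in this sketch fail to compile, which is exactly the situation the reactor-scheduling deleter below would hit.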
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 3823a9a3e0a..907032c7262 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -50,7 +50,9 @@ struct TimeoutHandler {
void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
_timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
+ // TODO: verify why we still get broken promises when explicitly calling stop and shutting
+ // down NITLs quickly.
+ if (status == ErrorCodes::CallbackCanceled || status == ErrorCodes::BrokenPromise) {
return;
}
@@ -125,10 +127,9 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
});
AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
- .onError(
- [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
- return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
- })
+ .onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
+ return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
+ })
.then([this](AsyncDBClient::Handle client) {
_client = std::move(client);
return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook);
@@ -186,7 +187,7 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
_client
->runCommandRequest(
{_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr})
- .then([this](executor::RemoteCommandResponse response) {
+ .then([](executor::RemoteCommandResponse response) {
return Future<void>::makeReady(response.status);
})
.getAsync([this, handler](Status status) {
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 48428b820e3..b333813df14 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -128,13 +130,33 @@ public:
*/
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) = 0;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr) = 0;
+
+ Future<TaskExecutor::ResponseStatus> startCommand(
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const transport::BatonHandle& baton = nullptr) {
+ Promise<TaskExecutor::ResponseStatus> promise;
+ auto future = promise.getFuture();
+
+ auto status =
+ startCommand(cbHandle,
+ request,
+ [sp = promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
+ sp.emplaceValue(rs);
+ },
+ baton);
+
+ return future;
+ }
/**
* Requests cancelation of the network activity associated with "cbHandle" if it has not yet
* completed.
*/
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -150,7 +172,9 @@ public:
* Any callbacks invoked from setAlarm must observe onNetworkThread to
* return true. See that method for why.
*/
- virtual Status setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+ virtual Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* Returns true if called from a thread dedicated to networking. I.e. not a
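The new non-virtual startCommand() overload adapts the existing callback-based virtual into a Future by fulfilling a shared promise from the completion callback. A simplified standalone sketch of the same adapter pattern, using std::promise/std::future and a stand-in callback API rather than the MongoDB Promise/Future types:

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>

    using Callback = std::function<void(const std::string&)>;

    // Stand-in for the callback-based virtual startCommand(); for the sketch it completes
    // synchronously, whereas the real interface completes on a network thread.
    void startCommandWithCallback(const std::string& request, const Callback& onFinish) {
        onFinish("response to " + request);
    }

    // Non-virtual convenience overload: adapt the callback API into a future, mirroring
    // the Promise/getFuture/emplaceValue shape added to NetworkInterface above.
    std::future<std::string> startCommand(const std::string& request) {
        auto promise = std::make_shared<std::promise<std::string>>();
        auto future = promise->get_future();
        // The shared_ptr plays the role of promise.share(): the completion callback owns
        // the promise, so the future becomes ready exactly when onFinish would have fired.
        startCommandWithCallback(request, [promise](const std::string& response) {
            promise->set_value(response);
        });
        return future;
    }

    int main() {
        auto fut = startCommand("{ isMaster: 1 }");
        std::cout << fut.get() << "\n";
        return 0;
    }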
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 9fe1c9da4da..8d74d56656f 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -250,7 +250,8 @@ Status attachMetadataIfNeeded(RemoteCommandRequest& request,
Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
{
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
@@ -422,7 +423,8 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
return Status::OK();
}
-void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
// If we found a matching cbHandle in _inGetConnection, then
@@ -447,7 +449,9 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH
}
}
-Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceASIO::setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
}
@@ -462,12 +466,12 @@ Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>&
return exceptionToStatus();
}
- alarm->async_wait([alarm, this, action, when](std::error_code ec) {
+ alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) {
const auto nowValue = now();
if (nowValue < when) {
warning() << "ASIO alarm returned early. Expected at: " << when
<< ", fired at: " << nowValue;
- const auto status = setAlarm(when, action);
+ const auto status = setAlarm(when, action, baton);
if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) {
fassertFailedWithStatus(40383, status);
}
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 8e464754bfa..8f57e3d08b8 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -130,9 +130,13 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
- void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
- Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr) override;
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr) override;
+ Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 39e8e4e2995..674dd8bb116 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -113,7 +113,8 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
@@ -145,7 +146,8 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
}
}
-void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -175,7 +177,9 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
}
-Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 554306397ba..76a9a6462a8 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -108,7 +108,8 @@ public:
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr);
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@@ -116,12 +117,15 @@ public:
* the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
* called after the task has already completed, but its callback has not yet been run.
*/
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr);
/**
* Not implemented.
*/
- virtual Status setAlarm(Date_t when, const stdx::function<void()>& action);
+ virtual Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr);
virtual bool onNetworkThread();
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
_reactor, _tl, std::move(_onConnectHook));
_pool = std::make_unique<ConnectionPool>(
- std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+ std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
_ioThread = stdx::thread([this] {
setThreadName(_instanceName);
LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
_inShutdown.store(true);
_reactor->stop();
_ioThread.join();
+ _pool.reset();
LOG(2) << "NetworkInterfaceTL shutdown successfully";
}
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
state->deadline = state->start + state->request.timeout;
}
- _pool->get(request.target, request.timeout)
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
- << error;
- })
- .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
- return _onAcquireConn(state, std::move(conn));
- })
- .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
- // The TransportLayer has, for historical reasons returned SocketException for
- // network errors, but sharding assumes HostUnreachable on network errors.
- if (error == ErrorCodes::SocketException) {
- error = Status(ErrorCodes::HostUnreachable, error.reason());
- }
- return error;
- })
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
- auto duration = now() - state->start;
- if (!response.isOK()) {
- onFinish(RemoteCommandResponse(response.getStatus(), duration));
- } else {
- auto rs = std::move(response.getValue());
- LOG(2) << "Request " << state->request.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
- onFinish(rs);
- }
+ // Interacting with the connection pool can involve more work than just getting a connection
+ // out. In particular, we can end up having to spin up new connections and fulfill promises
+ // for other requesters. Returning connections has the same issue.
+ //
+ // To work around this, we hop onto the reactor thread before getting a connection, and then
+ // get back to the client thread (if on a baton) to do the work. We also hook up a
+ // connection-returning unique_ptr that ensures that, however we exit, the connection is
+ // always returned on the reactor thread.
+ //
+ // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+ auto connFuture = _reactor->execute([this, state, request, baton] {
+ return makeReadyFutureWith(
+ [this, request] { return _pool->get(request.target, request.timeout); })
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << state->request.id
+ << ": " << error;
+ })
+ .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+ auto deleter = conn.get_deleter();
+
+ // TODO: drop out this shared_ptr once we have a unique_function capable future
+ return std::make_shared<CommandState::ConnHandle>(
+ conn.release(), CommandState::Deleter{deleter, _reactor});
+ });
+ });
+
+ auto remainingWork = [this, state, baton, onFinish](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+ makeReadyFutureWith(
+ [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+ .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+ // The TransportLayer has, for historical reasons, returned SocketException for
+ // network errors, but sharding assumes HostUnreachable on network errors.
+ if (error == ErrorCodes::SocketException) {
+ error = Status(ErrorCodes::HostUnreachable, error.reason());
+ }
+ return error;
+ })
+ .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ auto duration = now() - state->start;
+ if (!response.isOK()) {
+ onFinish(RemoteCommandResponse(response.getStatus(), duration));
+ } else {
+ const auto& rs = response.getValue();
+ LOG(2) << "Request " << state->request.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
+ };
+
+ if (baton) {
+ // If we have a baton, we want to get back to the baton thread immediately after we get a
+ // connection
+ std::move(connFuture).getAsync([
+ baton,
+ rw = std::move(remainingWork)
+ ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+ std::move(rw)(std::move(swConn));
+ });
});
+ } else {
+ // otherwise we're happy to run inline
+ std::move(connFuture)
+ .getAsync([rw = std::move(remainingWork)](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ std::move(rw)(std::move(swConn));
+ });
+ }
+
return Status::OK();
}
// This is only called from within a then() callback on a future, so throwing is equivalent to
// returning a ready Future with a not-OK status.
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
- std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+ std::shared_ptr<CommandState> state,
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton) {
if (state->done.load()) {
conn->indicateSuccess();
uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
}
state->timer = _reactor->makeTimer();
- state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
- invariant(state->done.load());
- return;
- }
+ state->timer->waitUntil(state->deadline, baton)
+ .getAsync([client, state, baton](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ invariant(state->done.load());
+ return;
+ }
- if (state->done.swap(true)) {
- return;
- }
+ if (state->done.swap(true)) {
+ return;
+ }
- LOG(2) << "Request " << state->request.id << " timed out"
- << ", deadline was " << state->deadline << ", op was "
- << redact(state->request.toString());
- state->promise.setError(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+ LOG(2) << "Request " << state->request.id << " timed out"
+ << ", deadline was " << state->deadline << ", op was "
+ << redact(state->request.toString());
+ state->promise.setError(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
- client->cancel();
- });
+ client->cancel(baton);
+ });
}
- client->runCommandRequest(state->request)
+ client->runCommandRequest(state->request, baton)
.then([this, state](RemoteCommandResponse response) {
if (state->done.load()) {
uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
}
- // TODO Investigate whether this is necessary here.
- return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
- if (_metadataHook && response.status.isOK()) {
- auto target = state->conn->getHostAndPort().toString();
- response.status = _metadataHook->readReplyMetadata(
- nullptr, std::move(target), response.metadata);
- }
+ if (_metadataHook && response.status.isOK()) {
+ auto target = state->conn->getHostAndPort().toString();
+ response.status =
+ _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+ }
- return std::move(response);
- });
+ return RemoteCommandResponse(std::move(response));
})
- .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+ .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
_eraseInUseConn(state->cbHandle);
if (!swr.isOK()) {
state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
return;
if (state->timer) {
- state->timer->cancel();
+ state->timer->cancel(baton);
}
- state->promise.setWith([&] { return std::move(swr); });
+ state->promise.setFromStatusWith(std::move(swr));
});
return std::move(state->mergedFuture);
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
_inProgress.erase(cbHandle);
}
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
<< redact(state->request.toString())});
if (state->conn) {
auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
- client->client()->cancel();
+ client->client()->cancel(baton);
}
}
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
if (when <= now()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
return Status::OK();
}
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
_inProgressAlarms.insert(alarmTimer);
}
- alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action));
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
+ alarmTimer->waitUntil(when, baton)
+ .getAsync([this, weakTimer, action, when, baton](Status status) {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
}
- return;
- }
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action), baton);
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
- if (status.isOK()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+ return;
+ }
+
+ if (status.isOK()) {
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
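Both startCommand() and setAlarm() above now follow the same dispatch rule: pool and timer interaction happens on the reactor thread, while the follow-up work runs on the baton when the caller supplied one and falls back to the reactor otherwise. A minimal standalone sketch of that dispatch decision (Baton and Reactor here are simplified stand-ins, not the mongo::transport types):

    #include <functional>
    #include <iostream>
    #include <memory>

    struct Baton {
        void schedule(std::function<void()> task) {
            std::cout << "running on the baton (caller's thread)\n";
            task();
        }
    };

    struct Reactor {
        void schedule(std::function<void()> task) {
            std::cout << "posting to the reactor thread\n";
            task();  // a real reactor would enqueue this for its event loop
        }
    };

    using BatonHandle = std::shared_ptr<Baton>;

    void scheduleWork(const BatonHandle& baton, Reactor& reactor, std::function<void()> task) {
        if (baton) {
            // A baton lets the operation complete on the thread that is already waiting
            // for it, instead of bouncing through the reactor.
            baton->schedule(std::move(task));
        } else {
            reactor.schedule(std::move(task));
        }
    }

    int main() {
        Reactor reactor;
        scheduleWork(nullptr, reactor, [] { std::cout << "task ran\n"; });
        scheduleWork(std::make_shared<Baton>(), reactor, [] { std::cout << "task ran\n"; });
        return 0;
    }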
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 8c4d4697d93..bb22407756c 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -37,6 +37,7 @@
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/thread.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
namespace mongo {
@@ -49,6 +50,7 @@ public:
ServiceContext* ctx,
std::unique_ptr<NetworkConnectionHook> onConnectHook,
std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
std::string getHostName() override;
@@ -61,9 +63,14 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
- void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
- Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) override;
+
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) override;
+ Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) override;
bool onNetworkThread() override;
@@ -79,7 +86,18 @@ private:
Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
Date_t start;
- ConnectionPool::ConnectionHandle conn;
+ struct Deleter {
+ ConnectionPool::ConnectionHandleDeleter returner;
+ transport::ReactorHandle reactor;
+
+ void operator()(ConnectionPool::ConnectionInterface* ptr) const {
+ reactor->schedule(transport::Reactor::kDispatch,
+ [ ret = returner, ptr ] { ret(ptr); });
+ }
+ };
+ using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
+
+ ConnHandle conn;
std::unique_ptr<transport::ReactorTimer> timer;
AtomicBool done;
@@ -89,7 +107,8 @@ private:
void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
- ConnectionPool::ConnectionHandle conn);
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton);
std::string _instanceName;
ServiceContext* _svcCtx;
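CommandState::ConnHandle above is a unique_ptr whose deleter does not return the connection directly; it schedules the return on the reactor, so the connection always goes back to the pool on the reactor thread no matter where the handle is destroyed. A standalone sketch of the same idea (Executor, Conn, and ReturnOnExecutor are illustrative stand-ins, not the MongoDB types):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <queue>

    struct Executor {
        std::queue<std::function<void()>> tasks;
        void schedule(std::function<void()> task) { tasks.push(std::move(task)); }
        void drain() {
            while (!tasks.empty()) {
                tasks.front()();
                tasks.pop();
            }
        }
    };

    struct Conn {
        int id;
    };

    struct ReturnOnExecutor {
        std::function<void(Conn*)> returner;  // analogous to ConnectionHandleDeleter
        Executor* executor;                   // analogous to the reactor handle

        void operator()(Conn* ptr) const {
            // Copy the returner into the task, like [ret = returner, ptr] above, so the
            // actual return runs on the executor, not wherever the handle was destroyed.
            executor->schedule([ret = returner, ptr] { ret(ptr); });
        }
    };

    using ConnHandle = std::unique_ptr<Conn, ReturnOnExecutor>;

    int main() {
        Executor reactor;

        auto returner = [](Conn* c) {
            std::cout << "returned conn " << c->id << "\n";
            delete c;
        };
        ConnHandle handle(new Conn{7}, ReturnOnExecutor{returner, &reactor});

        handle.reset();   // queues the return instead of running it here
        reactor.drain();  // the return actually happens on the "reactor"
        return 0;
    }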
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 8d7c9cf72b7..d23069ba3ac 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,6 +41,7 @@
#include "mongo/platform/hash_namespace.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
@@ -235,8 +236,10 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) = 0;
+ virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index 71d796a1a2b..24046a4fe75 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -42,7 +42,7 @@ namespace executor {
// If less than or equal to 0, the suggested pool size will be determined by the number of cores. If
// set to a particular positive value, this will be used as the pool size.
-MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 0);
+MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 1);
size_t TaskExecutorPool::getSuggestedPoolSize() {
auto poolSize = taskExecutorPoolSize.load();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 202de888b5a..7d42893edb2 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -43,6 +43,7 @@
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
MONGO_DISALLOW_COPYING(CallbackState);
public:
- static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
- return std::make_shared<CallbackState>(std::move(cb), readyDate);
+ static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+ Date_t readyDate,
+ const transport::BatonHandle& baton) {
+ return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, Date_t theReadyDate)
- : callback(std::move(cb)), readyDate(theReadyDate) {}
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+ : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
@@ -94,6 +97,7 @@ public:
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
+ transport::BatonHandle baton;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -125,7 +129,7 @@ public:
};
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
- std::unique_ptr<NetworkInterface> net)
+ std::shared_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
@@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
if (when <= now()) {
return scheduleWork(work);
}
- auto wq = makeSingletonWorkQueue(work, when);
+ auto wq = makeSingletonWorkQueue(work, nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
@@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return;
}
scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- })
+ },
+ nullptr)
.transitional_ignore();
return cbHandle;
@@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
} // namespace
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
RemoteCommandRequest scheduledRequest = request;
if (request.timeout == RemoteCommandRequest::kNoTimeout) {
scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
@@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
// In case the request fails to even get a connection from the pool,
// we wrap the callback in a method that prepares its input parameters.
- auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
- remoteCommandFailedEarly(cbData, cb, scheduledRequest);
- });
+ auto wq = makeSingletonWorkQueue(
+ [scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ },
+ baton);
wq.front()->isNetworkOperation = true;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
@@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
: response.status.toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
- })
+ },
+ baton)
.transitional_ignore();
return cbHandle;
}
@@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
cbState->canceled.store(1);
if (cbState->isNetworkOperation) {
lk.unlock();
- _net->cancelCommand(cbHandle);
+ _net->cancelCommand(cbHandle, cbState->baton);
return;
}
if (cbState->readyDate != Date_t{}) {
@@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
return cbHandle;
}
-ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
- Date_t when) {
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
+ CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
WorkQueue result;
- result.emplace_front(CallbackState::make(std::move(work), when));
+ result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
return result;
}
@@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
for (const auto& cbState : todo) {
- const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ if (cbState->baton) {
+ cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ } else {
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
+ }
}
_net->signalWorkAvailable();
}
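In the executor, the baton travels with the callback: scheduleRemoteCommand() stores it on the CallbackState, cancel() forwards it to the network interface, and scheduleIntoPool_inlock() prefers it over the thread pool when running the callback. A simplified standalone sketch of that flow (MiniExecutor and friends are illustrative stand-ins, not the ThreadPoolTaskExecutor internals):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Baton {
        void schedule(std::function<void()> task) {
            std::cout << "callback runs on the baton\n";
            task();
        }
    };
    using BatonHandle = std::shared_ptr<Baton>;

    struct ThreadPool {
        void schedule(std::function<void()> task) {
            std::cout << "callback runs on the thread pool\n";
            task();
        }
    };

    struct CallbackState {
        std::function<void()> callback;
        BatonHandle baton;  // remembered so cancel/scheduling can route back to it
    };

    struct MiniExecutor {
        ThreadPool pool;
        std::vector<std::shared_ptr<CallbackState>> inProgress;

        std::shared_ptr<CallbackState> scheduleRemoteCommand(std::function<void()> cb,
                                                             BatonHandle baton) {
            auto state = std::make_shared<CallbackState>();
            state->callback = std::move(cb);
            state->baton = std::move(baton);
            inProgress.push_back(state);
            return state;
        }

        void runReady() {
            for (auto& state : inProgress) {
                if (state->baton) {
                    state->baton->schedule(state->callback);  // as in scheduleIntoPool_inlock
                } else {
                    pool.schedule(state->callback);
                }
            }
            inProgress.clear();
        }
    };

    int main() {
        MiniExecutor exec;
        exec.scheduleRemoteCommand([] { std::cout << "no-baton command done\n"; }, nullptr);
        exec.scheduleRemoteCommand([] { std::cout << "baton command done\n"; },
                                   std::make_shared<Baton>());
        exec.runReady();
        return 0;
    }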
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 8e81e3a7f07..92b809bc0db 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -36,6 +36,7 @@
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/baton.h"
namespace mongo {
@@ -58,7 +59,7 @@ public:
* for network operations.
*/
ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
- std::unique_ptr<NetworkInterface> net);
+ std::shared_ptr<NetworkInterface> net);
/**
* Destroys a ThreadPoolTaskExecutor.
@@ -79,8 +80,10 @@ public:
void waitForEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle) override;
@@ -128,7 +131,9 @@ private:
* executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
* called outside of _mutex.
*/
- static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {});
+ static WorkQueue makeSingletonWorkQueue(CallbackFn work,
+ const transport::BatonHandle& baton,
+ Date_t when = {});
/**
* Moves the single callback in "wq" to the end of "queue". It is required that "wq" was
@@ -174,7 +179,7 @@ private:
stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
// The network interface used for remote command execution and waiting.
- std::unique_ptr<NetworkInterface> _net;
+ std::shared_ptr<NetworkInterface> _net;
// The thread pool that executes scheduled work items.
std::unique_ptr<ThreadPoolInterface> _pool;