author     Jason Carey <jcarey@argv.me>    2018-03-21 00:15:35 -0400
committer  Jason Carey <jcarey@argv.me>    2018-04-27 19:49:28 -0400
commit     4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch)
tree       438865c1065d0a96c427b1ed3a89e5163d85699a
parent     91eaa878c4feeebd9397c49180631fc719238aaf (diff)
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to a single connection pool in mongos. This change introduces a transport-layer baton, which improves performance when a baton-aware transport layer (here, ASIO on Linux via the new baton_asio_linux.h) performs local scatter/gather operations.
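For orientation, below is a minimal sketch of the baton concept that this patch threads through the networking stack as an optional transport::BatonHandle parameter. It is an illustrative reduction, not the contents of the new src/mongo/transport/baton.h: only schedule() and the BatonHandle alias are visible in this diff, and run() and the exact signatures are assumptions.

#include <functional>
#include <memory>

namespace mongo {

class OperationContext;

namespace transport {

// Illustrative reduction of a baton: a per-client-thread work queue that async networking code
// can schedule onto, so local scatter/gather requests are polled on the requesting thread rather
// than a shared reactor thread.
class Baton {
public:
    virtual ~Baton() = default;

    // Queue a task; the owning thread runs it the next time it polls the baton. The patch uses
    // this (with an empty task) in OperationContext::markKilled() to wake a blocked thread.
    virtual void schedule(std::function<void()> func) = 0;

    // Assumed for illustration: run queued tasks and poll any registered sessions or timers on
    // the caller's thread.
    virtual void run(OperationContext* opCtx) = 0;
};

using BatonHandle = std::shared_ptr<Baton>;

}  // namespace transport
}  // namespace mongo

In the patch itself, APIs such as NetworkInterface::startCommand(), TaskExecutor::scheduleRemoteCommand(), and AsyncDBClient::runCommand() take a BatonHandle that defaults to nullptr, so callers without a baton keep the old reactor-driven behavior.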
-rw-r--r--  src/mongo/base/checked_cast.h | 15
-rw-r--r--  src/mongo/client/async_client.cpp | 24
-rw-r--r--  src/mongo/client/async_client.h | 10
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler_test.cpp | 6
-rw-r--r--  src/mongo/db/catalog/database_impl.cpp | 1
-rw-r--r--  src/mongo/db/db.cpp | 3
-rw-r--r--  src/mongo/db/free_mon/free_mon_mongod.cpp | 3
-rw-r--r--  src/mongo/db/operation_context.cpp | 6
-rw-r--r--  src/mongo/db/operation_context.h | 17
-rw-r--r--  src/mongo/db/repl/abstract_oplog_fetcher_test.cpp | 6
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp | 8
-rw-r--r--  src/mongo/db/repl/database_cloner_test.cpp | 8
-rw-r--r--  src/mongo/db/repl/databases_cloner_test.cpp | 8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2
-rw-r--r--  src/mongo/db/repl/reporter_test.cpp | 3
-rw-r--r--  src/mongo/db/repl/sync_source_resolver_test.cpp | 8
-rw-r--r--  src/mongo/db/repl/task_executor_mock.cpp | 6
-rw-r--r--  src/mongo/db/repl/task_executor_mock.h | 6
-rw-r--r--  src/mongo/executor/connection_pool.cpp | 3
-rw-r--r--  src/mongo/executor/connection_pool.h | 4
-rw-r--r--  src/mongo/executor/connection_pool_tl.cpp | 13
-rw-r--r--  src/mongo/executor/network_interface.h | 30
-rw-r--r--  src/mongo/executor/network_interface_asio.cpp | 14
-rw-r--r--  src/mongo/executor/network_interface_asio.h | 10
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp | 10
-rw-r--r--  src/mongo/executor/network_interface_mock.h | 10
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp | 227
-rw-r--r--  src/mongo/executor/network_interface_tl.h | 29
-rw-r--r--  src/mongo/executor/task_executor.h | 7
-rw-r--r--  src/mongo/executor/task_executor_pool.cpp | 2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp | 59
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h | 15
-rw-r--r--  src/mongo/s/async_requests_sender.cpp | 109
-rw-r--r--  src/mongo/s/async_requests_sender.h | 45
-rw-r--r--  src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp | 35
-rw-r--r--  src/mongo/s/sharding_initialization.cpp | 12
-rw-r--r--  src/mongo/s/sharding_task_executor.cpp | 8
-rw-r--r--  src/mongo/s/sharding_task_executor.h | 6
-rw-r--r--  src/mongo/transport/baton.h | 133
-rw-r--r--  src/mongo/transport/baton_asio_linux.h | 424
-rw-r--r--  src/mongo/transport/mock_session.h | 7
-rw-r--r--  src/mongo/transport/session.h | 9
-rw-r--r--  src/mongo/transport/session_asio.h | 151
-rw-r--r--  src/mongo/transport/transport_layer.h | 13
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp | 88
-rw-r--r--  src/mongo/transport/transport_layer_asio.h | 3
-rw-r--r--  src/mongo/transport/transport_layer_manager.h | 7
-rw-r--r--  src/mongo/unittest/task_executor_proxy.cpp | 6
-rw-r--r--  src/mongo/unittest/task_executor_proxy.h | 4
-rw-r--r--  src/mongo/util/future.h | 23
50 files changed, 1319 insertions, 337 deletions
diff --git a/src/mongo/base/checked_cast.h b/src/mongo/base/checked_cast.h
index e434a4c6e12..68b5887a7f6 100644
--- a/src/mongo/base/checked_cast.h
+++ b/src/mongo/base/checked_cast.h
@@ -50,6 +50,11 @@ struct checked_cast_impl<false> {
static T cast(const U& u) {
return static_cast<T>(u);
}
+
+ template <typename T, typename U>
+ static T cast(U& u) {
+ return static_cast<T>(u);
+ }
};
template <>
@@ -68,6 +73,11 @@ struct checked_cast_impl<true> {
static T cast(const U& u) {
return dynamic_cast<T>(u);
}
+
+ template <typename T, typename U>
+ static T cast(U& u) {
+ return dynamic_cast<T>(u);
+ }
};
template <typename T, typename U>
@@ -75,6 +85,11 @@ T checked_cast(const U& u) {
return checked_cast_impl<kDebugBuild>::cast<T>(u);
};
+template <typename T, typename U>
+T checked_cast(U& u) {
+ return checked_cast_impl<kDebugBuild>::cast<T>(u);
+};
+
/**
* Similar to static_pointer_cast, but in debug builds uses RTTI to confirm that the cast
* is legal at runtime.
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index c1499fe394d..44b3d91e521 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
});
}
-Future<Message> AsyncDBClient::_call(Message request) {
+Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) {
auto swm = _compressorManager.compressMessage(request);
if (!swm.isOK()) {
return swm.getStatus();
@@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) {
request.header().setId(msgId);
request.header().setResponseToMsgId(0);
- return _session->asyncSinkMessage(request)
- .then([this] { return _session->asyncSourceMessage(); })
+ return _session->asyncSinkMessage(request, baton)
+ .then([this, baton] { return _session->asyncSourceMessage(baton); })
.then([this, msgId](Message response) -> StatusWith<Message> {
uassert(50787,
"ResponseId did not match sent message ID.",
@@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) {
});
}
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
+ const transport::BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
- return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> {
- return rpc::UniqueReply(response, rpc::makeReply(&response));
- });
+ return _call(std::move(requestMsg), baton)
+ .then([this](Message response) -> Future<rpc::UniqueReply> {
+ return rpc::UniqueReply(response, rpc::makeReply(&response));
+ });
}
Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
- executor::RemoteCommandRequest request) {
+ executor::RemoteCommandRequest request, const transport::BatonHandle& baton) {
auto clkSource = _svcCtx->getPreciseClockSource();
auto start = clkSource->now();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runCommand(std::move(opMsgRequest))
+ return runCommand(std::move(opMsgRequest), baton)
.then([start, clkSource, this](rpc::UniqueReply response) {
auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
return executor::RemoteCommandResponse(*response, duration);
@@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-void AsyncDBClient::cancel() {
- _session->cancelAsyncOperations();
+void AsyncDBClient::cancel(const transport::BatonHandle& baton) {
+ _session->cancelAsyncOperations(baton);
}
bool AsyncDBClient::isStillConnected() {
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index ee7239a1d03..ec34dca021e 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -36,6 +36,7 @@
#include "mongo/executor/remote_command_response.h"
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/unique_message.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/future.h"
@@ -57,15 +58,16 @@ public:
transport::ReactorHandle reactor);
Future<executor::RemoteCommandResponse> runCommandRequest(
- executor::RemoteCommandRequest request);
- Future<rpc::UniqueReply> runCommand(OpMsgRequest request);
+ executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
+ Future<rpc::UniqueReply> runCommand(OpMsgRequest request,
+ const transport::BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
Future<void> initWireVersion(const std::string& appName,
executor::NetworkConnectionHook* const hook);
- void cancel();
+ void cancel(const transport::BatonHandle& baton = nullptr);
bool isStillConnected();
@@ -75,7 +77,7 @@ public:
const HostAndPort& local() const;
private:
- Future<Message> _call(Message request);
+ Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr);
BSONObj _buildIsMasterRequest(const std::string& appName);
void _parseIsMasterResponse(BSONObj request,
const std::unique_ptr<rpc::ReplyInterface>& response);
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index dca50732e93..a3c23c6b05d 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -90,12 +90,14 @@ public:
TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override {
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (scheduleRemoteCommandFailPoint) {
return Status(ErrorCodes::ShutdownInProgress,
"failed to send remote command - shutdown in progress");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
bool scheduleRemoteCommandFailPoint = false;
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 44e229aaf26..4d195d3cf58 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -895,7 +895,6 @@ StatusWith<NamespaceString> DatabaseImpl::makeUniqueCollectionNamespace(
}
if (!_uniqueCollectionNamespacePseudoRandom) {
- Timestamp ts;
_uniqueCollectionNamespacePseudoRandom =
std::make_unique<PseudoRandom>(Date_t::now().asInt64());
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 743a4aef4a4..5c4019e18d2 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -767,8 +767,7 @@ auto makeReplicationExecutor(ServiceContext* serviceContext) {
hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
stdx::make_unique<ThreadPool>(tpOptions),
- executor::makeNetworkInterface(
- "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)));
+ executor::makeNetworkInterface("Replication", nullptr, std::move(hookList)));
}
MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SSLManager", "default"))
diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp
index df72066aabf..6d8f2042604 100644
--- a/src/mongo/db/free_mon/free_mon_mongod.cpp
+++ b/src/mongo/db/free_mon/free_mon_mongod.cpp
@@ -245,8 +245,7 @@ auto makeTaskExecutor(ServiceContext* /*serviceContext*/) {
Client::initThread(threadName.c_str());
};
return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
- std::make_unique<ThreadPool>(tpOptions),
- executor::makeNetworkInterface("NetworkInterfaceASIO-FreeMon"));
+ std::make_unique<ThreadPool>(tpOptions), executor::makeNetworkInterface("FreeMon"));
}
void registerCollectors(FreeMonController* controller) {
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index c7cf9514882..50209b8d0ff 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/service_context.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/fail_point_service.h"
@@ -356,6 +357,11 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) {
invariant(_waitCV);
_waitCV->notify_all();
}
+
+ // If we have a baton, we need to wake it up. The baton itself will check for interruption
+ if (_baton) {
+ _baton->schedule([] {});
+ }
}
void OperationContext::setLogicalSessionId(LogicalSessionId lsid) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 07980165e33..c188f43c103 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -277,6 +277,20 @@ public:
}
/**
+ * Sets a transport Baton on the operation. This will trigger the Baton on markKilled.
+ */
+ void setBaton(const transport::BatonHandle& baton) {
+ _baton = baton;
+ }
+
+ /**
+ * Retrieves the baton associated with the operation.
+ */
+ const transport::BatonHandle& getBaton() const {
+ return _baton;
+ }
+
+ /**
* Associates a transaction number with this operation context. May only be called once for the
* lifetime of the operation and the operation must have a logical session id assigned.
*/
@@ -465,6 +479,9 @@ private:
// once from OK to some kill code.
AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK};
+ // A transport Baton associated with the operation. The presence of this object implies that a
+ // client thread is doing its own async networking by blocking on its own thread.
+ transport::BatonHandle _baton;
// If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the
// operation is currently waiting on inside a call to waitForConditionOrInterrupt...().
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
index a53ec085a20..067928e6bb2 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
@@ -345,8 +345,10 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index b493279b991..8322d29fdf3 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -347,12 +347,14 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
private:
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index f0330add560..1238a794c76 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -206,12 +206,14 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
private:
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 56abaa0c4bd..b6e9f97431e 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -756,12 +756,14 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
private:
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1fb7bea1adf..d87cf4f73f8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -168,7 +168,7 @@ auto makeTaskExecutor(ServiceContext* service, const std::string& poolName) {
hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(service));
return stdx::make_unique<executor::ThreadPoolTaskExecutor>(
makeThreadPool(poolName),
- executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList)));
+ executor::makeNetworkInterface("RS", nullptr, std::move(hookList)));
}
/**
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index f6967dcfca2..5fb620d5252 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -601,7 +601,8 @@ TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMak
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
// Any error status other than ShutdownInProgress will cause the reporter to fassert.
return Status(ErrorCodes::ShutdownInProgress,
"failed to send remote command - shutdown in progress");
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 4a7ff632e0e..37a11f12fca 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -57,12 +57,14 @@ public:
ShouldFailRequestFn shouldFailRequest)
: unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (_shouldFailRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
private:
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index 320fb12c77f..a25126baecb 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -58,11 +58,13 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleWor
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
if (shouldFailScheduleRemoteCommandRequest(request)) {
return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
} // namespace repl
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index eb02ad18eb6..3b0ab2bf09f 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -47,8 +47,10 @@ public:
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
// Override to make scheduleWork() fail during testing.
ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; };
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7092c460ed3..f8b5da5fb62 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -649,8 +649,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
_readyPool.clear();
// Log something helpful
- log() << "Dropping all pooled connections to " << _hostAndPort
- << " due to failed operation on a connection";
+ log() << "Dropping all pooled connections to " << _hostAndPort << " due to " << status;
// Migrate processing connections to the dropped pool
for (auto&& x : _processingPool) {
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 10a711261ef..160321bec9a 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -60,10 +60,10 @@ struct ConnectionPoolStats;
* HostAndPort. See comments on the various Options for how the pool operates.
*/
class ConnectionPool : public EgressTagCloser {
- class ConnectionHandleDeleter;
class SpecificPool;
public:
+ class ConnectionHandleDeleter;
class ConnectionInterface;
class DependentTypeFactoryInterface;
class TimerInterface;
@@ -175,7 +175,7 @@ public:
ConnectionHandleDeleter() = default;
ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
- void operator()(ConnectionInterface* connection) {
+ void operator()(ConnectionInterface* connection) const {
if (_pool && connection)
_pool->returnConnection(connection);
}
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 3823a9a3e0a..907032c7262 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -50,7 +50,9 @@ struct TimeoutHandler {
void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
_timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
+ // TODO: verify why we still get broken promises when explicitly calling stop and shutting
+ // down NITLs quickly.
+ if (status == ErrorCodes::CallbackCanceled || status == ErrorCodes::BrokenPromise) {
return;
}
@@ -125,10 +127,9 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
});
AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
- .onError(
- [this](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
- return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
- })
+ .onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
+ return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
+ })
.then([this](AsyncDBClient::Handle client) {
_client = std::move(client);
return _client->initWireVersion("NetworkInterfaceTL", _onConnectHook);
@@ -186,7 +187,7 @@ void TLConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
_client
->runCommandRequest(
{_peer, std::string("admin"), BSON("isMaster" << 1), BSONObj(), nullptr})
- .then([this](executor::RemoteCommandResponse response) {
+ .then([](executor::RemoteCommandResponse response) {
return Future<void>::makeReady(response.status);
})
.getAsync([this, handler](Status status) {
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 48428b820e3..b333813df14 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -128,13 +130,33 @@ public:
*/
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) = 0;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr) = 0;
+
+ Future<TaskExecutor::ResponseStatus> startCommand(
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ const transport::BatonHandle& baton = nullptr) {
+ Promise<TaskExecutor::ResponseStatus> promise;
+ auto future = promise.getFuture();
+
+ auto status =
+ startCommand(cbHandle,
+ request,
+ [sp = promise.share()](const TaskExecutor::ResponseStatus& rs) mutable {
+ sp.emplaceValue(rs);
+ },
+ baton);
+
+ return future;
+ }
/**
* Requests cancelation of the network activity associated with "cbHandle" if it has not yet
* completed.
*/
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) = 0;
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* Sets an alarm, which schedules "action" to run no sooner than "when".
@@ -150,7 +172,9 @@ public:
* Any callbacks invoked from setAlarm must observe onNetworkThread to
* return true. See that method for why.
*/
- virtual Status setAlarm(Date_t when, const stdx::function<void()>& action) = 0;
+ virtual Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* Returns true if called from a thread dedicated to networking. I.e. not a
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 9fe1c9da4da..8d74d56656f 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -250,7 +250,8 @@ Status attachMetadataIfNeeded(RemoteCommandRequest& request,
Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
{
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
@@ -422,7 +423,8 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
return Status::OK();
}
-void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
// If we found a matching cbHandle in _inGetConnection, then
@@ -447,7 +449,9 @@ void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbH
}
}
-Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceASIO::setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
}
@@ -462,12 +466,12 @@ Status NetworkInterfaceASIO::setAlarm(Date_t when, const stdx::function<void()>&
return exceptionToStatus();
}
- alarm->async_wait([alarm, this, action, when](std::error_code ec) {
+ alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) {
const auto nowValue = now();
if (nowValue < when) {
warning() << "ASIO alarm returned early. Expected at: " << when
<< ", fired at: " << nowValue;
- const auto status = setAlarm(when, action);
+ const auto status = setAlarm(when, action, baton);
if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) {
fassertFailedWithStatus(40383, status);
}
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 8e464754bfa..8f57e3d08b8 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -130,9 +130,13 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
- void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
- Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr) override;
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr) override;
+ Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr) override;
bool onNetworkThread() override;
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 39e8e4e2995..674dd8bb116 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -113,7 +113,8 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
@@ -145,7 +146,8 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
}
}
-void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -175,7 +177,9 @@ void NetworkInterfaceMock::_interruptWithResponse_inlock(
}
}
-Status NetworkInterfaceMock::setAlarm(const Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceMock::setAlarm(const Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
}
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 554306397ba..76a9a6462a8 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -108,7 +108,8 @@ public:
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish);
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton = nullptr);
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@@ -116,12 +117,15 @@ public:
* the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
* called after the task has already completed, but its callback has not yet been run.
*/
- virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
+ virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton = nullptr);
/**
* Not implemented.
*/
- virtual Status setAlarm(Date_t when, const stdx::function<void()>& action);
+ virtual Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton = nullptr);
virtual bool onNetworkThread();
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
_reactor, _tl, std::move(_onConnectHook));
_pool = std::make_unique<ConnectionPool>(
- std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+ std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
_ioThread = stdx::thread([this] {
setThreadName(_instanceName);
LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
_inShutdown.store(true);
_reactor->stop();
_ioThread.join();
+ _pool.reset();
LOG(2) << "NetworkInterfaceTL shutdown successfully";
}
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
state->deadline = state->start + state->request.timeout;
}
- _pool->get(request.target, request.timeout)
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
- << error;
- })
- .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
- return _onAcquireConn(state, std::move(conn));
- })
- .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
- // The TransportLayer has, for historical reasons returned SocketException for
- // network errors, but sharding assumes HostUnreachable on network errors.
- if (error == ErrorCodes::SocketException) {
- error = Status(ErrorCodes::HostUnreachable, error.reason());
- }
- return error;
- })
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
- auto duration = now() - state->start;
- if (!response.isOK()) {
- onFinish(RemoteCommandResponse(response.getStatus(), duration));
- } else {
- auto rs = std::move(response.getValue());
- LOG(2) << "Request " << state->request.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
- onFinish(rs);
- }
+ // Interacting with the connection pool can involve more work than just getting a connection
+ // out. In particular, we can end up having to spin up new connections and fulfill promises
+ // for other requesters. Returning connections has the same issue.
+ //
+ // To work around this, we make sure to hop onto the reactor thread before getting a connection,
+ // then get back to the client thread to do the work (if on a baton). And we hook up a
+ // connection-returning unique_ptr that ensures that, however we exit, we always do the return
+ // on the reactor thread.
+ //
+ // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+ auto connFuture = _reactor->execute([this, state, request, baton] {
+ return makeReadyFutureWith(
+ [this, request] { return _pool->get(request.target, request.timeout); })
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << state->request.id
+ << ": " << error;
+ })
+ .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+ auto deleter = conn.get_deleter();
+
+ // TODO: drop this shared_ptr once we have a unique_function-capable future
+ return std::make_shared<CommandState::ConnHandle>(
+ conn.release(), CommandState::Deleter{deleter, _reactor});
+ });
+ });
+
+ auto remainingWork = [this, state, baton, onFinish](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+ makeReadyFutureWith(
+ [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+ .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+ // The TransportLayer has, for historical reasons, returned SocketException for
+ // network errors, but sharding assumes HostUnreachable on network errors.
+ if (error == ErrorCodes::SocketException) {
+ error = Status(ErrorCodes::HostUnreachable, error.reason());
+ }
+ return error;
+ })
+ .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ auto duration = now() - state->start;
+ if (!response.isOK()) {
+ onFinish(RemoteCommandResponse(response.getStatus(), duration));
+ } else {
+ const auto& rs = response.getValue();
+ LOG(2) << "Request " << state->request.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
+ };
+
+ if (baton) {
+ // If we have a baton, we want to get back to the baton thread immediately after we get a
+ // connection
+ std::move(connFuture).getAsync([
+ baton,
+ rw = std::move(remainingWork)
+ ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+ std::move(rw)(std::move(swConn));
+ });
});
+ } else {
+ // otherwise we're happy to run inline
+ std::move(connFuture)
+ .getAsync([rw = std::move(remainingWork)](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ std::move(rw)(std::move(swConn));
+ });
+ }
+
return Status::OK();
}
// This is only called from within a then() callback on a future, so throwing is equivalent to
// returning a ready Future with a not-OK status.
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
- std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+ std::shared_ptr<CommandState> state,
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton) {
if (state->done.load()) {
conn->indicateSuccess();
uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
}
state->timer = _reactor->makeTimer();
- state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
- invariant(state->done.load());
- return;
- }
+ state->timer->waitUntil(state->deadline, baton)
+ .getAsync([client, state, baton](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ invariant(state->done.load());
+ return;
+ }
- if (state->done.swap(true)) {
- return;
- }
+ if (state->done.swap(true)) {
+ return;
+ }
- LOG(2) << "Request " << state->request.id << " timed out"
- << ", deadline was " << state->deadline << ", op was "
- << redact(state->request.toString());
- state->promise.setError(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+ LOG(2) << "Request " << state->request.id << " timed out"
+ << ", deadline was " << state->deadline << ", op was "
+ << redact(state->request.toString());
+ state->promise.setError(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
- client->cancel();
- });
+ client->cancel(baton);
+ });
}
- client->runCommandRequest(state->request)
+ client->runCommandRequest(state->request, baton)
.then([this, state](RemoteCommandResponse response) {
if (state->done.load()) {
uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
}
- // TODO Investigate whether this is necessary here.
- return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
- if (_metadataHook && response.status.isOK()) {
- auto target = state->conn->getHostAndPort().toString();
- response.status = _metadataHook->readReplyMetadata(
- nullptr, std::move(target), response.metadata);
- }
+ if (_metadataHook && response.status.isOK()) {
+ auto target = state->conn->getHostAndPort().toString();
+ response.status =
+ _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+ }
- return std::move(response);
- });
+ return RemoteCommandResponse(std::move(response));
})
- .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+ .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
_eraseInUseConn(state->cbHandle);
if (!swr.isOK()) {
state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
return;
if (state->timer) {
- state->timer->cancel();
+ state->timer->cancel(baton);
}
- state->promise.setWith([&] { return std::move(swr); });
+ state->promise.setFromStatusWith(std::move(swr));
});
return std::move(state->mergedFuture);
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
_inProgress.erase(cbHandle);
}
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
<< redact(state->request.toString())});
if (state->conn) {
auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
- client->client()->cancel();
+ client->client()->cancel(baton);
}
}
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
if (when <= now()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
return Status::OK();
}
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
_inProgressAlarms.insert(alarmTimer);
}
- alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action));
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
+ alarmTimer->waitUntil(when, baton)
+ .getAsync([this, weakTimer, action, when, baton](Status status) {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
}
- return;
- }
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action), baton);
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
- if (status.isOK()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+ return;
+ }
+
+ if (status.isOK()) {
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 8c4d4697d93..bb22407756c 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -37,6 +37,7 @@
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/thread.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
namespace mongo {
@@ -49,6 +50,7 @@ public:
ServiceContext* ctx,
std::unique_ptr<NetworkConnectionHook> onConnectHook,
std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
std::string getHostName() override;
@@ -61,9 +63,14 @@ public:
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) override;
- void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
- Status setAlarm(Date_t when, const stdx::function<void()>& action) override;
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) override;
+
+ void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) override;
+ Status setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) override;
bool onNetworkThread() override;
@@ -79,7 +86,18 @@ private:
Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
Date_t start;
- ConnectionPool::ConnectionHandle conn;
+ struct Deleter {
+ ConnectionPool::ConnectionHandleDeleter returner;
+ transport::ReactorHandle reactor;
+
+ void operator()(ConnectionPool::ConnectionInterface* ptr) const {
+ reactor->schedule(transport::Reactor::kDispatch,
+ [ ret = returner, ptr ] { ret(ptr); });
+ }
+ };
+ using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
+
+ ConnHandle conn;
std::unique_ptr<transport::ReactorTimer> timer;
AtomicBool done;
@@ -89,7 +107,8 @@ private:
void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
- ConnectionPool::ConnectionHandle conn);
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton);
std::string _instanceName;
ServiceContext* _svcCtx;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 8d7c9cf72b7..d23069ba3ac 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -41,6 +41,7 @@
#include "mongo/platform/hash_namespace.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
@@ -235,8 +236,10 @@ public:
* Contract: Implementations should guarantee that callback should be called *after* doing any
* processing related to the callback.
*/
- virtual StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) = 0;
+ virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) = 0;
/**
* If the callback referenced by "cbHandle" hasn't already executed, marks it as
diff --git a/src/mongo/executor/task_executor_pool.cpp b/src/mongo/executor/task_executor_pool.cpp
index 71d796a1a2b..24046a4fe75 100644
--- a/src/mongo/executor/task_executor_pool.cpp
+++ b/src/mongo/executor/task_executor_pool.cpp
@@ -42,7 +42,7 @@ namespace executor {
// If less than or equal to 0, the suggested pool size will be determined by the number of cores. If
// set to a particular positive value, this will be used as the pool size.
-MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 0);
+MONGO_EXPORT_SERVER_PARAMETER(taskExecutorPoolSize, int, 1);
size_t TaskExecutorPool::getSuggestedPoolSize() {
auto poolSize = taskExecutorPoolSize.load();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 202de888b5a..7d42893edb2 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -43,6 +43,7 @@
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
MONGO_DISALLOW_COPYING(CallbackState);
public:
- static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
- return std::make_shared<CallbackState>(std::move(cb), readyDate);
+ static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+ Date_t readyDate,
+ const transport::BatonHandle& baton) {
+ return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, Date_t theReadyDate)
- : callback(std::move(cb)), readyDate(theReadyDate) {}
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+ : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
@@ -94,6 +97,7 @@ public:
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
+ transport::BatonHandle baton;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -125,7 +129,7 @@ public:
};
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
- std::unique_ptr<NetworkInterface> net)
+ std::shared_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
@@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
if (when <= now()) {
return scheduleWork(work);
}
- auto wq = makeSingletonWorkQueue(work, when);
+ auto wq = makeSingletonWorkQueue(work, nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
@@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return;
}
scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- })
+ },
+ nullptr)
.transitional_ignore();
return cbHandle;
@@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
} // namespace
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
RemoteCommandRequest scheduledRequest = request;
if (request.timeout == RemoteCommandRequest::kNoTimeout) {
scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
@@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
// In case the request fails to even get a connection from the pool,
// we wrap the callback in a method that prepares its input parameters.
- auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
- remoteCommandFailedEarly(cbData, cb, scheduledRequest);
- });
+ auto wq = makeSingletonWorkQueue(
+ [scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ },
+ baton);
wq.front()->isNetworkOperation = true;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
@@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
: response.status.toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
- })
+ },
+ baton)
.transitional_ignore();
return cbHandle;
}
@@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
cbState->canceled.store(1);
if (cbState->isNetworkOperation) {
lk.unlock();
- _net->cancelCommand(cbHandle);
+ _net->cancelCommand(cbHandle, cbState->baton);
return;
}
if (cbState->readyDate != Date_t{}) {
@@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
return cbHandle;
}
-ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
- Date_t when) {
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
+ CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
WorkQueue result;
- result.emplace_front(CallbackState::make(std::move(work), when));
+ result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
return result;
}
@@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
for (const auto& cbState : todo) {
- const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ if (cbState->baton) {
+ cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ } else {
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
+ }
}
_net->signalWorkAvailable();
}
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 8e81e3a7f07..92b809bc0db 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -36,6 +36,7 @@
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/baton.h"
namespace mongo {
@@ -58,7 +59,7 @@ public:
* for network operations.
*/
ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
- std::unique_ptr<NetworkInterface> net);
+ std::shared_ptr<NetworkInterface> net);
/**
* Destroys a ThreadPoolTaskExecutor.
@@ -79,8 +80,10 @@ public:
void waitForEvent(const EventHandle& event) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle) override;
@@ -128,7 +131,9 @@ private:
* executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
* called outside of _mutex.
*/
- static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {});
+ static WorkQueue makeSingletonWorkQueue(CallbackFn work,
+ const transport::BatonHandle& baton,
+ Date_t when = {});
/**
* Moves the single callback in "wq" to the end of "queue". It is required that "wq" was
@@ -174,7 +179,7 @@ private:
stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
// The network interface used for remote command execution and waiting.
- std::unique_ptr<NetworkInterface> _net;
+ std::shared_ptr<NetworkInterface> _net;
// The thread pool that executes scheduled work items.
std::unique_ptr<ThreadPoolInterface> _pool;
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 5e70f3630ff..d63429ff8e9 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -33,16 +33,22 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/baton.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
+
+MONGO_EXPORT_SERVER_PARAMETER(AsyncRequestsSenderUseBaton, bool, true);
+
namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
@@ -58,6 +64,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
_executor(executor),
+ _baton(opCtx),
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy) {
@@ -71,6 +78,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
// Schedule the requests immediately.
_scheduleRequests();
}
+
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
@@ -90,7 +98,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _handleResponse(_responseQueue.pop(_opCtx));
+ _makeProgress(_opCtx);
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -99,7 +107,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _handleResponse(_responseQueue.pop());
+ _makeProgress(nullptr);
}
}
return *readyResponse;
@@ -130,6 +138,11 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_scheduleRequests();
}
+ // If we have baton requests, we want to process those before proceeding
+ if (_batonRequests) {
+ return boost::none;
+ }
+
// Check if any remote is ready.
invariant(!_remotes.empty());
for (auto& remote : _remotes) {
@@ -200,6 +213,12 @@ void AsyncRequestsSender::_scheduleRequests() {
auto scheduleStatus = _scheduleRequest(i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
+
+ if (_baton) {
+ _batonRequests++;
+ _baton->schedule([this] { _batonRequests--; });
+ }
+
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
@@ -214,7 +233,7 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
invariant(!remote.cbHandle.isValid());
invariant(!remote.swResponse);
- Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference);
+ Status resolveStatus = remote.resolveShardIdToHostAndPort(this, _readPreference);
if (!resolveStatus.isOK()) {
return resolveStatus;
}
@@ -225,8 +244,14 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+ if (_baton) {
+ _batonRequests++;
+ _baton->schedule([this] { _batonRequests--; });
+ }
+
_responseQueue.push(Job{cbData, remoteIndex});
- });
+ },
+ _baton);
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -235,7 +260,21 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
return Status::OK();
}
-void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
+ boost::optional<Job> job;
+
+ if (_baton) {
+ // If we're using a baton, we peek the queue, and block on the baton if it's empty
+ if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
+ job = std::move(*tryJob);
+ } else {
+ _baton->run(_opCtx, boost::none);
+ }
+ } else {
+ // Otherwise we block on the queue
+ job = _opCtx ? _responseQueue.pop(_opCtx) : _responseQueue.pop();
+ }
+
if (!job) {
return;
}
@@ -274,14 +313,57 @@ AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
- const ReadPreferenceSetting& readPref) {
+ AsyncRequestsSender* ars, const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << shardId);
}
- auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
+ auto clock = ars->_opCtx->getServiceContext()->getFastClockSource();
+
+ auto deadline = clock->now() + Seconds(20);
+
+ auto targeter = shard->getTargeter();
+
+ auto findHostStatus = [&] {
+ // If we don't have a baton, just go ahead and block in targeting
+ if (!ars->_baton) {
+ return targeter->findHostWithMaxWait(readPref, Seconds{20});
+ }
+
+ // If we do have a baton, and we can target quickly, just do that
+ {
+ auto findHostStatus = targeter->findHostNoWait(readPref);
+ if (findHostStatus.isOK()) {
+ return findHostStatus;
+ }
+ }
+
+ // If it's going to take a while to target, we spin up a background thread to do our
+ // targeting, while running the baton on the calling thread. This allows us to make forward
+ // progress on previous requests.
+ Promise<HostAndPort> promise;
+ auto future = promise.getFuture();
+
+ ars->_batonRequests++;
+ stdx::thread bgChecker([&] {
+ promise.setWith(
+ [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
+
+ ars->_baton->schedule([ars] { ars->_batonRequests--; });
+ });
+ const auto guard = MakeGuard([&] { bgChecker.join(); });
+
+ while (!future.isReady()) {
+ if (!ars->_baton->run(nullptr, deadline)) {
+ break;
+ }
+ }
+
+ return future.getNoThrow();
+ }();
+
if (!findHostStatus.isOK()) {
return findHostStatus.getStatus();
}
@@ -296,4 +378,17 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
return grid.shardRegistry()->getShardNoReload(shardId);
}
+AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx)
+ : _baton(AsyncRequestsSenderUseBaton.load()
+ ? (opCtx->getServiceContext()->getTransportLayer()
+ ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx)
+ : nullptr)
+ : nullptr) {}
+
+AsyncRequestsSender::BatonDetacher::~BatonDetacher() {
+ if (_baton) {
+ _baton->detach();
+ }
+}
+
} // namespace mongo
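With a baton attached (gated by the AsyncRequestsSenderUseBaton server parameter above), _makeProgress() no longer blocks unconditionally on the response queue: it first tries a non-blocking pop, and only when nothing is ready does it run the baton, driving the outstanding network I/O from the calling thread. A minimal standalone sketch of that loop shape, using a plain std::queue and a stand-in runBatonOnce() rather than the real ProducerConsumerQueue and Baton:

    // Sketch of the "peek the queue, otherwise run the reactor" loop used by
    // _makeProgress(). ResponseQueue and runBatonOnce() are stand-ins, not MongoDB types.
    #include <iostream>
    #include <optional>
    #include <queue>

    using Job = int;  // stands in for AsyncRequestsSender::Job

    std::queue<Job> responseQueue;

    // Pretend "baton": running it performs pending I/O, which may push responses.
    void runBatonOnce() {
        static int next = 0;
        responseQueue.push(next++);  // simulate a completed remote command
    }

    std::optional<Job> tryPop() {
        if (responseQueue.empty())
            return std::nullopt;
        Job j = responseQueue.front();
        responseQueue.pop();
        return j;
    }

    // Equivalent in spirit to _makeProgress(): never block on the queue while work
    // could instead be driven by the baton on this thread.
    Job makeProgress() {
        while (true) {
            if (auto job = tryPop())
                return *job;   // a response was already ready
            runBatonOnce();    // otherwise drive I/O until something completes
        }
    }

    int main() {
        std::cout << "got job " << makeProgress() << '\n';
        return 0;
    }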
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 2371ee988bb..dfb053c977e 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -171,7 +171,8 @@ private:
/**
* Given a read preference, selects a host on which the command should be run.
*/
- Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
+ Status resolveShardIdToHostAndPort(AsyncRequestsSender* ars,
+ const ReadPreferenceSetting& readPref);
/**
* Returns the Shard object associated with this remote.
@@ -203,7 +204,7 @@ private:
};
/**
- * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors
+ * Job for _makeProgress. We use a producer consumer queue to coordinate with TaskExecutors
* off thread, and this wraps up the arguments for that call.
*/
struct Job {
@@ -212,6 +213,38 @@ private:
};
/**
+ * We have to make sure to detach the baton if we throw in construction. We also need a baton
+ * that lives longer than this type (because it can end up in callbacks that won't actually
+ * modify it).
+ *
+ * TODO: work out actual lifetime semantics for a baton. For now, leaving this as a wart in the ARS.
+ */
+ class BatonDetacher {
+ public:
+ explicit BatonDetacher(OperationContext* opCtx);
+ ~BatonDetacher();
+
+ transport::Baton& operator*() const {
+ return *_baton;
+ }
+
+ transport::Baton* operator->() const noexcept {
+ return _baton.get();
+ }
+
+ operator transport::BatonHandle() const {
+ return _baton;
+ }
+
+ explicit operator bool() const noexcept {
+ return static_cast<bool>(_baton);
+ }
+
+ private:
+ transport::BatonHandle _baton;
+ };
+
+ /**
* Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
*/
void _cancelPendingRequests();
@@ -246,17 +279,19 @@ private:
Status _scheduleRequest(size_t remoteIndex);
/**
- * The callback for a remote command.
+ * Waits for forward progress in gathering responses from a remote.
*
- * If the job is not set, we've failed targeting and calling this function is a noop.
+ * If the opCtx is non-null, use it while waiting on completion.
*
* Stores the response or error in the remote.
*/
- void _handleResponse(boost::optional<Job> job);
+ void _makeProgress(OperationContext* opCtx);
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
+ BatonDetacher _baton;
+ size_t _batonRequests = 0;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
// ok to run on secondaries.
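BatonDetacher is a plain RAII wrapper: the baton is attached in the constructor and detached in the destructor, so it is released even if the AsyncRequestsSender constructor throws after the member has been built. A generic standalone sketch of that shape, with ToyBaton standing in for transport::Baton:

    // RAII sketch of the BatonDetacher idea: whatever is attached in the constructor is
    // detached in the destructor, even if later construction throws.
    // ToyBaton is a stand-in type, not mongo::transport::Baton.
    #include <iostream>
    #include <memory>
    #include <stdexcept>

    struct ToyBaton {
        void detach() { std::cout << "baton detached\n"; }
    };

    class BatonDetacher {
    public:
        explicit BatonDetacher(std::shared_ptr<ToyBaton> baton) : _baton(std::move(baton)) {}
        ~BatonDetacher() {
            if (_baton)
                _baton->detach();
        }
        explicit operator bool() const noexcept { return static_cast<bool>(_baton); }

    private:
        std::shared_ptr<ToyBaton> _baton;
    };

    struct Sender {
        Sender() : _baton(std::make_shared<ToyBaton>()) {
            throw std::runtime_error("constructor failed after the baton was attached");
        }
        BatonDetacher _baton;  // destroyed during unwinding, so detach() still runs
    };

    int main() {
        try {
            Sender s;
        } catch (const std::exception& e) {
            std::cout << "caught: " << e.what() << '\n';
        }
        return 0;
    }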
diff --git a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
index e64860f2513..1582cb70291 100644
--- a/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook_for_mongos.cpp
@@ -42,40 +42,7 @@ namespace mongo {
namespace rpc {
void ShardingEgressMetadataHookForMongos::_saveGLEStats(const BSONObj& metadata,
- StringData hostString) {
- if (!haveClient()) {
- // Client will be present only when write commands are used.
- return;
- }
-
- auto swShardingMetadata = rpc::ShardingMetadata::readFromMetadata(metadata);
- if (swShardingMetadata.getStatus() == ErrorCodes::NoSuchKey) {
- return;
- } else if (!swShardingMetadata.isOK()) {
- warning() << "Got invalid sharding metadata " << redact(swShardingMetadata.getStatus())
- << " metadata object was '" << redact(metadata) << "'";
- return;
- }
-
- auto shardConn = ConnectionString::parse(hostString.toString());
-
- // If we got the reply from this host, we expect that its 'hostString' must be valid.
- if (!shardConn.isOK()) {
- severe() << "got bad host string in saveGLEStats: " << hostString;
- }
- invariant(shardConn.getStatus());
-
- auto shardingMetadata = std::move(swShardingMetadata.getValue());
-
- auto& clientInfo = cc();
- LOG(4) << "saveGLEStats lastOpTime:" << shardingMetadata.getLastOpTime()
- << " electionId:" << shardingMetadata.getLastElectionId();
-
- ClusterLastErrorInfo::get(clientInfo)
- ->addHostOpTime(
- shardConn.getValue(),
- HostOpTime(shardingMetadata.getLastOpTime(), shardingMetadata.getLastElectionId()));
-}
+ StringData hostString) {}
repl::OpTime ShardingEgressMetadataHookForMongos::_getConfigServerOpTime() {
return Grid::get(_serviceContext)->configOpTime();
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 1bb4215fa2c..7b595826256 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -130,11 +130,11 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
const auto poolSize = taskExecutorPoolSize.value_or(TaskExecutorPool::getSuggestedPoolSize());
for (size_t i = 0; i < poolSize; ++i) {
- auto exec = makeShardingTaskExecutor(executor::makeNetworkInterface(
- "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
- stdx::make_unique<ShardingNetworkConnectionHook>(),
- metadataHookBuilder(),
- connPoolOptions));
+ auto exec = makeShardingTaskExecutor(
+ executor::makeNetworkInterface("TaskExecutorPool-" + std::to_string(i),
+ stdx::make_unique<ShardingNetworkConnectionHook>(),
+ metadataHookBuilder(),
+ connPoolOptions));
executors.emplace_back(std::move(exec));
}
@@ -219,7 +219,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
}
auto network =
- executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry",
+ executor::makeNetworkInterface("ShardRegistry",
stdx::make_unique<ShardingNetworkConnectionHook>(),
hookBuilder(),
connPoolOptions);
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index ee15fdf3a6b..e7401272645 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -111,11 +111,13 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(
}
StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
// schedule the user's callback if there is not opCtx
if (!request.opCtx) {
- return _executor->scheduleRemoteCommand(request, cb);
+ return _executor->scheduleRemoteCommand(request, cb, baton);
}
boost::optional<RemoteCommandRequest> newRequest;
@@ -201,7 +203,7 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom
}
};
- return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb);
+ return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb, baton);
}
void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) {
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 4c2571c684a..92a78169549 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -67,8 +67,10 @@ public:
Date_t deadline) override;
StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
- StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle) override;
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
new file mode 100644
index 00000000000..59c2d7c06ab
--- /dev/null
+++ b/src/mongo/transport/baton.h
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/stdx/functional.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace transport {
+
+class TransportLayer;
+class Session;
+class ReactorTimer;
+
+/**
+ * A Baton is basically a networking reactor, with limited functionality and no forward progress
+ * guarantees. Rather than asynchronously running tasks through one, the baton records the intent
+ * of those tasks and defers waiting and execution to a later call to run().
+ *
+ * Batons provide a mechanism to allow consumers of a transport layer to execute IO themselves,
+ * rather than having this occur on another thread. This can improve performance by minimizing
+ * context switches, as well as improving the readability of stack traces by grounding async
+ * execution on top of a regular client call stack.
+ */
+class Baton {
+public:
+ virtual ~Baton() = default;
+
+ /**
+ * Detaches a baton from an associated opCtx.
+ */
+ virtual void detach() = 0;
+
+ /**
+ * Executes a callback on the baton via schedule(). Returns a future which is completed once
+ * the callback has run on the baton.
+ */
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
+ Promise<FutureContinuationResult<Callback>> promise;
+ auto future = promise.getFuture();
+
+ schedule([ cb = std::forward<Callback>(cb), sp = promise.share() ]() mutable {
+ sp.setWith(std::move(cb));
+ });
+
+ return future;
+ }
+
+ /**
+ * Executes a callback on the baton.
+ */
+ virtual void schedule(stdx::function<void()> func) = 0;
+
+ /**
+ * Adds a session, returning a future which activates on read/write-ability of the session.
+ */
+ enum class Type {
+ In,
+ Out,
+ };
+ virtual Future<void> addSession(Session& session, Type type) = 0;
+
+ /**
+ * Adds a timer, returning a future which activates after a duration.
+ */
+ virtual Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) = 0;
+
+ /**
+ * Adds a timer, returning a future which activates after a deadline.
+ */
+ virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0;
+
+ /**
+ * Cancels waiting on a session.
+ *
+ * Returns true if the session was in the baton to be cancelled.
+ */
+ virtual bool cancelSession(Session& session) = 0;
+
+ /**
+ * Cancels waiting on a timer
+ *
+ * Returns true if the timer was in the baton to be cancelled.
+ */
+ virtual bool cancelTimer(const ReactorTimer& timer) = 0;
+
+ /**
+ * Runs the baton. This blocks, waiting for networking events or timeouts, and fulfills
+ * promises and executes scheduled work.
+ *
+ * Returns false if the optional deadline has passed
+ */
+ virtual bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) = 0;
+};
+
+using BatonHandle = std::shared_ptr<Baton>;
+
+} // namespace transport
+} // namespace mongo
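The contract above is deliberately small: schedule() only records intent, run() blocks once for networking events or timeouts and then executes what was recorded, and execute() layers a Future on top of schedule(). A toy, fully self-contained model of that schedule/run split (no sessions, timers, or real blocking) to show how a caller drives it:

    // Toy model of the Baton contract: schedule() only records intent, and nothing runs
    // until someone calls run(). Conceptual only; not the transport::Baton implementation.
    #include <functional>
    #include <iostream>
    #include <vector>

    class ToyBaton {
    public:
        void schedule(std::function<void()> fn) {
            _scheduled.push_back(std::move(fn));
        }

        // Returns true if it did any work, mirroring the "made progress" flavor of run().
        bool run() {
            if (_scheduled.empty())
                return false;
            std::vector<std::function<void()>> toRun;
            toRun.swap(_scheduled);
            for (auto& fn : toRun)
                fn();
            return true;
        }

    private:
        std::vector<std::function<void()>> _scheduled;
    };

    int main() {
        ToyBaton baton;
        bool done = false;

        baton.schedule([&] { std::cout << "first scheduled task\n"; });
        baton.schedule([&] {
            std::cout << "second scheduled task\n";
            done = true;
        });

        // The caller owns the loop: keep running the baton until the work it cares about
        // has completed, the same shape the ARS uses while waiting on targeting.
        while (!done)
            baton.run();

        return 0;
    }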
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
new file mode 100644
index 00000000000..65d894dbea7
--- /dev/null
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -0,0 +1,424 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <vector>
+
+#include <poll.h>
+#include <sys/eventfd.h>
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/transport/baton.h"
+#include "mongo/transport/session_asio.h"
+#include "mongo/util/errno_util.h"
+#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * TransportLayerASIO Baton implementation for linux.
+ *
+ * We implement our networking reactor on top of poll + eventfd for wakeups
+ */
+class TransportLayerASIO::BatonASIO : public Baton {
+
+ /**
+ * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
+ * counter portion, just the notify/wakeup
+ */
+ struct EventFDHolder {
+ EventFDHolder() : fd(::eventfd(0, EFD_CLOEXEC)) {
+ if (fd < 0) {
+ severe() << "error in eventfd: " << errnoWithDescription(errno);
+ fassertFailed(50833);
+ }
+ }
+
+ ~EventFDHolder() {
+ ::close(fd);
+ }
+
+ // Writes to the underlying eventfd
+ void notify() {
+ while (true) {
+ if (eventfd_write(fd, 1) == 0) {
+ break;
+ }
+
+ invariant(errno == EINTR);
+ }
+ }
+
+ void wait() {
+ while (true) {
+ // If we have activity on the eventfd, pull the count out
+ uint64_t u;
+ if (::eventfd_read(fd, &u) == 0) {
+ break;
+ }
+
+ invariant(errno == EINTR);
+ }
+ }
+
+ const int fd;
+ };
+
+public:
+ BatonASIO(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+ ~BatonASIO() {
+ invariant(!_opCtx);
+ invariant(_sessions.empty());
+ invariant(_scheduled.empty());
+ invariant(_timers.empty());
+ }
+
+ void detach() override {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_sessions.empty());
+ invariant(_scheduled.empty());
+ invariant(_timers.empty());
+ }
+
+ {
+ stdx::lock_guard<Client> lk(*_opCtx->getClient());
+ invariant(_opCtx->getBaton().get() == this);
+ _opCtx->setBaton(nullptr);
+ }
+
+ _opCtx = nullptr;
+ }
+
+ Future<void> addSession(Session& session, Type type) override {
+ auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+
+ Promise<void> promise;
+ auto out = promise.getFuture();
+
+ _safeExecute([ fd, type, sp = promise.share(), this ] {
+ _sessions[fd] = TransportSession{type, sp};
+ });
+
+ return out;
+ }
+
+ Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override {
+ return waitUntil(timer, Date_t::now() + timeout);
+ }
+
+ Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
+ Promise<void> promise;
+ auto out = promise.getFuture();
+
+ _safeExecute([ timerPtr = &timer, expiration, sp = promise.share(), this ] {
+ auto pair = _timers.insert({
+ timerPtr, expiration, sp,
+ });
+ invariant(pair.second);
+ _timersById[pair.first->id] = pair.first;
+ });
+
+ return out;
+ }
+
+ bool cancelSession(Session& session) override {
+ const auto fd = checked_cast<ASIOSession&>(session).getSocket().native_handle();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (_sessions.find(fd) == _sessions.end()) {
+ return false;
+ }
+
+ // TODO: There's an ABA issue here with fds: between the check above and the deferred
+ // erase, the fd could have been removed and then a new socket opened and added with the
+ // same fd. We need to solve it by using session ids for handles.
+ _safeExecute(std::move(lk), [fd, this] { _sessions.erase(fd); });
+
+ return true;
+ }
+
+ bool cancelTimer(const ReactorTimer& timer) override {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (_timersById.find(&timer) == _timersById.end()) {
+ return false;
+ }
+
+ // TODO: Same ABA issue as above, but for pointers.
+ _safeExecute(std::move(lk), [ timerPtr = &timer, this ] {
+ auto iter = _timersById.find(timerPtr);
+
+ if (iter != _timersById.end()) {
+ _timers.erase(iter->second);
+ _timersById.erase(iter);
+ }
+ });
+
+ return true;
+ }
+
+ void schedule(stdx::function<void()> func) override {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ _scheduled.push_back(std::move(func));
+
+ if (_inPoll) {
+ _efd.notify();
+ }
+ }
+
+ bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override {
+ std::vector<SharedPromise<void>> toFulfill;
+
+ // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
+ const auto guard = MakeGuard([&] {
+ for (auto& promise : toFulfill) {
+ promise.emplaceValue();
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_scheduled.size()) {
+ decltype(_scheduled) toRun;
+ {
+ using std::swap;
+ swap(_scheduled, toRun);
+ }
+
+ lk.unlock();
+ for (auto& job : toRun) {
+ job();
+ }
+ lk.lock();
+ }
+ });
+
+ bool eventfdFired = false;
+
+ // Note that it's important to check for interrupt without the lock, because markKilled
+ // calls schedule, which will deadlock if we're holding the lock when calling this.
+ if (opCtx) {
+ opCtx->checkForInterrupt();
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (opCtx) {
+ invariant(opCtx == _opCtx);
+ }
+
+ auto now = Date_t::now();
+
+ // If our deadline has passed, return that we've already failed
+ if (deadline && *deadline <= now) {
+ return false;
+ }
+
+ // If anything was scheduled, run it now. No need to poll
+ if (_scheduled.size()) {
+ return true;
+ }
+
+ boost::optional<Milliseconds> timeout;
+
+ // If we have a timer, poll no longer than that
+ if (_timers.size()) {
+ timeout = _timers.begin()->expiration - now;
+ }
+
+ if (deadline) {
+ auto deadlineTimeout = *deadline - now;
+
+ // If we didn't have a timer with a deadline, or our deadline is sooner than that
+ // timer
+ if (!timeout || (deadlineTimeout < *timeout)) {
+ timeout = deadlineTimeout;
+ }
+ }
+
+ std::vector<decltype(_sessions)::iterator> sessions;
+ sessions.reserve(_sessions.size());
+ std::vector<pollfd> pollSet;
+ pollSet.reserve(_sessions.size() + 1);
+
+ pollSet.push_back(pollfd{_efd.fd, POLLIN, 0});
+
+ for (auto iter = _sessions.begin(); iter != _sessions.end(); ++iter) {
+ pollSet.push_back(
+ pollfd{iter->first,
+ static_cast<short>(iter->second.type == Type::In ? POLLIN : POLLOUT),
+ 0});
+ sessions.push_back(iter);
+ }
+
+ int rval = 0;
+ // If we don't have a timeout, or we have a timeout that's unexpired, run poll.
+ if (!timeout || (*timeout > Milliseconds(0))) {
+ _inPoll = true;
+ lk.unlock();
+ rval =
+ ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count());
+
+ const auto pollGuard = MakeGuard([&] {
+ lk.lock();
+ _inPoll = false;
+ });
+
+ // If poll failed, it better be in EINTR
+ if (rval < 0 && errno != EINTR) {
+ severe() << "error in poll: " << errnoWithDescription(errno);
+ fassertFailed(50834);
+ }
+
+ // Note that it's important to check for interrupt without the lock, because markKilled
+ // calls schedule, which will deadlock if we're holding the lock when calling this.
+ if (opCtx) {
+ opCtx->checkForInterrupt();
+ }
+ }
+
+ now = Date_t::now();
+
+ // If our deadline passed while in poll, we've failed
+ if (deadline && now > *deadline) {
+ return false;
+ }
+
+ // Fire expired timers
+ for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
+ toFulfill.push_back(std::move(iter->promise));
+ _timersById.erase(iter->id);
+ iter = _timers.erase(iter);
+ }
+
+ // If poll found some activity
+ if (rval > 0) {
+ size_t remaining = rval;
+
+ auto pollIter = pollSet.begin();
+
+ if (pollIter->revents) {
+ _efd.wait();
+ eventfdFired = true;
+
+ remaining--;
+ }
+
+ ++pollIter;
+ for (auto sessionIter = sessions.begin(); sessionIter != sessions.end() && remaining;
+ ++sessionIter, ++pollIter) {
+ if (pollIter->revents) {
+ toFulfill.push_back(std::move((*sessionIter)->second.promise));
+ _sessions.erase(*sessionIter);
+
+ remaining--;
+ }
+ }
+
+ invariant(remaining == 0);
+ }
+
+ // If we got here, we should have done something
+ invariant(toFulfill.size() || _scheduled.size() || eventfdFired);
+
+ return true;
+ }
+
+private:
+ struct Timer {
+ const ReactorTimer* id;
+ Date_t expiration;
+ SharedPromise<void> promise;
+
+ struct LessThan {
+ bool operator()(const Timer& lhs, const Timer& rhs) const {
+ return std::tie(lhs.expiration, lhs.id) < std::tie(rhs.expiration, rhs.id);
+ }
+ };
+ };
+
+ struct TransportSession {
+ Type type;
+ SharedPromise<void> promise;
+ };
+
+ template <typename Callback>
+ void _safeExecute(Callback&& cb) {
+ return _safeExecute(stdx::unique_lock<stdx::mutex>(_mutex), std::forward<Callback>(cb));
+ }
+
+ /**
+ * Safely executes a callback on the reactor. If we're in poll, we schedule a task and write to
+ * the eventfd to wake the poller. If not, we run the callback inline.
+ */
+ template <typename Callback>
+ void _safeExecute(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+ if (_inPoll) {
+ _scheduled.push_back([cb, this] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ cb();
+ });
+
+ _efd.notify();
+ } else {
+ cb();
+ }
+ }
+
+ stdx::mutex _mutex;
+
+ OperationContext* _opCtx;
+
+ bool _inPoll = false;
+
+ EventFDHolder _efd;
+
+ // This map stores the sessions we need to poll on. We unwind it into a pollset for every
+ // blocking call to run
+ stdx::unordered_map<int, TransportSession> _sessions;
+
+ // The set is used to find the next timer which will fire. The unordered_map looks up the
+ // timers so we can remove them in O(1)
+ std::set<Timer, Timer::LessThan> _timers;
+ stdx::unordered_map<const ReactorTimer*, decltype(_timers)::const_iterator> _timersById;
+
+ // For tasks that come in via schedule(), or that were deferred because we were in poll
+ std::vector<std::function<void()>> _scheduled;
+};
+
+} // namespace transport
+} // namespace mongo
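At its core the Linux baton multiplexes every registered socket plus one eventfd through poll(); schedule() from another thread writes the eventfd, which wakes the poller so the newly scheduled work can run. A self-contained POSIX sketch of just that wakeup mechanism (illustrative only; it omits sessions, timers, and the locking the real class needs):

    // Standalone sketch of the poll + eventfd wakeup used by BatonASIO: the polling thread
    // sleeps in poll(); another thread "schedules" work by writing the eventfd, which makes
    // poll() return so the work can run. Linux-only, minimal error handling.
    #include <poll.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>

    int main() {
        const int efd = ::eventfd(0, EFD_CLOEXEC);
        if (efd < 0)
            return 1;

        std::thread scheduler([efd] {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            std::cout << "scheduler: notifying the poller\n";
            ::eventfd_write(efd, 1);  // the "schedule() while _inPoll" path
        });

        pollfd pfd{efd, POLLIN, 0};
        std::cout << "poller: waiting in poll()\n";
        int rval = ::poll(&pfd, 1, /*timeout ms*/ 5000);

        if (rval > 0 && (pfd.revents & POLLIN)) {
            uint64_t count = 0;
            ::eventfd_read(efd, &count);  // drain the counter, like EventFDHolder::wait()
            std::cout << "poller: woken up, running scheduled work\n";
        } else {
            std::cout << "poller: timed out\n";
        }

        scheduler.join();
        ::close(efd);
        return 0;
    }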
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index 60e6a4e33e2..0620650ce35 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -83,7 +83,7 @@ public:
return Message(); // Subclasses can do something different.
}
- Future<Message> asyncSourceMessage() override {
+ Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) override {
return Future<Message>::makeReady(sourceMessage());
}
@@ -99,11 +99,12 @@ public:
return Status::OK();
}
- Future<void> asyncSinkMessage(Message message) override {
+ Future<void> asyncSinkMessage(Message message,
+ const transport::BatonHandle& handle = nullptr) override {
return Future<void>::makeReady(sinkMessage(message));
}
- void cancelAsyncOperations() override {}
+ void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) override {}
void setTimeout(boost::optional<Milliseconds>) override {}
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index bfe45a6cbe2..304c8f11da4 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -44,6 +44,8 @@ namespace transport {
class TransportLayer;
class Session;
+class Baton;
+using BatonHandle = std::shared_ptr<Baton>;
using SessionHandle = std::shared_ptr<Session>;
using ConstSessionHandle = std::shared_ptr<const Session>;
@@ -103,7 +105,7 @@ public:
* Source (receive) a new Message from the remote host for this Session.
*/
virtual StatusWith<Message> sourceMessage() = 0;
- virtual Future<Message> asyncSourceMessage() = 0;
+ virtual Future<Message> asyncSourceMessage(const transport::BatonHandle& handle = nullptr) = 0;
/**
* Sink (send) a Message to the remote host for this Session.
@@ -111,14 +113,15 @@ public:
* Async version will keep the buffer alive until the operation completes.
*/
virtual Status sinkMessage(Message message) = 0;
- virtual Future<void> asyncSinkMessage(Message message) = 0;
+ virtual Future<void> asyncSinkMessage(Message message,
+ const transport::BatonHandle& handle = nullptr) = 0;
/**
* Cancel any outstanding async operations. There is no way to cancel synchronous calls.
* Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already
* completed.
*/
- virtual void cancelAsyncOperations() = 0;
+ virtual void cancelAsyncOperations(const transport::BatonHandle& handle = nullptr) = 0;
/**
* This should only be used to detect when the remote host has disappeared without
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 994b0941b04..459cab76676 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -34,6 +34,7 @@
#include "mongo/config.h"
#include "mongo/db/stats/counters.h"
#include "mongo/transport/asio_utils.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/net/sock.h"
#ifdef MONGO_CONFIG_SSL
@@ -58,6 +59,14 @@ auto futurize(const std::error_code& ec, SuccessValue&& successValue) {
return Result::makeReady(successValue);
}
+Future<void> futurize(const std::error_code& ec) {
+ using Result = Future<void>;
+ if (MONGO_unlikely(ec)) {
+ return Result::makeReady(errorCodeToStatus(ec));
+ }
+ return Result::makeReady();
+}
+
using GenericSocket = asio::generic::stream_protocol::socket;
class TransportLayerASIO::ASIOSession final : public Session {
@@ -117,34 +126,35 @@ public:
return sourceMessageImpl().getNoThrow();
}
- Future<Message> asyncSourceMessage() override {
+ Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override {
ensureAsync();
- return sourceMessageImpl();
+ return sourceMessageImpl(baton);
}
Status sinkMessage(Message message) override {
ensureSync();
return write(asio::buffer(message.buf(), message.size()))
- .then([&message](size_t size) {
- invariant(size == size_t(message.size()));
- networkCounter.hitPhysicalOut(message.size());
- })
+ .then([&message] { networkCounter.hitPhysicalOut(message.size()); })
.getNoThrow();
}
- Future<void> asyncSinkMessage(Message message) override {
+ Future<void> asyncSinkMessage(Message message,
+ const transport::BatonHandle& baton = nullptr) override {
ensureAsync();
- return write(asio::buffer(message.buf(), message.size()))
- .then([message /*keep the buffer alive*/](size_t size) {
- invariant(size == size_t(message.size()));
+ return write(asio::buffer(message.buf(), message.size()), baton)
+ .then([message /*keep the buffer alive*/]() {
networkCounter.hitPhysicalOut(message.size());
});
}
- void cancelAsyncOperations() override {
+ void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override {
LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote;
- getSocket().cancel();
+ if (baton) {
+ baton->cancelSession(*this);
+ } else {
+ getSocket().cancel();
+ }
}
void setTimeout(boost::optional<Milliseconds> timeout) override {
@@ -186,6 +196,7 @@ public:
protected:
friend class TransportLayerASIO;
+ friend TransportLayerASIO::BatonASIO;
#ifdef MONGO_CONFIG_SSL
Future<void> handshakeSSLForEgress(const HostAndPort& target) {
@@ -307,19 +318,17 @@ private:
return _socket;
}
- Future<Message> sourceMessageImpl() {
+ Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) {
static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);
auto headerBuffer = SharedBuffer::allocate(kHeaderSize);
auto ptr = headerBuffer.get();
- return read(asio::buffer(ptr, kHeaderSize))
- .then([ headerBuffer = std::move(headerBuffer), this ](size_t size) mutable {
- if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), size))) {
- return sendHTTPResponse();
+ return read(asio::buffer(ptr, kHeaderSize), baton)
+ .then([ headerBuffer = std::move(headerBuffer), this, baton ]() mutable {
+ if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), kHeaderSize))) {
+ return sendHTTPResponse(baton);
}
- invariant(size == kHeaderSize);
-
const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength());
if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {
StringBuilder sb;
@@ -331,7 +340,7 @@ private:
return Future<Message>::makeReady(Status(ErrorCodes::ProtocolError, str));
}
- if (msgLen == size) {
+ if (msgLen == kHeaderSize) {
// This probably isn't a real case since all (current) messages have bodies.
networkCounter.hitPhysicalIn(msgLen);
return Future<Message>::makeReady(Message(std::move(headerBuffer)));
@@ -341,8 +350,8 @@ private:
memcpy(buffer.get(), headerBuffer.get(), kHeaderSize);
MsgData::View msgView(buffer.get());
- return read(asio::buffer(msgView.data(), msgView.dataLen()))
- .then([ buffer = std::move(buffer), msgLen ](size_t size) mutable {
+ return read(asio::buffer(msgView.data(), msgView.dataLen()), baton)
+ .then([ buffer = std::move(buffer), msgLen ]() mutable {
networkCounter.hitPhysicalIn(msgLen);
return Message(std::move(buffer));
});
@@ -350,55 +359,59 @@ private:
}
template <typename MutableBufferSequence>
- Future<size_t> read(const MutableBufferSequence& buffers) {
+ Future<void> read(const MutableBufferSequence& buffers,
+ const transport::BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
- return opportunisticRead(*_sslSocket, buffers);
+ return opportunisticRead(*_sslSocket, buffers, baton);
} else if (!_ranHandshake) {
invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));
- return opportunisticRead(_socket, buffers)
- .then([this, buffers](size_t size) mutable {
+ return opportunisticRead(_socket, buffers, baton)
+ .then([this, buffers]() mutable {
_ranHandshake = true;
return maybeHandshakeSSLForIngress(buffers);
})
- .then([this, buffers](bool needsRead) mutable {
+ .then([this, buffers, baton](bool needsRead) mutable {
if (needsRead) {
- return read(buffers);
+ return read(buffers, baton);
} else {
- return Future<size_t>::makeReady(asio::buffer_size(buffers));
+ return Future<void>::makeReady();
}
});
}
#endif
- return opportunisticRead(_socket, buffers);
+ return opportunisticRead(_socket, buffers, baton);
}
template <typename ConstBufferSequence>
- Future<size_t> write(const ConstBufferSequence& buffers) {
+ Future<void> write(const ConstBufferSequence& buffers,
+ const transport::BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
#ifdef __linux__
// We do some trickery in asio (see moreToSend), which appears to work well on linux,
// but fails on other platforms.
- return opportunisticWrite(*_sslSocket, buffers);
+ return opportunisticWrite(*_sslSocket, buffers, baton);
#else
if (_blockingMode == Async) {
// Opportunistic writes are broken for async egress SSL (switching between blocking
// and non-blocking mode corrupts the TLS exchange).
- return asio::async_write(*_sslSocket, buffers, UseFuture{});
+ return asio::async_write(*_sslSocket, buffers, UseFuture{}).ignoreValue();
} else {
- return opportunisticWrite(*_sslSocket, buffers);
+ return opportunisticWrite(*_sslSocket, buffers, baton);
}
#endif
}
#endif
- return opportunisticWrite(_socket, buffers);
+ return opportunisticWrite(_socket, buffers, baton);
}
template <typename Stream, typename MutableBufferSequence>
- Future<size_t> opportunisticRead(Stream& stream, const MutableBufferSequence& buffers) {
+ Future<void> opportunisticRead(Stream& stream,
+ const MutableBufferSequence& buffers,
+ const transport::BatonHandle& baton = nullptr) {
std::error_code ec;
auto size = asio::read(stream, buffers, ec);
if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
@@ -410,13 +423,17 @@ private:
if (size > 0) {
asyncBuffers += size;
}
- return asio::async_read(stream, asyncBuffers, UseFuture{})
- .then([size](size_t asyncSize) {
- // Add back in the size read opportunistically.
- return size + asyncSize;
- });
+
+ if (baton) {
+ return baton->addSession(*this, Baton::Type::In)
+ .then([&stream, asyncBuffers, baton, this] {
+ return opportunisticRead(stream, asyncBuffers, baton);
+ });
+ }
+
+ return asio::async_read(stream, asyncBuffers, UseFuture{}).ignoreValue();
} else {
- return futurize(ec, size);
+ return futurize(ec);
}
}
@@ -430,24 +447,21 @@ private:
* needs to keep sending).
*/
template <typename ConstBufferSequence>
- boost::optional<Future<size_t>> moreToSend(GenericSocket& socket,
- const ConstBufferSequence& buffers,
- size_t size) {
+ boost::optional<Future<void>> moreToSend(GenericSocket& socket,
+ const ConstBufferSequence& buffers,
+ const transport::BatonHandle& baton) {
return boost::none;
}
#ifdef MONGO_CONFIG_SSL
template <typename ConstBufferSequence>
- boost::optional<Future<size_t>> moreToSend(asio::ssl::stream<GenericSocket>& socket,
- const ConstBufferSequence& buffers,
- size_t sizeFromBefore) {
+ boost::optional<Future<void>> moreToSend(asio::ssl::stream<GenericSocket>& socket,
+ const ConstBufferSequence& buffers,
+ const BatonHandle& baton) {
if (_sslSocket->getCoreOutputBuffer().size()) {
- return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer())
- .then([this, &socket, buffers, sizeFromBefore](size_t) {
- return opportunisticWrite(socket, buffers)
- .then([sizeFromBefore](size_t justWritten) {
- return justWritten + sizeFromBefore;
- });
+ return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer(), baton)
+ .then([this, &socket, buffers, baton] {
+ return opportunisticWrite(socket, buffers, baton);
});
}
@@ -456,7 +470,9 @@ private:
#endif
template <typename Stream, typename ConstBufferSequence>
- Future<size_t> opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers) {
+ Future<void> opportunisticWrite(Stream& stream,
+ const ConstBufferSequence& buffers,
+ const transport::BatonHandle& baton = nullptr) {
std::error_code ec;
auto size = asio::write(stream, buffers, ec);
if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
@@ -470,17 +486,20 @@ private:
asyncBuffers += size;
}
- if (auto more = moreToSend(stream, asyncBuffers, size)) {
+ if (auto more = moreToSend(stream, asyncBuffers, baton)) {
return std::move(*more);
}
- return asio::async_write(stream, asyncBuffers, UseFuture{})
- .then([size](size_t asyncSize) {
- // Add back in the size written opportunistically.
- return size + asyncSize;
- });
+ if (baton) {
+ return baton->addSession(*this, Baton::Type::Out)
+ .then([&stream, asyncBuffers, baton, this] {
+ return opportunisticWrite(stream, asyncBuffers, baton);
+ });
+ }
+
+ return asio::async_write(stream, asyncBuffers, UseFuture{}).ignoreValue();
} else {
- return futurize(ec, size);
+ return futurize(ec);
}
}
@@ -568,7 +587,7 @@ private:
// Called from read() to send an HTTP response back to a client that's trying to use HTTP
// over a native MongoDB port. This returns a Future<Message> to match its only caller, but it
// always contains an error, so it could really return Future<Anything>
- Future<Message> sendHTTPResponse() {
+ Future<Message> sendHTTPResponse(const BatonHandle& baton = nullptr) {
constexpr auto userMsg =
"It looks like you are trying to access MongoDB over HTTP"
" on the native driver port.\r\n"_sd;
@@ -580,17 +599,17 @@ private:
<< userMsg.size() << "\r\n\r\n"
<< userMsg;
- return write(asio::buffer(httpResp.data(), httpResp.size()))
+ return write(asio::buffer(httpResp.data(), httpResp.size()), baton)
.onError(
[](const Status& status) {
- return StatusWith<size_t>(
+ return Status(
ErrorCodes::ProtocolError,
str::stream()
<< "Client sent an HTTP request over a native MongoDB connection, "
"but there was an error sending a response: "
<< status.toString());
})
- .then([](size_t size) {
+ .then([] {
return StatusWith<Message>(
ErrorCodes::ProtocolError,
"Client sent an HTTP request over a native MongoDB connection");
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 492fbf82ea0..ab36f7954a3 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -38,6 +38,9 @@
#include "mongo/util/time_support.h"
namespace mongo {
+
+class OperationContext;
+
namespace transport {
enum ConnectSSLMode { kGlobalSSLMode, kEnableSSL, kDisableSSL };
@@ -103,6 +106,10 @@ public:
enum WhichReactor { kIngress, kEgress, kNewReactor };
virtual ReactorHandle getReactor(WhichReactor which) = 0;
+ virtual BatonHandle makeBaton(OperationContext* opCtx) {
+ return nullptr;
+ }
+
protected:
TransportLayer() = default;
};
@@ -124,15 +131,15 @@ public:
*
* If no future is outstanding, then this is a noop.
*/
- virtual void cancel() = 0;
+ virtual void cancel(const BatonHandle& baton = nullptr) = 0;
/*
* Returns a future that will be filled with Status::OK after the timeout has elapsed.
*
* Calling this implicitly calls cancel().
*/
- virtual Future<void> waitFor(Milliseconds timeout) = 0;
- virtual Future<void> waitUntil(Date_t timeout) = 0;
+ virtual Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) = 0;
+ virtual Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) = 0;
};
class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 508e70f39e1..c194073d000 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -56,6 +56,9 @@
#endif
// session_asio.h has some header dependencies that require it to be the last header.
+#ifdef __linux__
+#include "mongo/transport/baton_asio_linux.h"
+#endif
#include "mongo/transport/session_asio.h"
namespace mongo {
@@ -72,7 +75,7 @@ public:
cancel();
}
- void cancel() override {
+ void cancel(const BatonHandle& baton = nullptr) override {
auto promise = [&] {
stdx::lock_guard<stdx::mutex> lk(_timerState->mutex);
_timerState->generation++;
@@ -86,18 +89,40 @@ public:
promise.setError({ErrorCodes::CallbackCanceled, "Timer was canceled"});
});
}
- _timerState->timer.cancel();
+
+ if (!(baton && baton->cancelTimer(*this))) {
+ _timerState->timer.cancel();
+ }
}
- Future<void> waitFor(Milliseconds timeout) override {
- return _asyncWait([&] { _timerState->timer.expires_after(timeout.toSystemDuration()); });
+ Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) override {
+ if (baton) {
+ return _asyncWait([&] { return baton->waitFor(*this, timeout); }, baton);
+ } else {
+ return _asyncWait(
+ [&] { _timerState->timer.expires_after(timeout.toSystemDuration()); });
+ }
}
- Future<void> waitUntil(Date_t expiration) override {
- return _asyncWait([&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); });
+ Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
+ if (baton) {
+ return _asyncWait([&] { return baton->waitUntil(*this, expiration); }, baton);
+ } else {
+ return _asyncWait(
+ [&] { _timerState->timer.expires_at(expiration.toSystemTimePoint()); });
+ }
}
private:
+ std::pair<Future<void>, uint64_t> _getFuture() {
+ stdx::lock_guard<stdx::mutex> lk(_timerState->mutex);
+ auto id = ++_timerState->generation;
+ invariant(!_timerState->finalPromise);
+ _timerState->finalPromise = std::make_unique<Promise<void>>();
+ auto future = _timerState->finalPromise->getFuture();
+ return std::make_pair(std::move(future), id);
+ }
+
template <typename ArmTimerCb>
Future<void> _asyncWait(ArmTimerCb&& armTimer) {
try {
@@ -105,14 +130,7 @@ private:
Future<void> ret;
uint64_t id;
- std::tie(ret, id) = [&] {
- stdx::lock_guard<stdx::mutex> lk(_timerState->mutex);
- auto id = ++_timerState->generation;
- invariant(!_timerState->finalPromise);
- _timerState->finalPromise = std::make_unique<Promise<void>>();
- auto future = _timerState->finalPromise->getFuture();
- return std::make_pair(std::move(future), id);
- }();
+ std::tie(ret, id) = _getFuture();
armTimer();
_timerState->timer.async_wait(
@@ -137,6 +155,32 @@ private:
}
}
+ template <typename ArmTimerCb>
+ Future<void> _asyncWait(ArmTimerCb&& armTimer, const BatonHandle& baton) {
+ cancel(baton);
+
+ Future<void> ret;
+ uint64_t id;
+ std::tie(ret, id) = _getFuture();
+
+ armTimer().getAsync([ id, state = _timerState ](Status status) mutable {
+ stdx::unique_lock<stdx::mutex> lk(state->mutex);
+ if (id != state->generation) {
+ return;
+ }
+ auto promise = std::move(state->finalPromise);
+ lk.unlock();
+
+ if (status.isOK()) {
+ promise->emplaceValue();
+ } else {
+ promise->setError(status);
+ }
+ });
+
+ return ret;
+ }
+
// The timer itself and its state are stored in this struct managed by a shared_ptr so we can
// extend the lifetime of the timer until all callbacks to timer.async_wait have run.
struct TimerState {
@@ -729,5 +773,21 @@ SSLParams::SSLModes TransportLayerASIO::_sslMode() const {
}
#endif
+BatonHandle TransportLayerASIO::makeBaton(OperationContext* opCtx) {
+#ifdef __linux__
+ auto baton = std::make_shared<BatonASIO>(opCtx);
+
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ invariant(!opCtx->getBaton());
+ opCtx->setBaton(baton);
+ }
+
+ return std::move(baton);
+#else
+ return nullptr;
+#endif
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 85ead5799b3..2e774210c55 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -125,7 +125,10 @@ public:
return _listenerPort;
}
+ BatonHandle makeBaton(OperationContext* opCtx) override;
+
private:
+ class BatonASIO;
class ASIOSession;
class ASIOReactor;
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index d90386829e2..18df98e99cd 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -88,6 +88,13 @@ public:
static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
+ BatonHandle makeBaton(OperationContext* opCtx) override {
+ stdx::lock_guard<stdx::mutex> lk(_tlsMutex);
+ // TODO: figure out what to do about managers with more than one transport layer.
+ invariant(_tls.size() == 1);
+ return _tls[0]->makeBaton(opCtx);
+ }
+
private:
template <typename Callable>
void _foreach(Callable&& cb) const;
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index 03a9c90246e..b58d75a394e 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -99,8 +99,10 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWo
}
StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
- return _executor->scheduleRemoteCommand(request, cb);
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
+ return _executor->scheduleRemoteCommand(request, cb, baton);
}
void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) {
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index fdcbc9c71d4..3c2bb935692 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -66,7 +66,9 @@ public:
virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override;
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override;
virtual void cancel(const CallbackHandle& cbHandle) override;
virtual void wait(const CallbackHandle& cbHandle) override;
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index 2485b170463..2b4d2c9c4f4 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -537,6 +537,11 @@ public:
setImpl([&] { sharedState->setError(std::move(status)); });
}
+ // TODO rename to not XXXWith and handle void
+ void setFromStatusWith(StatusWith<T> sw) noexcept {
+ setImpl([&] { sharedState->setFromStatusWith(std::move(sw)); });
+ }
+
/**
* Get a copyable SharedPromise that can be used to complete this Promise's Future.
*
@@ -1008,6 +1013,15 @@ public:
[](Func && func, const Status& status) noexcept { call(func, status); });
}
+ /**
+ * Ignores the return value of a future, transforming it down into a Future<void>.
+ *
+ * This only ignores values, not errors. Those are still propagated until an onError handler.
+ *
+ * Equivalent to then([](auto&&){});
+ */
+ Future<void> ignoreValue() && noexcept;
+
private:
template <typename T2>
friend class Future;
@@ -1203,6 +1217,10 @@ public:
return std::move(inner).tapAll(std::forward<Func>(func));
}
+ Future<void> ignoreValue() && noexcept {
+ return std::move(*this);
+ }
+
private:
template <typename T>
friend class Future;
@@ -1225,6 +1243,11 @@ private:
Future<FakeVoid> inner;
};
+template <typename T>
+ Future<void> Future<T>::ignoreValue() && noexcept {
+ return std::move(*this).then([](auto&&) {});
+}
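ignoreValue() lets callers that only care about completion, like the rewritten write() path which no longer reports a byte count, collapse a Future<T> into a Future<void> without spelling out the trivial continuation. A hedged usage snippet, where startSomeAsyncWrite() is a hypothetical source of a Future<size_t>:

    // Illustrative only: drop the value, keep the completion/error signal.
    // Errors are not ignored; they still propagate to a later onError() handler.
    Future<size_t> bytesWritten = startSomeAsyncWrite();        // hypothetical
    Future<void> done = std::move(bytesWritten).ignoreValue();  // same as .then([](auto&&) {})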
+
/**
* Makes a ready Future with the return value of a nullary function. This has the same semantics as
* Promise::setWith, and has the same reasons to prefer it over Future<T>::makeReady(). Also, it