summary | refs | log | tree | commit | diff
path: root/src/mongo/s/async_requests_sender.cpp
diff options
context:
space:
mode:
author    Jason Carey <jcarey@argv.me>    2018-03-21 00:15:35 -0400
committer Jason Carey <jcarey@argv.me>    2018-04-27 19:49:28 -0400
commit    4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch)
tree      438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/s/async_requests_sender.cpp
parent    91eaa878c4feeebd9397c49180631fc719238aaf (diff)
download  mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to 1 connection pool in mongos. This change involves the introduction of a transport layer baton, which improves perf for a particular transport layer when doing local scatter/gather operations.
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r--  src/mongo/s/async_requests_sender.cpp | 109
1 file changed, 102 insertions(+), 7 deletions(-)
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 5e70f3630ff..d63429ff8e9 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -33,16 +33,22 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/baton.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
+
+MONGO_EXPORT_SERVER_PARAMETER(AsyncRequestsSenderUseBaton, bool, true);
+
namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
@@ -58,6 +64,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
_executor(executor),
+ _baton(opCtx),
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy) {
@@ -71,6 +78,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
// Schedule the requests immediately.
_scheduleRequests();
}
+
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
@@ -90,7 +98,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _handleResponse(_responseQueue.pop(_opCtx));
+ _makeProgress(_opCtx);
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -99,7 +107,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _handleResponse(_responseQueue.pop());
+ _makeProgress(nullptr);
}
}
return *readyResponse;
@@ -130,6 +138,11 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_scheduleRequests();
}
+ // If we have baton requests, we want to process those before proceeding
+ if (_batonRequests) {
+ return boost::none;
+ }
+
// Check if any remote is ready.
invariant(!_remotes.empty());
for (auto& remote : _remotes) {
@@ -200,6 +213,12 @@ void AsyncRequestsSender::_scheduleRequests() {
auto scheduleStatus = _scheduleRequest(i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
+
+ if (_baton) {
+ _batonRequests++;
+ _baton->schedule([this] { _batonRequests--; });
+ }
+
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
@@ -214,7 +233,7 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
invariant(!remote.cbHandle.isValid());
invariant(!remote.swResponse);
- Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference);
+ Status resolveStatus = remote.resolveShardIdToHostAndPort(this, _readPreference);
if (!resolveStatus.isOK()) {
return resolveStatus;
}
@@ -225,8 +244,14 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+ if (_baton) {
+ _batonRequests++;
+ _baton->schedule([this] { _batonRequests--; });
+ }
+
_responseQueue.push(Job{cbData, remoteIndex});
- });
+ },
+ _baton);
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -235,7 +260,21 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
return Status::OK();
}
-void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
+ boost::optional<Job> job;
+
+ if (_baton) {
+ // If we're using a baton, we peek the queue, and block on the baton if it's empty
+ if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
+ job = std::move(*tryJob);
+ } else {
+ _baton->run(_opCtx, boost::none);
+ }
+ } else {
+ // Otherwise we block on the queue
+ job = _opCtx ? _responseQueue.pop(_opCtx) : _responseQueue.pop();
+ }
+
if (!job) {
return;
}
@@ -274,14 +313,57 @@ AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
- const ReadPreferenceSetting& readPref) {
+ AsyncRequestsSender* ars, const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << shardId);
}
- auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
+ auto clock = ars->_opCtx->getServiceContext()->getFastClockSource();
+
+ auto deadline = clock->now() + Seconds(20);
+
+ auto targeter = shard->getTargeter();
+
+ auto findHostStatus = [&] {
+ // If we don't have a baton, just go ahead and block in targeting
+ if (!ars->_baton) {
+ return targeter->findHostWithMaxWait(readPref, Seconds{20});
+ }
+
+ // If we do have a baton, and we can target quickly, just do that
+ {
+ auto findHostStatus = targeter->findHostNoWait(readPref);
+ if (findHostStatus.isOK()) {
+ return findHostStatus;
+ }
+ }
+
+ // If it's going to take a while to target, we spin up a background thread to do our
+ // targeting, while running the baton on the calling thread. This allows us to make forward
+ // progress on previous requests.
+ Promise<HostAndPort> promise;
+ auto future = promise.getFuture();
+
+ ars->_batonRequests++;
+ stdx::thread bgChecker([&] {
+ promise.setWith(
+ [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
+
+ ars->_baton->schedule([ars] { ars->_batonRequests--; });
+ });
+ const auto guard = MakeGuard([&] { bgChecker.join(); });
+
+ while (!future.isReady()) {
+ if (!ars->_baton->run(nullptr, deadline)) {
+ break;
+ }
+ }
+
+ return future.getNoThrow();
+ }();
+
if (!findHostStatus.isOK()) {
return findHostStatus.getStatus();
}
@@ -296,4 +378,17 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
return grid.shardRegistry()->getShardNoReload(shardId);
}
+AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx)
+ : _baton(AsyncRequestsSenderUseBaton.load()
+ ? (opCtx->getServiceContext()->getTransportLayer()
+ ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx)
+ : nullptr)
+ : nullptr) {}
+
+AsyncRequestsSender::BatonDetacher::~BatonDetacher() {
+ if (_baton) {
+ _baton->detach();
+ }
+}
+
} // namespace mongo