author     Ben Caimano <ben.caimano@mongodb.com>    2020-01-21 22:38:52 +0000
committer  evergreen <evergreen@mongodb.com>        2020-01-21 22:38:52 +0000
commit     71ae1d4bf4eb92b5e46922efdebb96f99081f5cd (patch)
tree       6f13c73e80dfbac90bc39c0708f913e8636a1e67 /src/mongo
parent     30b8a9ee82306012a1a344c684932df1b15f1be8 (diff)
download   mongo-71ae1d4bf4eb92b5e46922efdebb96f99081f5cd.tar.gz
SERVER-43606 Enforce connection reuse in response to remote interruption
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/client/async_client.cpp                                4
-rw-r--r--  src/mongo/executor/network_interface.h                           5
-rw-r--r--  src/mongo/executor/network_interface_integration_fixture.cpp     3
-rw-r--r--  src/mongo/executor/network_interface_integration_test.cpp       39
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp                     367
-rw-r--r--  src/mongo/executor/network_interface_tl.h                        63
6 files changed, 319 insertions, 162 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index e34fe4c7bec..a6748f5ffe6 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -245,10 +245,6 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
.then([start, clkSource, this](rpc::UniqueReply response) {
auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
return executor::RemoteCommandResponse(*response, duration);
- })
- .onError([start, clkSource](Status status) {
- auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
- return executor::RemoteCommandResponse(status, duration);
});
}
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index ffb203221c9..386893b4881 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -121,9 +121,11 @@ public:
virtual std::string getHostName() = 0;
struct Counters {
+ uint64_t sent = 0;
uint64_t canceled = 0;
uint64_t timedOut = 0;
uint64_t failed = 0;
+ uint64_t failedRemotely = 0;
uint64_t succeeded = 0;
};
/*
@@ -172,6 +174,9 @@ public:
/**
* Requests cancelation of the network activity associated with "cbHandle" if it has not yet
* completed.
+ *
+ * Note that the work involved in onFinish may run locally as a result of invoking this
+ * function. Do not hold locks while calling cancelCommand(...).
*/
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const BatonHandle& baton = nullptr) = 0;
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 62972eef6c3..51e3092a17c 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -50,6 +50,9 @@ namespace executor {
void NetworkInterfaceIntegrationFixture::createNet(
std::unique_ptr<NetworkConnectionHook> connectHook) {
ConnectionPool::Options options;
+
+ options.minConnections = 0u;
+
#ifdef _WIN32
// Connections won't queue on Windows, so attempting to open too many connections
// concurrently will result in refused connections and test failure.
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index 8e593d0535b..08907fa240b 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/wire_version.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_integration_fixture.h"
#include "mongo/executor/test_network_connection_hook.h"
@@ -227,37 +228,51 @@ TEST_F(NetworkInterfaceTest, CancelMissingOperation) {
assertNumOps(0u, 0u, 0u, 0u);
}
+constexpr auto kMaxWait = Milliseconds(Minutes(1));
+
TEST_F(NetworkInterfaceTest, CancelOperation) {
auto cbh = makeCallbackHandle();
- // Kick off our operation
- FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn");
+ auto deferred = [&] {
+ // Kick off our operation
+ FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn");
- auto deferred = runCommand(cbh, makeTestCommand());
+ auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
- waitForIsMaster();
+ waitForIsMaster();
- net().cancelCommand(cbh);
+ fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1);
+
+ net().cancelCommand(cbh);
+
+ return deferred;
+ }();
// Wait for op to complete, assert that it was canceled.
auto result = deferred.get();
ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
ASSERT(result.elapsedMillis);
+
assertNumOps(1u, 0u, 0u, 0u);
}
TEST_F(NetworkInterfaceTest, ImmediateCancel) {
auto cbh = makeCallbackHandle();
- // Kick off our operation
+ auto deferred = [&] {
+ // Kick off our operation
+ FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn");
- FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn");
+ auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
- auto deferred = runCommand(cbh, makeTestCommand());
+ fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1);
- net().cancelCommand(cbh);
+ net().cancelCommand(cbh);
+
+ return deferred;
+ }();
- ASSERT_FALSE(hasIsMaster());
+ ASSERT_EQ(net().getCounters().sent, 0);
// Wait for op to complete, assert that it was canceled.
auto result = deferred.get();
@@ -269,13 +284,13 @@ TEST_F(NetworkInterfaceTest, ImmediateCancel) {
TEST_F(NetworkInterfaceTest, LateCancel) {
auto cbh = makeCallbackHandle();
- auto deferred = runCommand(cbh, makeTestCommand());
+ auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
// Wait for op to complete, assert that it was canceled.
auto result = deferred.get();
net().cancelCommand(cbh);
- ASSERT(result.isOK());
+ ASSERT_OK(result.status);
ASSERT(result.elapsedMillis);
assertNumOps(0u, 0u, 0u, 1u);
}
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 01eceebbbd3..4aee5b2bc7c 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/connection_pool_tl.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/transport/transport_layer_manager.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/log.h"
@@ -44,6 +45,51 @@
namespace mongo {
namespace executor {
+/**
+ * SynchronizedCounters is a synchronized bucket of event counts for commands.
+ */
+class NetworkInterfaceTL::SynchronizedCounters {
+public:
+ auto get() const {
+ stdx::lock_guard lk(_mutex);
+ return _data;
+ }
+
+
+ void recordResult(const Status& status) {
+ stdx::lock_guard lk(_mutex);
+ if (status.isOK()) {
+ // Increment the count of commands that received a valid response
+ ++_data.succeeded;
+ } else if (ErrorCodes::isExceededTimeLimitError(status)) {
+ // Increment the count of commands that experienced a local timeout
+ // Note that these commands do not count as "failed".
+ ++_data.timedOut;
+ } else if (ErrorCodes::isCancelationError(status)) {
+ // Increment the count of commands that were canceled locally
+ ++_data.canceled;
+ } else if (ErrorCodes::isShutdownError(status)) {
+ // Increment the count of commands that received an unrecoverable response
+ ++_data.failedRemotely;
+ } else {
+ // Increment the count of commands that experienced a network failure
+ ++_data.failed;
+ }
+ }
+
+ /**
+ * Increment the count of commands sent over the network
+ */
+ void recordSent() {
+ ++_data.sent;
+ }
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0),
+ "NetworkInterfaceTL::SynchronizedCounters::_mutex");
+ Counters _data;
+};
+
NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
ConnectionPool::Options connPoolOpts,
ServiceContext* svcCtx,
@@ -51,9 +97,6 @@ NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
std::unique_ptr<rpc::EgressMetadataHook> metadataHook)
: _instanceName(std::move(instanceName)),
_svcCtx(svcCtx),
- _tl(nullptr),
- _ownedTransportLayer(nullptr),
- _reactor(nullptr),
_connPoolOpts(std::move(connPoolOpts)),
_onConnectHook(std::move(onConnectHook)),
_metadataHook(std::move(metadataHook)),
@@ -75,6 +118,10 @@ NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
_reactor, _tl, std::move(_onConnectHook), _connPoolOpts);
_pool = std::make_shared<ConnectionPool>(
std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
+
+ if (getTestCommandsEnabled()) {
+ _counters = std::make_unique<SynchronizedCounters>();
+ }
}
std::string NetworkInterfaceTL::getDiagnosticString() {
@@ -91,9 +138,8 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const
}
NetworkInterface::Counters NetworkInterfaceTL::getCounters() const {
- invariant(getTestCommandsEnabled());
- stdx::lock_guard<Latch> lk(_mutex);
- return _counters;
+ invariant(_counters);
+ return _counters->get();
}
std::string NetworkInterfaceTL::getHostName() {
@@ -201,15 +247,109 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
return state;
}
-NetworkInterfaceTL::CommandState::~CommandState() {
- invariant(interface);
+AsyncDBClient* NetworkInterfaceTL::CommandState::client() {
+ if (!conn) {
+ return nullptr;
+ }
+
+ return checked_cast<connection_pool_tl::TLConnection*>(conn.get())->client();
+}
+
+void NetworkInterfaceTL::CommandState::setTimer() {
+ if (deadline == RemoteCommandRequest::kNoExpirationDate) {
+ return;
+ }
+
+ const auto nowVal = interface->now();
+ if (nowVal >= deadline) {
+ auto connDuration = nowVal - start;
+ uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ str::stream() << "Remote command timed out while waiting to get a "
+ "connection from the pool, took "
+ << connDuration << ", timeout was set to " << requestOnAny.timeout);
+ }
+
+ // TODO reform with SERVER-41459
+ timer = interface->_reactor->makeTimer();
+ timer->waitUntil(deadline, baton).getAsync([this, anchor = shared_from_this()](Status status) {
+ if (!status.isOK()) {
+ return;
+ }
+
+ if (done.swap(true)) {
+ return;
+ }
+
+ const std::string message = str::stream() << "Request " << requestOnAny.id << " timed out"
+ << ", deadline was " << deadline.toString()
+ << ", op was " << redact(requestOnAny.toString());
+
+ LOG(2) << message;
+ promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
+ });
+}
+
+void NetworkInterfaceTL::CommandState::returnConnection(Status status) {
+ // Settle the connection object on the reactor
+ invariant(conn);
+ invariant(interface->_reactor->onReactorThread());
+
+ auto connToReturn = std::exchange(conn, {});
+
+ if (!status.isOK()) {
+ connToReturn->indicateFailure(std::move(status));
+ return;
+ }
+
+ connToReturn->indicateUsed();
+ connToReturn->indicateSuccess();
+}
+
+void NetworkInterfaceTL::CommandState::tryFinish(Status status) {
+ if (timer) {
+ // The command has resolved one way or another, so cancel the timeout timer.
+ timer->cancel(baton);
+ }
+
+ if (!status.isOK() && !finishLine.arriveStrongly()) {
+ // If we failed, then get the client to finish up.
+ // Note: CommandState::returnConnection() and CommandState::cancel() run on the reactor
+ // thread only. One goes first and then the other, so there isn't a risk of canceling
+ // the next command to run on the connection.
+ if (interface->_reactor->onReactorThread()) {
+ cancel();
+ } else {
+ ExecutorFuture<void>(interface->_reactor)
+ .getAsync([this, anchor = shared_from_this()](Status status) {
+ invariant(status.isOK());
+ cancel();
+ });
+ }
+ }
+
+ if (interface->_counters) {
+ // Increment our counters for the integration test
+ interface->_counters->recordResult(status);
+ }
{
+ // We've finished, so we're no longer in progress
stdx::lock_guard lk(interface->_inProgressMutex);
interface->_inProgress.erase(cbHandle);
}
}
+void NetworkInterfaceTL::CommandState::cancel() {
+ if (auto clientPtr = client()) {
+ // If we have a client, cancel it
+ clientPtr->cancel(baton);
+ }
+}
+
+NetworkInterfaceTL::CommandState::~CommandState() {
+ invariant(!conn);
+}
+
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequestOnAny& request,
RemoteCommandCompletionFn&& onFinish,
@@ -218,7 +358,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
- LOG(3) << "startCommand: " << redact(request.toString());
+ LOG(kDiagnosticLogLevel) << "startCommand: " << redact(request.toString());
if (_metadataHook) {
BSONObjBuilder newMetadata(std::move(request.metadata));
@@ -238,6 +378,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (cmdState->requestOnAny.timeout != cmdState->requestOnAny.kNoTimeout) {
cmdState->deadline = cmdState->start + cmdState->requestOnAny.timeout;
}
+ cmdState->baton = baton;
/**
* It is important that onFinish() runs out of line. That said, we can't thenRunOn() arbitrarily
@@ -262,11 +403,14 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
*/
// When our command finishes, run onFinish
- std::move(pf.future)
- .onError([requestId = cmdState->requestOnAny.id](
- auto error) -> StatusWith<RemoteCommandOnAnyResponse> {
- LOG(2) << "Failed to get connection from pool for request " << requestId << ": "
- << redact(error);
+ std::move(pf.future).getAsync([this, cmdState, onFinish = std::move(onFinish)](
+ StatusWith<RemoteCommandOnAnyResponse> response) {
+ cmdState->tryFinish(response.getStatus());
+
+ auto duration = now() - cmdState->start;
+ if (!response.isOK()) {
+ auto error = response.getStatus();
+ LOG(2) << "Request " << cmdState->requestOnAny.id << " failed: " << redact(error);
// The TransportLayer has, for historical reasons returned SocketException
// for network errors, but sharding assumes HostUnreachable on network
@@ -274,20 +418,15 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
if (error == ErrorCodes::SocketException) {
error = Status(ErrorCodes::HostUnreachable, error.reason());
}
- return error;
- })
- .getAsync([this, cmdState, onFinish = std::move(onFinish)](
- StatusWith<RemoteCommandOnAnyResponse> response) {
- auto duration = now() - cmdState->start;
- if (!response.isOK()) {
- onFinish(RemoteCommandOnAnyResponse(boost::none, response.getStatus(), duration));
- } else {
- const auto& rs = response.getValue();
- LOG(2) << "Request " << cmdState->requestOnAny.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
- onFinish(rs);
- }
- });
+
+ onFinish(RemoteCommandOnAnyResponse(boost::none, error, duration));
+ } else {
+ const auto& rs = response.getValue();
+ LOG(2) << "Request " << cmdState->requestOnAny.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
if (MONGO_unlikely(networkInterfaceDiscardCommandsBeforeAcquireConn.shouldFail())) {
log() << "Discarding command due to failpoint before acquireConn";
@@ -295,8 +434,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
// Attempt to use a connection and update our accounting
- auto resolver = [this, baton, cmdState](StatusWith<ConnectionPool::ConnectionHandle> swConn,
- size_t idx) -> Status {
+ auto resolver = [this, cmdState](StatusWith<ConnectionPool::ConnectionHandle> swConn,
+ size_t idx) -> Status {
// Our connection wasn't any good
if (!swConn.isOK()) {
if (cmdState->finishLine.arriveWeakly()) {
@@ -313,17 +452,11 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// We have a connection and the command hasn't already been attempted
cmdState->request.emplace(cmdState->requestOnAny, idx);
+ cmdState->conn = std::move(swConn.getValue());
- if (MONGO_unlikely(networkInterfaceDiscardCommandsAfterAcquireConn.shouldFail())) {
- log() << "Discarding command due to failpoint after acquireConn";
- return Status::OK();
- }
+ networkInterfaceDiscardCommandsAfterAcquireConn.pauseWhileSet();
- try {
- _onAcquireConn(cmdState, std::move(swConn.getValue()), baton);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
+ _onAcquireConn(cmdState);
return Status::OK();
};
@@ -364,112 +497,78 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
return Status::OK();
}
-// This is only called from within a then() callback on a future, so throwing is equivalent to
-// returning a ready Future with a not-OK status.
-void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
- ConnectionPool::ConnectionHandle conn,
- const BatonHandle& baton) {
-
- if (state->done.load()) {
- conn->indicateSuccess();
- uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
- }
+void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state) noexcept {
+ auto clientFuture = makeReadyFutureWith([this, state] {
+ // Do everything we need to in the initial scope and then use the client to send the command
+ // over the network
- state->conn = std::move(conn);
- auto tlconn = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
- auto client = tlconn->client();
-
- if (state->deadline != RemoteCommandRequest::kNoExpirationDate) {
- auto nowVal = now();
- if (nowVal >= state->deadline) {
- auto connDuration = nowVal - state->start;
- uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- str::stream() << "Remote command timed out while waiting to get a "
- "connection from the pool, took "
- << connDuration << ", timeout was set to "
- << state->requestOnAny.timeout);
+ if (state->done.load()) {
+ uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
}
- // TODO reform with SERVER-41459
- state->timer = _reactor->makeTimer();
- state->timer->waitUntil(state->deadline, baton)
- .getAsync([this, client, state, baton](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
- invariant(state->done.load());
- return;
- }
-
- if (state->done.swap(true)) {
- return;
- }
-
- if (getTestCommandsEnabled()) {
- stdx::lock_guard<Latch> lk(_mutex);
- _counters.timedOut++;
- }
-
- const std::string message = str::stream()
- << "Request " << state->requestOnAny.id << " timed out"
- << ", deadline was " << state->deadline.toString() << ", op was "
- << redact(state->requestOnAny.toString());
+ state->setTimer();
- LOG(2) << message;
- state->promise.setError(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
-
- client->cancel(baton);
- });
- }
+ if (_counters) {
+ _counters->recordSent();
+ }
- client->runCommandRequest(*state->request, baton)
- .thenRunOn(baton ? ExecutorPtr(baton) : ExecutorPtr(_reactor))
- .then([this, state](RemoteCommandResponse response) {
- if (state->done.load()) {
- uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
- }
+ return state->client()->runCommandRequest(*state->request, state->baton);
+ });
- const auto& target = state->conn->getHostAndPort();
+ auto metadataCallback = [this, state](RemoteCommandResponse response) {
+ // This callback packages a RemoteCommandResponse into a RemoteCommandOnAnyResponse and runs
+ // the metadata hook. We keep it separate because it needs to run on both paths after thenRunOn().
+ if (state->done.load()) {
+ uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
+ }
- if (_metadataHook && response.status.isOK()) {
- response.status =
- _metadataHook->readReplyMetadata(nullptr, target.toString(), response.data);
- }
+ const auto& target = state->conn->getHostAndPort();
- return RemoteCommandOnAnyResponse(target, std::move(response));
- })
- .getAsync([this, state, baton](StatusWith<RemoteCommandOnAnyResponse> swr) {
- if (!swr.isOK()) {
- state->conn->indicateFailure(swr.getStatus());
- } else if (!swr.getValue().isOK()) {
- state->conn->indicateFailure(swr.getValue().status);
- } else {
- state->conn->indicateUsed();
- state->conn->indicateSuccess();
- }
+ if (_metadataHook && response.status.isOK()) {
+ uassertStatusOK(
+ _metadataHook->readReplyMetadata(nullptr, target.toString(), response.data));
+ }
- if (state->done.swap(true)) {
- return;
- }
+ return RemoteCommandOnAnyResponse(target, std::move(response));
+ };
- if (getTestCommandsEnabled()) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (swr.isOK() && swr.getValue().status.isOK()) {
- _counters.succeeded++;
- } else {
- _counters.failed++;
+ if (state->baton) {
+ // If we have a baton then use it for the promise and then switch to the reactor to return
+ // our connection.
+ std::move(clientFuture)
+ .thenRunOn(state->baton)
+ .then(std::move(metadataCallback))
+ .onCompletion([this, state](StatusWith<RemoteCommandOnAnyResponse> swr) {
+ auto status = swr.getStatus();
+ if (state->done.swap(true)) {
+ return status;
}
- }
- if (state->timer) {
- state->timer->cancel(baton);
- }
+ state->promise.setFromStatusWith(std::move(swr));
+ return status;
+ })
+ .thenRunOn(_reactor)
+ .getAsync([this, state](Status status) { state->returnConnection(status); });
+ } else {
+ // If we do not have a baton, then we can fulfill the promise and return our connection in
+ // the same callback
+ std::move(clientFuture)
+ .thenRunOn(_reactor)
+ .then(std::move(metadataCallback))
+ .getAsync([this, state](StatusWith<RemoteCommandOnAnyResponse> swr) {
+ auto status = swr.getStatus();
+ ON_BLOCK_EXIT([&] { state->returnConnection(status); });
+ if (state->done.swap(true)) {
+ return;
+ }
- state->promise.setFromStatusWith(std::move(swr));
- });
+ state->promise.setFromStatusWith(std::move(swr));
+ });
+ }
}
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const BatonHandle& baton) {
+ const BatonHandle&) {
stdx::unique_lock<Latch> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -487,20 +586,12 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
return;
}
- if (getTestCommandsEnabled()) {
- stdx::lock_guard<Latch> lk(_mutex);
- _counters.canceled++;
- }
-
+ // Satisfy the promise locally
LOG(2) << "Canceling operation; original request was: "
<< redact(state->requestOnAny.toString());
state->promise.setError({ErrorCodes::CallbackCanceled,
str::stream() << "Command canceled; original request was: "
<< redact(state->requestOnAny.toString())});
- if (state->conn) {
- auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
- client->client()->cancel(baton);
- }
}
Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
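The comments in _onAcquireConn() above describe the intended chain shape: the request is sent from the reactor, the promise is fulfilled on the baton when one is bound to the command (otherwise on the reactor), and the chain always ends back on the reactor so returnConnection() runs there. A simplified standalone analogue in plain C++ (illustrative executors, not the mongo Future/thenRunOn() API):

    #include <functional>
    #include <iostream>
    #include <string>

    // Simplified stand-ins for the reactor and baton executors; names here are
    // illustrative only.
    struct Executor {
        std::string name;
        void run(std::function<void()> task) const {
            std::cout << "[" << name << "] ";
            task();
        }
    };

    // Shape of the chain in _onAcquireConn(): send on the reactor, fulfill the
    // promise on the baton when one exists, and always end on the reactor so
    // the connection can be returned there.
    void runCommandChain(const Executor& reactor, const Executor* baton) {
        reactor.run([] { std::cout << "send request over the wire\n"; });

        const Executor& completer = baton ? *baton : reactor;
        completer.run([] { std::cout << "run metadata hook, fulfill promise\n"; });

        reactor.run([] { std::cout << "return connection to the pool\n"; });
    }

    int main() {
        Executor reactor{"reactor"};
        Executor baton{"baton"};

        runCommandChain(reactor, &baton);   // baton path: promise settled off the reactor
        runCommandChain(reactor, nullptr);  // no baton: everything stays on the reactor
        return 0;
    }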
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 80e510a015f..edfe9c2efe5 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -47,6 +47,8 @@ namespace mongo {
namespace executor {
class NetworkInterfaceTL : public NetworkInterface {
+ static constexpr int kDiagnosticLogLevel = 4;
+
public:
NetworkInterfaceTL(std::string instanceName,
ConnectionPool::Options connPoolOpts,
@@ -86,7 +88,7 @@ public:
void dropConnections(const HostAndPort& hostAndPort) override;
private:
- struct CommandState {
+ struct CommandState final : public std::enable_shared_from_this<CommandState> {
CommandState(NetworkInterfaceTL* interface_,
RemoteCommandRequestOnAny request_,
const TaskExecutor::CallbackHandle& cbHandle_,
@@ -100,6 +102,41 @@ private:
const TaskExecutor::CallbackHandle& cbHandle,
Promise<RemoteCommandOnAnyResponse> promise);
+ /**
+ * Return the client object bound to the current command or nullptr if there isn't one.
+ *
+ * This is only useful on the networking thread (i.e. the reactor).
+ */
+ AsyncDBClient* client();
+
+ /**
+ * Set a timer to fulfill the promise with a timeout error.
+ */
+ void setTimer();
+
+ /**
+ * Cancel the current client operation or do nothing if there is no client.
+ *
+ * This must be called from the networking thread (i.e. the reactor).
+ */
+ void cancel();
+
+ /**
+ * Return the current connection to the pool and unset it locally.
+ *
+ * This must be called from the networking thread (i.e. the reactor).
+ */
+ void returnConnection(Status status);
+
+ /**
+ * Fulfill the promise for the Command.
+ *
+ * This will throw/invariant if called multiple times. In an ideal world, this would do the
+ * swap on CommandState::done for you and return early if it was already true. It does not
+ * do so currently.
+ */
+ void tryFinish(Status status);
+
NetworkInterfaceTL* interface;
RemoteCommandRequestOnAny requestOnAny;
@@ -108,9 +145,11 @@ private:
Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
Date_t start;
+ BatonHandle baton;
+ std::unique_ptr<transport::ReactorTimer> timer;
+
StrongWeakFinishLine finishLine;
ConnectionPool::ConnectionHandle conn;
- std::unique_ptr<transport::ReactorTimer> timer;
AtomicWord<bool> done;
Promise<RemoteCommandOnAnyResponse> promise;
@@ -137,13 +176,19 @@ private:
void _answerAlarm(Status status, std::shared_ptr<AlarmState> state);
void _run();
- void _onAcquireConn(std::shared_ptr<CommandState> state,
- ConnectionPool::ConnectionHandle conn,
- const BatonHandle& baton);
+
+ /**
+ * Structure a future chain based upon a CommandState that has received a good connection.
+ *
+ * This function starts on the reactor to launch the command, and its future chain must end on
+ * the reactor to return the connection. The internal future chain essentially starts with
+ * sending the RemoteCommandRequest and ends with receiving the RemoteCommandResponse.
+ */
+ void _onAcquireConn(std::shared_ptr<CommandState> state) noexcept;
std::string _instanceName;
- ServiceContext* _svcCtx;
- transport::TransportLayer* _tl;
+ ServiceContext* _svcCtx = nullptr;
+ transport::TransportLayer* _tl = nullptr;
// Will be created if ServiceContext is null, or if no TransportLayer was configured at startup
std::unique_ptr<transport::TransportLayer> _ownedTransportLayer;
transport::ReactorHandle _reactor;
@@ -153,7 +198,9 @@ private:
ConnectionPool::Options _connPoolOpts;
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
std::shared_ptr<ConnectionPool> _pool;
- Counters _counters;
+
+ class SynchronizedCounters;
+ std::shared_ptr<SynchronizedCounters> _counters;
std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
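The header comments above require cancel() and returnConnection() to run on the networking thread, and tryFinish() in the .cpp enforces that by canceling inline when already on the reactor and rescheduling onto it otherwise. A standalone sketch of that guard, assuming a minimal single-threaded task queue as the "reactor" (hypothetical names, not the mongo transport::Reactor API):

    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Minimal single-threaded "reactor": runs queued tasks on one worker thread.
    class MiniReactor {
    public:
        MiniReactor() : _worker([this] { _loop(); }) {}
        ~MiniReactor() {
            schedule([this] { _stop = true; });  // queued after any pending work
            _worker.join();
        }

        bool onReactorThread() const {
            return std::this_thread::get_id() == _worker.get_id();
        }

        void schedule(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _tasks.push_back(std::move(task));
            }
            _cv.notify_one();
        }

    private:
        void _loop() {
            while (!_stop) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lk(_mutex);
                    _cv.wait(lk, [this] { return !_tasks.empty(); });
                    task = std::move(_tasks.front());
                    _tasks.pop_front();
                }
                task();  // _stop is only written by tasks, i.e. on this thread
            }
        }

        std::mutex _mutex;
        std::condition_variable _cv;
        std::deque<std::function<void()>> _tasks;
        bool _stop = false;
        std::thread _worker;
    };

    // Same shape as CommandState::tryFinish(): cancel inline when already on
    // the reactor, otherwise hop onto the reactor first.
    void cancelOnReactor(MiniReactor& reactor) {
        if (reactor.onReactorThread()) {
            std::cout << "cancel inline on the reactor\n";
        } else {
            reactor.schedule([] { std::cout << "cancel rescheduled onto the reactor\n"; });
        }
    }

    int main() {
        MiniReactor reactor;
        cancelOnReactor(reactor);  // called off-reactor here, so it reschedules
        return 0;                  // destructor runs the queued cancel, then stops
    }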