author     Ben Caimano <ben.caimano@mongodb.com>    2020-01-21 22:38:52 +0000
committer  evergreen <evergreen@mongodb.com>        2020-01-21 22:38:52 +0000
commit     71ae1d4bf4eb92b5e46922efdebb96f99081f5cd (patch)
tree       6f13c73e80dfbac90bc39c0708f913e8636a1e67 /src/mongo
parent     30b8a9ee82306012a1a344c684932df1b15f1be8 (diff)
download   mongo-71ae1d4bf4eb92b5e46922efdebb96f99081f5cd.tar.gz
SERVER-43606 Enforce connection reuse in response to remote interruption
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/client/async_client.cpp                              4
-rw-r--r--  src/mongo/executor/network_interface.h                         5
-rw-r--r--  src/mongo/executor/network_interface_integration_fixture.cpp   3
-rw-r--r--  src/mongo/executor/network_interface_integration_test.cpp     39
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp                   367
-rw-r--r--  src/mongo/executor/network_interface_tl.h                      63

6 files changed, 319 insertions, 162 deletions
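Before the per-file diffs: the core of this change is that a command now owns its pooled connection for the life of its future chain, and the connection is only marked reusable after a clean finish (see CommandState::returnConnection() in network_interface_tl.cpp below). A minimal sketch of that reuse rule, using the ConnectionPool::ConnectionHandle hooks that appear in the diff — the helper function itself is hypothetical, not committed code:

    // Sketch: settle a pooled connection according to how its command ended.
    // indicateFailure()/indicateUsed()/indicateSuccess() are the real pool
    // hooks used below; settleConnection() is an illustrative stand-in for
    // CommandState::returnConnection().
    void settleConnection(ConnectionPool::ConnectionHandle conn, Status status) {
        if (!status.isOK()) {
            // An interrupted command poisons its connection: the pool will not
            // hand it out again, which is the "enforce connection reuse" rule.
            conn->indicateFailure(std::move(status));
            return;
        }

        // Only a command that ran to completion marks its connection reusable.
        conn->indicateUsed();
        conn->indicateSuccess();
    }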
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index e34fe4c7bec..a6748f5ffe6 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -245,10 +245,6 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
         .then([start, clkSource, this](rpc::UniqueReply response) {
             auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
             return executor::RemoteCommandResponse(*response, duration);
-        })
-        .onError([start, clkSource](Status status) {
-            auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
-            return executor::RemoteCommandResponse(status, duration);
         });
 }
 
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index ffb203221c9..386893b4881 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -121,9 +121,11 @@ public:
     virtual std::string getHostName() = 0;
 
     struct Counters {
+        uint64_t sent = 0;
         uint64_t canceled = 0;
         uint64_t timedOut = 0;
         uint64_t failed = 0;
+        uint64_t failedRemotely = 0;
         uint64_t succeeded = 0;
     };
     /*
@@ -172,6 +174,9 @@ public:
     /**
      * Requests cancelation of the network activity associated with "cbHandle" if it has not yet
      * completed.
+     *
+     * Note that the work involved in onFinish may run locally as a result of invoking this
+     * function. Do not hold locks while calling cancelCommand(...).
      */
     virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                const BatonHandle& baton = nullptr) = 0;
 
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 62972eef6c3..51e3092a17c 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -50,6 +50,9 @@ namespace executor {
 void NetworkInterfaceIntegrationFixture::createNet(
     std::unique_ptr<NetworkConnectionHook> connectHook) {
     ConnectionPool::Options options;
+
+    options.minConnections = 0u;
+
 #ifdef _WIN32
     // Connections won't queue on widnows, so attempting to open too many connections
     // concurrently will result in refused connections and test failure.
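The new warning on cancelCommand() above is load-bearing: cancelation now satisfies the promise inline, so onFinish may run on the canceling thread before cancelCommand() returns. A hypothetical caller illustrating the deadlock the comment warns about (MyScheduler and its members are invented for illustration; Mutex/Latch/MONGO_MAKE_LATCH are the locking idioms used elsewhere in this diff):

    class MyScheduler {
    public:
        void cancelOne(executor::NetworkInterface* net,
                       const executor::TaskExecutor::CallbackHandle& cbh) {
            stdx::unique_lock<Latch> lk(_mutex);
            // BAD: onFinish may run inline here; if it re-enters MyScheduler,
            // it deadlocks on _mutex:
            //     net->cancelCommand(cbh);

            // GOOD: drop the lock before canceling.
            lk.unlock();
            net->cancelCommand(cbh);
        }

    private:
        Mutex _mutex = MONGO_MAKE_LATCH("MyScheduler::_mutex");
    };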
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index 8e593d0535b..08907fa240b 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -39,6 +39,7 @@
 #include "mongo/client/connection_string.h"
 #include "mongo/db/commands/test_commands_enabled.h"
 #include "mongo/db/wire_version.h"
+#include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/network_connection_hook.h"
 #include "mongo/executor/network_interface_integration_fixture.h"
 #include "mongo/executor/test_network_connection_hook.h"
@@ -227,37 +228,51 @@ TEST_F(NetworkInterfaceTest, CancelMissingOperation) {
     assertNumOps(0u, 0u, 0u, 0u);
 }
 
+constexpr auto kMaxWait = Milliseconds(Minutes(1));
+
 TEST_F(NetworkInterfaceTest, CancelOperation) {
     auto cbh = makeCallbackHandle();
 
-    // Kick off our operation
-    FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn");
+    auto deferred = [&] {
+        // Kick off our operation
+        FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn");
 
-    auto deferred = runCommand(cbh, makeTestCommand());
+        auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
 
-    waitForIsMaster();
+        waitForIsMaster();
 
-    net().cancelCommand(cbh);
+        fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1);
+
+        net().cancelCommand(cbh);
+
+        return deferred;
+    }();
 
     // Wait for op to complete, assert that it was canceled.
     auto result = deferred.get();
     ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
     ASSERT(result.elapsedMillis);
+
     assertNumOps(1u, 0u, 0u, 0u);
 }
 
 TEST_F(NetworkInterfaceTest, ImmediateCancel) {
     auto cbh = makeCallbackHandle();
 
-    // Kick off our operation
+    auto deferred = [&] {
+        // Kick off our operation
+        FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn");
 
-    FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn");
+        auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
 
-    auto deferred = runCommand(cbh, makeTestCommand());
+        fpb->waitForTimesEntered(fpb.initialTimesEntered() + 1);
 
-    net().cancelCommand(cbh);
+        net().cancelCommand(cbh);
+
+        return deferred;
+    }();
 
-    ASSERT_FALSE(hasIsMaster());
+    ASSERT_EQ(net().getCounters().sent, 0);
 
     // Wait for op to complete, assert that it was canceled.
     auto result = deferred.get();
@@ -269,13 +284,13 @@ TEST_F(NetworkInterfaceTest, ImmediateCancel) {
 TEST_F(NetworkInterfaceTest, LateCancel) {
     auto cbh = makeCallbackHandle();
 
-    auto deferred = runCommand(cbh, makeTestCommand());
+    auto deferred = runCommand(cbh, makeTestCommand(kMaxWait));
 
     // Wait for op to complete, assert that it was canceled.
     auto result = deferred.get();
     net().cancelCommand(cbh);
 
-    ASSERT(result.isOK());
+    ASSERT_OK(result.status);
     ASSERT(result.elapsedMillis);
     assertNumOps(0u, 0u, 0u, 1u);
 }
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 01eceebbbd3..4aee5b2bc7c 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -36,6 +36,7 @@
 #include "mongo/db/commands/test_commands_enabled.h"
 #include "mongo/db/server_options.h"
 #include "mongo/executor/connection_pool_tl.h"
+#include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/transport/transport_layer_manager.h"
 #include "mongo/util/concurrency/idle_thread_block.h"
 #include "mongo/util/log.h"
@@ -44,6 +45,51 @@ namespace mongo {
 namespace executor {
 
+/**
+ * SynchronizedCounters is a synchronized bucket of event counts for commands
+ */
+class NetworkInterfaceTL::SynchronizedCounters {
+public:
+    auto get() const {
+        stdx::lock_guard lk(_mutex);
+        return _data;
+    }
+
+    void recordResult(const Status& status) {
+        stdx::lock_guard lk(_mutex);
+        if (status.isOK()) {
+            // Increment the count of commands that received a valid response
+            ++_data.succeeded;
+        } else if (ErrorCodes::isExceededTimeLimitError(status)) {
+            // Increment the count of commands that experienced a local timeout
+            // Note that these commands do not count as "failed".
+            ++_data.timedOut;
+        } else if (ErrorCodes::isCancelationError(status)) {
+            // Increment the count of commands that were canceled locally
+            ++_data.canceled;
+        } else if (ErrorCodes::isShutdownError(status)) {
+            // Increment the count of commands that received an unrecoverable response
+            ++_data.failedRemotely;
+        } else {
+            // Increment the count of commands that experienced a network failure
+            ++_data.failed;
+        }
+    }
+
+    /**
+     * Increment the count of commands sent over the network
+     */
+    void recordSent() {
+        stdx::lock_guard lk(_mutex);  // get() reads _data under this same mutex
+        ++_data.sent;
+    }
+
+private:
+    mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0),
+                                            "NetworkInterfaceTL::SynchronizedCounters::_mutex");
+    Counters _data;
+};
+
 NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
                                        ConnectionPool::Options connPoolOpts,
                                        ServiceContext* svcCtx,
@@ -51,9 +97,6 @@ NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
                                        std::unique_ptr<rpc::EgressMetadataHook> metadataHook)
     : _instanceName(std::move(instanceName)),
       _svcCtx(svcCtx),
-      _tl(nullptr),
-      _ownedTransportLayer(nullptr),
-      _reactor(nullptr),
       _connPoolOpts(std::move(connPoolOpts)),
       _onConnectHook(std::move(onConnectHook)),
       _metadataHook(std::move(metadataHook)),
@@ -75,6 +118,10 @@ NetworkInterfaceTL::NetworkInterfaceTL(std::string instanceName,
         _reactor, _tl, std::move(_onConnectHook), _connPoolOpts);
     _pool = std::make_shared<ConnectionPool>(
         std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
+
+    if (getTestCommandsEnabled()) {
+        _counters = std::make_unique<SynchronizedCounters>();
+    }
 }
 
 std::string NetworkInterfaceTL::getDiagnosticString() {
@@ -91,9 +138,8 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const
 }
 
 NetworkInterface::Counters NetworkInterfaceTL::getCounters() const {
-    invariant(getTestCommandsEnabled());
-    stdx::lock_guard<Latch> lk(_mutex);
-    return _counters;
+    invariant(_counters);
+    return _counters->get();
 }
 
 std::string NetworkInterfaceTL::getHostName() {
@@ -201,15 +247,109 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
     return state;
 }
 
-NetworkInterfaceTL::CommandState::~CommandState() {
-    invariant(interface);
+AsyncDBClient* NetworkInterfaceTL::CommandState::client() {
+    if (!conn) {
+        return nullptr;
+    }
+
+    return checked_cast<connection_pool_tl::TLConnection*>(conn.get())->client();
+}
+
+void NetworkInterfaceTL::CommandState::setTimer() {
+    if (deadline == RemoteCommandRequest::kNoExpirationDate) {
+        return;
+    }
+
+    const auto nowVal = interface->now();
+    if (nowVal >= deadline) {
+        auto connDuration = nowVal - start;
+        uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+                  str::stream() << "Remote command timed out while waiting to get a "
+                                   "connection from the pool, took "
+                                << connDuration << ", timeout was set to " << requestOnAny.timeout);
+    }
+
+    // TODO reform with SERVER-41459
+    timer = interface->_reactor->makeTimer();
+    timer->waitUntil(deadline, baton).getAsync([this, anchor = shared_from_this()](Status status) {
+        if (!status.isOK()) {
+            return;
+        }
+
+        if (done.swap(true)) {
+            return;
+        }
+
+        const std::string message = str::stream() << "Request " << requestOnAny.id << " timed out"
+                                                  << ", deadline was " << deadline.toString()
+                                                  << ", op was " << redact(requestOnAny.toString());
+
+        LOG(2) << message;
+        promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
+    });
+}
+
+void NetworkInterfaceTL::CommandState::returnConnection(Status status) {
+    // Settle the connection object on the reactor
+    invariant(conn);
+    invariant(interface->_reactor->onReactorThread());
+
+    auto connToReturn = std::exchange(conn, {});
+
+    if (!status.isOK()) {
+        connToReturn->indicateFailure(std::move(status));
+        return;
+    }
+
+    connToReturn->indicateUsed();
+    connToReturn->indicateSuccess();
+}
+
+void NetworkInterfaceTL::CommandState::tryFinish(Status status) {
+    if (timer) {
+        // The command has resolved one way or another.
+        timer->cancel(baton);
+    }
+
+    if (!status.isOK() && !finishLine.arriveStrongly()) {
+        // If we failed, then get the client to finish up.
+        // Note: CommandState::returnConnection() and CommandState::cancel() run on the reactor
+        // thread only. One goes first and then the other, so there isn't a risk of canceling
+        // the next command to run on the connection.
+        if (interface->_reactor->onReactorThread()) {
+            cancel();
+        } else {
+            ExecutorFuture<void>(interface->_reactor)
+                .getAsync([this, anchor = shared_from_this()](Status status) {
+                    invariant(status.isOK());
+                    cancel();
+                });
+        }
+    }
+
+    if (interface->_counters) {
+        // Increment our counters for the integration test
+        interface->_counters->recordResult(status);
+    }
 
     {
+        // We've finished, we're not in progress anymore
         stdx::lock_guard lk(interface->_inProgressMutex);
         interface->_inProgress.erase(cbHandle);
     }
 }
 
+void NetworkInterfaceTL::CommandState::cancel() {
+    if (auto clientPtr = client()) {
+        // If we have a client, cancel it
+        clientPtr->cancel(baton);
+    }
+}
+
+NetworkInterfaceTL::CommandState::~CommandState() {
+    invariant(!conn);
+}
+
 Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                         RemoteCommandRequestOnAny& request,
                                         RemoteCommandCompletionFn&& onFinish,
@@ -218,7 +358,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
 
-    LOG(3) << "startCommand: " << redact(request.toString());
+    LOG(kDiagnosticLogLevel) << "startCommand: " << redact(request.toString());
 
     if (_metadataHook) {
         BSONObjBuilder newMetadata(std::move(request.metadata));
@@ -238,6 +378,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     if (cmdState->requestOnAny.timeout != cmdState->requestOnAny.kNoTimeout) {
         cmdState->deadline = cmdState->start + cmdState->requestOnAny.timeout;
     }
+    cmdState->baton = baton;
 
     /**
     * It is important that onFinish() runs out of line. That said, we can't thenRunOn() arbitrarily
@@ -262,11 +403,14 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     */
 
     // When our command finishes, run onFinish
-    std::move(pf.future)
-        .onError([requestId = cmdState->requestOnAny.id](
-                     auto error) -> StatusWith<RemoteCommandOnAnyResponse> {
-            LOG(2) << "Failed to get connection from pool for request " << requestId << ": "
-                   << redact(error);
+    std::move(pf.future).getAsync([this, cmdState, onFinish = std::move(onFinish)](
+                                      StatusWith<RemoteCommandOnAnyResponse> response) {
+        cmdState->tryFinish(response.getStatus());
+
+        auto duration = now() - cmdState->start;
+        if (!response.isOK()) {
+            auto error = response.getStatus();
+            LOG(2) << "Request " << cmdState->requestOnAny.id << " failed: " << redact(error);
 
             // The TransportLayer has, for historical reasons returned SocketException
             // for network errors, but sharding assumes HostUnreachable on network
@@ -274,20 +418,15 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
             if (error == ErrorCodes::SocketException) {
                 error = Status(ErrorCodes::HostUnreachable, error.reason());
             }
-            return error;
-        })
-        .getAsync([this, cmdState, onFinish = std::move(onFinish)](
-                      StatusWith<RemoteCommandOnAnyResponse> response) {
-            auto duration = now() - cmdState->start;
-            if (!response.isOK()) {
-                onFinish(RemoteCommandOnAnyResponse(boost::none, response.getStatus(), duration));
-            } else {
-                const auto& rs = response.getValue();
-                LOG(2) << "Request " << cmdState->requestOnAny.id << " finished with response: "
-                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-                onFinish(rs);
-            }
-        });
+
+            onFinish(RemoteCommandOnAnyResponse(boost::none, error, duration));
+        } else {
+            const auto& rs = response.getValue();
+            LOG(2) << "Request " << cmdState->requestOnAny.id << " finished with response: "
+                   << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+            onFinish(rs);
+        }
+    });
 
     if (MONGO_unlikely(networkInterfaceDiscardCommandsBeforeAcquireConn.shouldFail())) {
         log() << "Discarding command due to failpoint before acquireConn";
@@ -295,8 +434,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     }
 
     // Attempt to use a connection and update our accounting
-    auto resolver = [this, baton, cmdState](StatusWith<ConnectionPool::ConnectionHandle> swConn,
-                                            size_t idx) -> Status {
+    auto resolver = [this, cmdState](StatusWith<ConnectionPool::ConnectionHandle> swConn,
+                                     size_t idx) -> Status {
         // Our connection wasn't any good
         if (!swConn.isOK()) {
             if (cmdState->finishLine.arriveWeakly()) {
@@ -313,17 +452,11 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
 
         // We have a connection and the command hasn't already been attempted
         cmdState->request.emplace(cmdState->requestOnAny, idx);
+        cmdState->conn = std::move(swConn.getValue());
 
-        if (MONGO_unlikely(networkInterfaceDiscardCommandsAfterAcquireConn.shouldFail())) {
-            log() << "Discarding command due to failpoint after acquireConn";
-            return Status::OK();
-        }
+        networkInterfaceDiscardCommandsAfterAcquireConn.pauseWhileSet();
 
-        try {
-            _onAcquireConn(cmdState, std::move(swConn.getValue()), baton);
-        } catch (const DBException& ex) {
-            return ex.toStatus();
-        }
+        _onAcquireConn(cmdState);
 
         return Status::OK();
     };
@@ -364,112 +497,78 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     return Status::OK();
 }
 
-// This is only called from within a then() callback on a future, so throwing is equivalent to
-// returning a ready Future with a not-OK status.
-void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
-                                        ConnectionPool::ConnectionHandle conn,
-                                        const BatonHandle& baton) {
-
-    if (state->done.load()) {
-        conn->indicateSuccess();
-        uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
-    }
+void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state) noexcept {
+    auto clientFuture = makeReadyFutureWith([this, state] {
+        // Do everything we need to in the initial scope and then use the client to run the command
+        // to the network
 
-    state->conn = std::move(conn);
-    auto tlconn = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
-    auto client = tlconn->client();
-
-    if (state->deadline != RemoteCommandRequest::kNoExpirationDate) {
-        auto nowVal = now();
-        if (nowVal >= state->deadline) {
-            auto connDuration = nowVal - state->start;
-            uasserted(ErrorCodes::NetworkInterfaceExceededTimeLimit,
-                      str::stream() << "Remote command timed out while waiting to get a "
                                        "connection from the pool, took "
-                                    << connDuration << ", timeout was set to "
-                                    << state->requestOnAny.timeout);
+        if (state->done.load()) {
+            uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
         }
 
-        // TODO reform with SERVER-41459
-        state->timer = _reactor->makeTimer();
-        state->timer->waitUntil(state->deadline, baton)
-            .getAsync([this, client, state, baton](Status status) {
-                if (status == ErrorCodes::CallbackCanceled) {
-                    invariant(state->done.load());
-                    return;
-                }
-
-                if (state->done.swap(true)) {
-                    return;
-                }
-
-                if (getTestCommandsEnabled()) {
-                    stdx::lock_guard<Latch> lk(_mutex);
-                    _counters.timedOut++;
-                }
-
-                const std::string message = str::stream()
-                    << "Request " << state->requestOnAny.id << " timed out"
-                    << ", deadline was " << state->deadline.toString() << ", op was "
-                    << redact(state->requestOnAny.toString());
+        state->setTimer();
 
-                LOG(2) << message;
-                state->promise.setError(
-                    Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
-
-                client->cancel(baton);
-            });
-    }
+        if (_counters) {
+            _counters->recordSent();
+        }
 
-    client->runCommandRequest(*state->request, baton)
-        .thenRunOn(baton ? ExecutorPtr(baton) : ExecutorPtr(_reactor))
-        .then([this, state](RemoteCommandResponse response) {
-            if (state->done.load()) {
-                uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
-            }
+        return state->client()->runCommandRequest(*state->request, state->baton);
+    });
 
-            const auto& target = state->conn->getHostAndPort();
+    auto metadataCallback = [this, state](RemoteCommandResponse response) {
+        // This callback will package up an RCR into a RCoaR and run the metadata hook. We hold it
+        // separate because it needs to run on both paths after thenRunOn().
+        if (state->done.load()) {
+            uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
+        }
 
-            if (_metadataHook && response.status.isOK()) {
-                response.status =
-                    _metadataHook->readReplyMetadata(nullptr, target.toString(), response.data);
-            }
+        const auto& target = state->conn->getHostAndPort();
 
-            return RemoteCommandOnAnyResponse(target, std::move(response));
-        })
-        .getAsync([this, state, baton](StatusWith<RemoteCommandOnAnyResponse> swr) {
-            if (!swr.isOK()) {
-                state->conn->indicateFailure(swr.getStatus());
-            } else if (!swr.getValue().isOK()) {
-                state->conn->indicateFailure(swr.getValue().status);
-            } else {
-                state->conn->indicateUsed();
-                state->conn->indicateSuccess();
-            }
+        if (_metadataHook && response.status.isOK()) {
+            uassertStatusOK(
+                _metadataHook->readReplyMetadata(nullptr, target.toString(), response.data));
+        }
 
-            if (state->done.swap(true)) {
-                return;
-            }
+        return RemoteCommandOnAnyResponse(target, std::move(response));
+    };
 
-            if (getTestCommandsEnabled()) {
-                stdx::lock_guard<Latch> lk(_mutex);
-                if (swr.isOK() && swr.getValue().status.isOK()) {
-                    _counters.succeeded++;
-                } else {
-                    _counters.failed++;
+    if (state->baton) {
+        // If we have a baton then use it for the promise and then switch to the reactor to return
+        // our connection.
+        std::move(clientFuture)
+            .thenRunOn(state->baton)
+            .then(std::move(metadataCallback))
+            .onCompletion([this, state](StatusWith<RemoteCommandOnAnyResponse> swr) {
+                auto status = swr.getStatus();
+                if (state->done.swap(true)) {
+                    return status;
                 }
-            }
 
-            if (state->timer) {
-                state->timer->cancel(baton);
-            }
+                state->promise.setFromStatusWith(std::move(swr));
+                return status;
+            })
+            .thenRunOn(_reactor)
+            .getAsync([this, state](Status status) { state->returnConnection(status); });
+    } else {
+        // If we do not have a baton, then we can fulfill the promise and return our connection in
+        // the same callback
+        std::move(clientFuture)
+            .thenRunOn(_reactor)
+            .then(std::move(metadataCallback))
+            .getAsync([this, state](StatusWith<RemoteCommandOnAnyResponse> swr) {
+                auto status = swr.getStatus();
+                ON_BLOCK_EXIT([&] { state->returnConnection(status); });
+                if (state->done.swap(true)) {
+                    return;
+                }
 
-            state->promise.setFromStatusWith(std::move(swr));
-        });
+                state->promise.setFromStatusWith(std::move(swr));
+            });
+    }
 }
 
 void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
-                                       const BatonHandle& baton) {
+                                       const BatonHandle&) {
     stdx::unique_lock<Latch> lk(_inProgressMutex);
     auto it = _inProgress.find(cbHandle);
     if (it == _inProgress.end()) {
@@ -487,20 +586,12 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
         return;
     }
 
-    if (getTestCommandsEnabled()) {
-        stdx::lock_guard<Latch> lk(_mutex);
-        _counters.canceled++;
-    }
-
+    // Satisfy the promise locally
     LOG(2) << "Canceling operation; original request was: "
           << redact(state->requestOnAny.toString());
     state->promise.setError({ErrorCodes::CallbackCanceled,
                              str::stream() << "Command canceled; original request was: "
                                            << redact(state->requestOnAny.toString())});
-
-    if (state->conn) {
-        auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
-        client->client()->cancel(baton);
-    }
 }
 
 Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 80e510a015f..edfe9c2efe5 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -47,6 +47,8 @@ namespace mongo {
 namespace executor {
 
 class NetworkInterfaceTL : public NetworkInterface {
+    static constexpr int kDiagnosticLogLevel = 4;
+
 public:
     NetworkInterfaceTL(std::string instanceName,
                        ConnectionPool::Options connPoolOpts,
@@ -86,7 +88,7 @@ public:
     void dropConnections(const HostAndPort& hostAndPort) override;
 
 private:
-    struct CommandState {
+    struct CommandState final : public std::enable_shared_from_this<CommandState> {
         CommandState(NetworkInterfaceTL* interface_,
                      RemoteCommandRequestOnAny request_,
                      const TaskExecutor::CallbackHandle& cbHandle_,
@@ -100,6 +102,41 @@ private:
                          const TaskExecutor::CallbackHandle& cbHandle,
                          Promise<RemoteCommandOnAnyResponse> promise);
 
+        /**
+         * Return the client object bound to the current command or nullptr if there isn't one.
+         *
+         * This is only useful on the networking thread (i.e. the reactor).
+         */
+        AsyncDBClient* client();
+
+        /**
+         * Set a timer to fulfill the promise with a timeout error.
+         */
+        void setTimer();
+
+        /**
+         * Cancel the current client operation or do nothing if there is no client.
+         *
+         * This must be called from the networking thread (i.e. the reactor).
+         */
+        void cancel();
+
+        /**
+         * Return the current connection to the pool and unset it locally.
+         *
+         * This must be called from the networking thread (i.e. the reactor).
+         */
+        void returnConnection(Status status);
+
+        /**
+         * Fulfill the promise for the Command.
+         *
+         * This will throw/invariant if called multiple times. In an ideal world, this would do the
+         * swap on CommandState::done for you and return early if it was already true. It does not
+         * do so currently.
+         */
+        void tryFinish(Status status);
+
         NetworkInterfaceTL* interface;
 
         RemoteCommandRequestOnAny requestOnAny;
@@ -108,9 +145,11 @@ private:
         Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
         Date_t start;
 
+        BatonHandle baton;
+        std::unique_ptr<transport::ReactorTimer> timer;
+
         StrongWeakFinishLine finishLine;
         ConnectionPool::ConnectionHandle conn;
-        std::unique_ptr<transport::ReactorTimer> timer;
 
         AtomicWord<bool> done;
         Promise<RemoteCommandOnAnyResponse> promise;
@@ -137,13 +176,19 @@ private:
     void _answerAlarm(Status status, std::shared_ptr<AlarmState> state);
 
     void _run();
-    void _onAcquireConn(std::shared_ptr<CommandState> state,
-                        ConnectionPool::ConnectionHandle conn,
-                        const BatonHandle& baton);
+
+    /**
+     * Structure a future chain based upon a CommandState that has received a good connection.
+     *
+     * This chain starts on the reactor to launch the command, and it must end on the reactor to
+     * return the connection. The internal future chain essentially starts with sending the
+     * RemoteCommandRequest and ends with receiving the RemoteCommandResponse.
+     */
+    void _onAcquireConn(std::shared_ptr<CommandState> state) noexcept;
 
     std::string _instanceName;
-    ServiceContext* _svcCtx;
-    transport::TransportLayer* _tl;
+    ServiceContext* _svcCtx = nullptr;
+    transport::TransportLayer* _tl = nullptr;
     // Will be created if ServiceContext is null, or if no TransportLayer was configured at startup
     std::unique_ptr<transport::TransportLayer> _ownedTransportLayer;
     transport::ReactorHandle _reactor;
@@ -153,7 +198,9 @@ private:
     ConnectionPool::Options _connPoolOpts;
     std::unique_ptr<NetworkConnectionHook> _onConnectHook;
    std::shared_ptr<ConnectionPool> _pool;
-    Counters _counters;
+
+    class SynchronizedCounters;
+    std::shared_ptr<SynchronizedCounters> _counters;
 
     std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
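A closing note on StrongWeakFinishLine, which CommandState now relies on: each failed connection attempt arrives weakly, while a usable connection or a hard failure in tryFinish() arrives strongly. The sketch below is a simplified stand-in for the real lock-free type in src/mongo/util/strong_weak_finish_line.h, under the assumption that both arrive methods return whether that arrival won the race:

    #include <atomic>
    #include <cstddef>

    class SimpleFinishLine {
    public:
        explicit SimpleFinishLine(size_t arrivals) : _remaining(arrivals) {}

        // A strong arrival ends the race immediately; only the first winner
        // observes true.
        bool arriveStrongly() {
            return !_won.exchange(true);
        }

        // A weak arrival wins only when it is the last arrival and nothing
        // arrived strongly first.
        bool arriveWeakly() {
            if (_remaining.fetch_sub(1) == 1) {
                return !_won.exchange(true);
            }
            return false;
        }

    private:
        std::atomic<size_t> _remaining;
        std::atomic<bool> _won{false};
    };

This is why the resolver in startCommand() only reports an error when arriveWeakly() returns true: of several connection attempts, only the last failure gets to fail the whole command, while a success claims the finish line strongly and shuts out the rest.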