diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-02-10 13:42:10 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-19 18:59:41 +0000 |
commit | 3805c2bf3921474671fdb6bd78ea17848ecef3d9 (patch) | |
tree | e5550805a0ffd7d7879ebbd000c97f97661c3cae | |
parent | d2c07e12a87325dca3265e1b078045cbcf909044 (diff) | |
download | mongo-3805c2bf3921474671fdb6bd78ea17848ecef3d9.tar.gz |
SERVER-45464 Kill canceled operations remotely
-rw-r--r-- | src/mongo/executor/network_interface_integration_test.cpp | 137 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 122 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 15 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_request.cpp | 10 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_request.h | 2 |
5 files changed, 244 insertions, 42 deletions
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index 08907fa240b..cebca17052f 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -162,17 +162,20 @@ public: startNet(std::make_unique<WaitForIsMasterHook>(this)); } - RemoteCommandRequest makeTestCommand(boost::optional<Milliseconds> timeout = boost::none, - BSONObj cmd = BSON("echo" << 1 << "foo" - << "bar"), - OperationContext* opCtx = nullptr) { + RemoteCommandRequest makeTestCommand( + boost::optional<Milliseconds> timeout = boost::none, + BSONObj cmd = BSON("echo" << 1 << "foo" + << "bar"), + OperationContext* opCtx = nullptr, + boost::optional<RemoteCommandRequest::HedgeOptions> hedgeOptions = boost::none) { auto cs = fixture(); return RemoteCommandRequest(cs.getServers().front(), "admin", std::move(cmd), BSONObj(), opCtx, - timeout ? *timeout : RemoteCommandRequest::kNoTimeout); + timeout ? *timeout : RemoteCommandRequest::kNoTimeout, + hedgeOptions); } struct IsMasterData { @@ -256,6 +259,107 @@ TEST_F(NetworkInterfaceTest, CancelOperation) { assertNumOps(1u, 0u, 0u, 0u); } +TEST_F(NetworkInterfaceTest, CancelRemotely) { + auto runCommandAssertStatusOK = [this](BSONObj cmdObj) { + auto request = makeTestCommand(RemoteCommandRequest::kNoTimeout, cmdObj); + auto result = runCommandSync(request); + ASSERT_OK(result.status); + }; + + // Enable blockConnection for "echo". + runCommandAssertStatusOK(BSON("configureFailPoint" + << "failCommand" + << "mode" + << "alwaysOn" + << "data" + << BSON("blockConnection" << true << "blockTimeMS" << 1000000000 + << "failCommands" + << BSON_ARRAY("echo")))); + + auto cbh = makeCallbackHandle(); + auto deferred = [&] { + // Kick off an "echo" operation, which should block until cancelCommand causes + // the operation to be killed. + auto cmdObj = BSON("echo" << 1 << "foo" + << "bar"); + auto deferred = runCommand( + cbh, + makeTestCommand( + boost::none, cmdObj, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions())); + + // Run cancelCommand to kill the above operation. + net().cancelCommand(cbh); + + return deferred; + }(); + + // Wait for the operation to complete, assert that it was canceled. + auto result = deferred.get(); + ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); + ASSERT(result.elapsedMillis); + + // We have one canceled operation (echo) and two succeeded operations (configureFailPoint + // and _killOperations). + assertNumOps(1u, 0u, 0u, 2u); + + // Disable blockConnection. + runCommandAssertStatusOK(BSON("configureFailPoint" + << "failCommand" + << "mode" + << "off")); +} + +TEST_F(NetworkInterfaceTest, CancelRemotelyTimedOut) { + auto runCommandAssertStatusOK = [this](BSONObj cmdObj) { + auto request = makeTestCommand(RemoteCommandRequest::kNoTimeout, cmdObj); + auto result = runCommandSync(request); + ASSERT_OK(result.status); + }; + + // Enable blockConnection for "echo" and "_killOperations". + runCommandAssertStatusOK(BSON("configureFailPoint" + << "failCommand" + << "mode" + << "alwaysOn" + << "data" + << BSON("blockConnection" << true << "blockTimeMS" << 5000 + << "failCommands" + << BSON_ARRAY("echo" + << "_killOperations")))); + + auto cbh = makeCallbackHandle(); + auto deferred = [&] { + // Kick off a blocking "echo" operation. + auto cmdObj = BSON("echo" << 1 << "foo" + << "bar"); + auto deferred = runCommand( + cbh, + makeTestCommand( + boost::none, cmdObj, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions())); + + // Run cancelCommand to kill the above operation. _killOperations is expected to block and + // time out, and the cancel timer is expected to cancel the operations. + net().cancelCommand(cbh); + + return deferred; + }(); + + // Wait for op to complete, assert that it was canceled. + auto result = deferred.get(); + ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); + ASSERT(result.elapsedMillis); + + // We have two timedout operations (echo and _killOperations), and one succeeded operation + // (configureFailPoint). + assertNumOps(0u, 2u, 0u, 1u); + + // Disable blockConnection. + runCommandAssertStatusOK(BSON("configureFailPoint" + << "failCommand" + << "mode" + << "off")); +} + TEST_F(NetworkInterfaceTest, ImmediateCancel) { auto cbh = makeCallbackHandle(); @@ -396,16 +500,8 @@ TEST_F(NetworkInterfaceTest, StartCommand) { auto commandRequest = BSON("echo" << 1 << "boop" << "bop"); - // This opmsg request expect the following reply, which is generated below - // { echo: { echo: 1, boop: "bop", $db: "admin" }, ok: 1.0 } - auto expectedCommandReply = [&] { - BSONObjBuilder echoed; - echoed.appendElements(commandRequest); - echoed << "$db" - << "admin"; - return echoed.obj(); - }(); - auto request = makeTestCommand(boost::none, commandRequest); + auto request = makeTestCommand( + boost::none, commandRequest, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions()); auto deferred = runCommand(makeCallbackHandle(), std::move(request)); @@ -413,8 +509,15 @@ TEST_F(NetworkInterfaceTest, StartCommand) { ASSERT(res.elapsedMillis); uassertStatusOK(res.status); - ASSERT_BSONOBJ_EQ(res.data.getObjectField("echo"), expectedCommandReply); - ASSERT_EQ(res.data.getIntField("ok"), 1); + + // This opmsg request expect the following reply, which is generated below + // { echo: { echo: 1, boop: "bop", clientOperationKey: uuid, $db: "admin" }, ok: 1.0 } + auto cmdObj = res.data.getObjectField("echo"); + ASSERT_EQ(1, cmdObj.getIntField("echo")); + ASSERT_EQ("bop"_sd, cmdObj.getStringField("boop")); + ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db")); + ASSERT_FALSE(cmdObj["clientOperationKey"].eoo()); + ASSERT_EQ(1, res.data.getIntField("ok")); assertNumOps(0u, 0u, 0u, 1u); } diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 0ad76215257..fcc650295b5 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -228,7 +228,8 @@ NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_, : interface(interface_), requestOnAny(std::move(request_)), cbHandle(cbHandle_), - finishLine(maxRequestFailures()) {} + finishLine(maxRequestFailures()), + operationKey(request_.operationKey) {} auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface, @@ -415,7 +416,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa * it then schedules onto the reactor to finish. * 2. All nodes are bad but some needed new connections. The reaction to the new connection * needs to be scheduled onto the reactor. - * 3. The timer in onAcquireConn() fires and the operation times out. ASIO timers run on the + * 3. The timer in sendRequest() fires and the operation times out. ASIO timers run on the * reactor. * 4. AsyncDBClient::runCommandRequest() concludes. This path is sadly indeterminate since * early failure can still be inline. The future chain is thenRunOn() either the baton or @@ -423,8 +424,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa * * The important bits to remember here: * - onFinish() is out-of-line - * - Stay inline as long as feasible until onAcquireConn()---i.e. until network operations - * - Baton execution *cannot* be relied upon at least until onAcquireConn() + * - Stay inline as long as feasible until sendRequest()---i.e. until network operations + * - Baton execution *cannot* be relied upon at least until sendRequest() * - Connection failure and command failure are related but distinct */ @@ -433,6 +434,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa StatusWith<RemoteCommandOnAnyResponse> swr) { invariant(swr.isOK()); auto rs = std::move(swr.getValue()); + LOGV2_DEBUG(22597, 2, "Request {cmdState_requestOnAny_id} finished with response: " @@ -496,6 +498,11 @@ void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyRe void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept { + trySend(std::move(swConn), {cmdState->requestOnAny, idx}); +} + +void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, + RemoteCommandRequest remoteCommandRequest) noexcept { // Our connection wasn't any good if (!swConn.isOK()) { if (!connFinishLine.arriveWeakly()) { @@ -524,8 +531,8 @@ void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::Connec } // We have a connection and the command hasn't already been attempted - request.emplace(cmdState->requestOnAny, idx); - host = cmdState->requestOnAny.target[idx]; + request.emplace(remoteCommandRequest); + host = request.get().target; conn = std::move(swConn.getValue()); networkInterfaceDiscardCommandsAfterAcquireConn.pauseWhileSet(); @@ -602,27 +609,110 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan if (it == _inProgress.end()) { return; } - auto state = it->second.lock(); - if (!state) { + auto cmdStateToCancel = it->second.lock(); + if (!cmdStateToCancel) { return; } _inProgress.erase(it); lk.unlock(); - if (!state->finishLine.arriveStrongly()) { + if (!cmdStateToCancel->finishLine.arriveStrongly()) { // If we didn't cross the command finishLine first, the promise is already fulfilled return; } - // Satisfy the promise locally - LOGV2_DEBUG(22599, + auto requestStateToCancel = cmdStateToCancel->requestStatePtr.lock(); + + // If we didn't cross the connection finishLine first, the command must have acquired a + // connection. + auto hasAcquiredConn = + requestStateToCancel && !requestStateToCancel->connFinishLine.arriveStrongly(); + + // Only kill the command if it has an operation key and was attempted. + bool shouldKillOp = + cmdStateToCancel->operationKey && hasAcquiredConn && requestStateToCancel->request; + + if (!shouldKillOp) { + // Satisfy the promise locally immediately. + LOGV2_DEBUG(22599, + 2, + "Canceling operation; original request was: {cmdStateToCancel_requestOnAny}", + "cmdStateToCancel_requestOnAny"_attr = + redact(cmdStateToCancel->requestOnAny.toString())); + cmdStateToCancel->promise.setError( + {ErrorCodes::CallbackCanceled, + str::stream() << "Command canceled; original request was: " + << redact(cmdStateToCancel->requestOnAny.toString())}); + return; + } + + _killOperation(requestStateToCancel); +} + +void NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestStateToKill) { + auto [target, sslMode] = [&] { + invariant(requestStateToKill->request); + auto request = requestStateToKill->request.get(); + return std::make_pair(request.target, request.sslMode); + }(); + auto cmdStateToKill = requestStateToKill->cmdState; + auto operationKey = cmdStateToKill->operationKey.get(); + + // Make a request state for _killOperations. + executor::RemoteCommandRequest killOpRequest( + target, + "admin", + BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(operationKey)), + nullptr, + kCancelCommandTimeout); + + auto cbHandle = executor::TaskExecutor::CallbackHandle(); + auto [killOpCmdState, future] = CommandState::make(this, killOpRequest, cbHandle); + killOpCmdState->deadline = killOpCmdState->stopwatch.start() + killOpRequest.timeout; + + std::move(future).getAsync( + [this, operationKey, cmdStateToKill](StatusWith<RemoteCommandOnAnyResponse> swr) { + invariant(swr.isOK()); + auto rs = std::move(swr.getValue()); + LOGV2_DEBUG( + 51813, 2, - "Canceling operation; original request was: {state_requestOnAny}", - "state_requestOnAny"_attr = redact(state->requestOnAny.toString())); - state->promise.setError({ErrorCodes::CallbackCanceled, - str::stream() << "Command canceled; original request was: " - << redact(state->requestOnAny.toString())}); + "Remote _killOperations request to cancel command with operationKey {operationKey}" + " finished with response: {rsdata_or_status}", + "operationKey"_attr = operationKey, + "rsdata_or_status"_attr = + redact(rs.isOK() ? rs.data.toString() : rs.status.toString())); + + // Satisfy the promise locally. + if (rs.isOK()) { + // _killOperations succeeded but the operation interrupted error is expected to be + // ignored in resolve() since cancelCommand() crossed the command finishLine first. + cmdStateToKill->promise.setError( + {ErrorCodes::CallbackCanceled, + str::stream() << "cancelCommand successfully issued remote interruption"}); + } else { + // _killOperations timed out or failed due to other errors. + rs.status.addContext("operation's client canceled by cancelCommand"); + cmdStateToKill->promise.setError(std::move(rs.status)); + } + }); + + auto killOpRequestState = std::make_shared<RequestState>(killOpCmdState); + killOpCmdState->requestStatePtr = killOpRequestState; + + // Send the _killOperations request. + auto connFuture = _pool->get(target, sslMode, killOpRequest.kNoTimeout); + if (connFuture.isReady()) { + killOpRequestState->trySend(std::move(connFuture).getNoThrow(), killOpRequest); + return; + } + + std::move(connFuture) + .thenRunOn(_reactor) + .getAsync([this, killOpRequestState, killOpRequest](auto swConn) { + killOpRequestState->trySend(std::move(swConn), killOpRequest); + }); } Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index c8ed6ff4ba4..8957cd875c6 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -56,6 +56,8 @@ public: std::unique_ptr<NetworkConnectionHook> onConnectHook, std::unique_ptr<rpc::EgressMetadataHook> metadataHook); + constexpr static Milliseconds kCancelCommandTimeout{1000}; + std::string getDiagnosticString() override; void appendConnectionStats(ConnectionPoolStats* stats) const override; std::string getHostName() override; @@ -149,6 +151,8 @@ private: StrongWeakFinishLine finishLine; Promise<RemoteCommandOnAnyResponse> promise; + + boost::optional<UUID> operationKey; }; struct RequestState final : public std::enable_shared_from_this<RequestState> { @@ -182,6 +186,8 @@ private: * Attempt to send a request using the given connection */ void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept; + void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, + RemoteCommandRequest remoteCommandRequest) noexcept; /** * Resolve an eventual response @@ -225,14 +231,7 @@ private: void _run(); - /** - * Structure a future chain based upon a CommandState that has received a good connection - * - * This command starts on the reactor to launch the command and its future chain must end on the - * reactor to return the connection. The internal future chain essentially starts with sending - * the RemoteCommandRequest and ends with receiving the RemoteCommandResponse. - */ - void _onAcquireConn(std::shared_ptr<CommandState> state) noexcept; + void _killOperation(std::shared_ptr<RequestState> requestStateToKill); std::string _instanceName; ServiceContext* _svcCtx = nullptr; diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp index ab3df29383d..770b067c7f4 100644 --- a/src/mongo/executor/remote_command_request.cpp +++ b/src/mongo/executor/remote_command_request.cpp @@ -80,11 +80,17 @@ RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId, cmdObj = theCmdObj; } + if (hedgeOptions) { + operationKey.emplace(UUID::gen()); + cmdObj = cmdObj.addField(BSON("clientOperationKey" << operationKey.get()).firstElement()); + } + timeout = opCtx ? std::min<Milliseconds>(opCtx->getRemainingMaxTimeMillis(), timeoutMillis) : timeoutMillis; } -RemoteCommandRequestBase::RemoteCommandRequestBase() : id(requestIdCounter.addAndFetch(1)) {} +RemoteCommandRequestBase::RemoteCommandRequestBase() + : id(requestIdCounter.addAndFetch(1)), operationKey(UUID::gen()) {} template <typename T> RemoteCommandRequestImpl<T>::RemoteCommandRequestImpl() = default; @@ -150,7 +156,9 @@ std::string RemoteCommandRequestImpl<T>::toString() const { } if (hedgeOptions) { + invariant(operationKey); out << " hedgeOptions.count: " << hedgeOptions->count; + out << " operationKey: " << operationKey.get(); } out << " cmd:" << cmdObj.toString(); diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h index d217462fb09..83608bf3ebd 100644 --- a/src/mongo/executor/remote_command_request.h +++ b/src/mongo/executor/remote_command_request.h @@ -83,6 +83,8 @@ struct RemoteCommandRequestBase { boost::optional<HedgeOptions> hedgeOptions; + boost::optional<UUID> operationKey; + Milliseconds timeout = kNoTimeout; // Deadline by when the request must be completed |