summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <cheahuychou.mao@mongodb.com>2020-02-10 13:42:10 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-19 18:59:41 +0000
commit3805c2bf3921474671fdb6bd78ea17848ecef3d9 (patch)
treee5550805a0ffd7d7879ebbd000c97f97661c3cae
parentd2c07e12a87325dca3265e1b078045cbcf909044 (diff)
downloadmongo-3805c2bf3921474671fdb6bd78ea17848ecef3d9.tar.gz
SERVER-45464 Kill canceled operations remotely
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp137
-rw-r--r--src/mongo/executor/network_interface_tl.cpp122
-rw-r--r--src/mongo/executor/network_interface_tl.h15
-rw-r--r--src/mongo/executor/remote_command_request.cpp10
-rw-r--r--src/mongo/executor/remote_command_request.h2
5 files changed, 244 insertions, 42 deletions
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index 08907fa240b..cebca17052f 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -162,17 +162,20 @@ public:
startNet(std::make_unique<WaitForIsMasterHook>(this));
}
- RemoteCommandRequest makeTestCommand(boost::optional<Milliseconds> timeout = boost::none,
- BSONObj cmd = BSON("echo" << 1 << "foo"
- << "bar"),
- OperationContext* opCtx = nullptr) {
+ RemoteCommandRequest makeTestCommand(
+ boost::optional<Milliseconds> timeout = boost::none,
+ BSONObj cmd = BSON("echo" << 1 << "foo"
+ << "bar"),
+ OperationContext* opCtx = nullptr,
+ boost::optional<RemoteCommandRequest::HedgeOptions> hedgeOptions = boost::none) {
auto cs = fixture();
return RemoteCommandRequest(cs.getServers().front(),
"admin",
std::move(cmd),
BSONObj(),
opCtx,
- timeout ? *timeout : RemoteCommandRequest::kNoTimeout);
+ timeout ? *timeout : RemoteCommandRequest::kNoTimeout,
+ hedgeOptions);
}
struct IsMasterData {
@@ -256,6 +259,107 @@ TEST_F(NetworkInterfaceTest, CancelOperation) {
assertNumOps(1u, 0u, 0u, 0u);
}
+TEST_F(NetworkInterfaceTest, CancelRemotely) {
+ auto runCommandAssertStatusOK = [this](BSONObj cmdObj) {
+ auto request = makeTestCommand(RemoteCommandRequest::kNoTimeout, cmdObj);
+ auto result = runCommandSync(request);
+ ASSERT_OK(result.status);
+ };
+
+ // Enable blockConnection for "echo".
+ runCommandAssertStatusOK(BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("blockConnection" << true << "blockTimeMS" << 1000000000
+ << "failCommands"
+ << BSON_ARRAY("echo"))));
+
+ auto cbh = makeCallbackHandle();
+ auto deferred = [&] {
+ // Kick off an "echo" operation, which should block until cancelCommand causes
+ // the operation to be killed.
+ auto cmdObj = BSON("echo" << 1 << "foo"
+ << "bar");
+ auto deferred = runCommand(
+ cbh,
+ makeTestCommand(
+ boost::none, cmdObj, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions()));
+
+ // Run cancelCommand to kill the above operation.
+ net().cancelCommand(cbh);
+
+ return deferred;
+ }();
+
+ // Wait for the operation to complete, assert that it was canceled.
+ auto result = deferred.get();
+ ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
+ ASSERT(result.elapsedMillis);
+
+ // We have one canceled operation (echo) and two succeeded operations (configureFailPoint
+ // and _killOperations).
+ assertNumOps(1u, 0u, 0u, 2u);
+
+ // Disable blockConnection.
+ runCommandAssertStatusOK(BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "off"));
+}
+
+TEST_F(NetworkInterfaceTest, CancelRemotelyTimedOut) {
+ auto runCommandAssertStatusOK = [this](BSONObj cmdObj) {
+ auto request = makeTestCommand(RemoteCommandRequest::kNoTimeout, cmdObj);
+ auto result = runCommandSync(request);
+ ASSERT_OK(result.status);
+ };
+
+ // Enable blockConnection for "echo" and "_killOperations".
+ runCommandAssertStatusOK(BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("blockConnection" << true << "blockTimeMS" << 5000
+ << "failCommands"
+ << BSON_ARRAY("echo"
+ << "_killOperations"))));
+
+ auto cbh = makeCallbackHandle();
+ auto deferred = [&] {
+ // Kick off a blocking "echo" operation.
+ auto cmdObj = BSON("echo" << 1 << "foo"
+ << "bar");
+ auto deferred = runCommand(
+ cbh,
+ makeTestCommand(
+ boost::none, cmdObj, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions()));
+
+ // Run cancelCommand to kill the above operation. _killOperations is expected to block and
+ // time out, and the cancel timer is expected to cancel the operations.
+ net().cancelCommand(cbh);
+
+ return deferred;
+ }();
+
+ // Wait for op to complete, assert that it was canceled.
+ auto result = deferred.get();
+ ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status);
+ ASSERT(result.elapsedMillis);
+
+ // We have two timedout operations (echo and _killOperations), and one succeeded operation
+ // (configureFailPoint).
+ assertNumOps(0u, 2u, 0u, 1u);
+
+ // Disable blockConnection.
+ runCommandAssertStatusOK(BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "off"));
+}
+
TEST_F(NetworkInterfaceTest, ImmediateCancel) {
auto cbh = makeCallbackHandle();
@@ -396,16 +500,8 @@ TEST_F(NetworkInterfaceTest, StartCommand) {
auto commandRequest = BSON("echo" << 1 << "boop"
<< "bop");
- // This opmsg request expect the following reply, which is generated below
- // { echo: { echo: 1, boop: "bop", $db: "admin" }, ok: 1.0 }
- auto expectedCommandReply = [&] {
- BSONObjBuilder echoed;
- echoed.appendElements(commandRequest);
- echoed << "$db"
- << "admin";
- return echoed.obj();
- }();
- auto request = makeTestCommand(boost::none, commandRequest);
+ auto request = makeTestCommand(
+ boost::none, commandRequest, nullptr /* opCtx */, RemoteCommandRequest::HedgeOptions());
auto deferred = runCommand(makeCallbackHandle(), std::move(request));
@@ -413,8 +509,15 @@ TEST_F(NetworkInterfaceTest, StartCommand) {
ASSERT(res.elapsedMillis);
uassertStatusOK(res.status);
- ASSERT_BSONOBJ_EQ(res.data.getObjectField("echo"), expectedCommandReply);
- ASSERT_EQ(res.data.getIntField("ok"), 1);
+
+ // This opmsg request expect the following reply, which is generated below
+ // { echo: { echo: 1, boop: "bop", clientOperationKey: uuid, $db: "admin" }, ok: 1.0 }
+ auto cmdObj = res.data.getObjectField("echo");
+ ASSERT_EQ(1, cmdObj.getIntField("echo"));
+ ASSERT_EQ("bop"_sd, cmdObj.getStringField("boop"));
+ ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db"));
+ ASSERT_FALSE(cmdObj["clientOperationKey"].eoo());
+ ASSERT_EQ(1, res.data.getIntField("ok"));
assertNumOps(0u, 0u, 0u, 1u);
}
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 0ad76215257..fcc650295b5 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -228,7 +228,8 @@ NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
: interface(interface_),
requestOnAny(std::move(request_)),
cbHandle(cbHandle_),
- finishLine(maxRequestFailures()) {}
+ finishLine(maxRequestFailures()),
+ operationKey(request_.operationKey) {}
auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
@@ -415,7 +416,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
* it then schedules onto the reactor to finish.
* 2. All nodes are bad but some needed new connections. The reaction to the new connection
* needs to be scheduled onto the reactor.
- * 3. The timer in onAcquireConn() fires and the operation times out. ASIO timers run on the
+ * 3. The timer in sendRequest() fires and the operation times out. ASIO timers run on the
* reactor.
* 4. AsyncDBClient::runCommandRequest() concludes. This path is sadly indeterminate since
* early failure can still be inline. The future chain is thenRunOn() either the baton or
@@ -423,8 +424,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
*
* The important bits to remember here:
* - onFinish() is out-of-line
- * - Stay inline as long as feasible until onAcquireConn()---i.e. until network operations
- * - Baton execution *cannot* be relied upon at least until onAcquireConn()
+ * - Stay inline as long as feasible until sendRequest()---i.e. until network operations
+ * - Baton execution *cannot* be relied upon at least until sendRequest()
* - Connection failure and command failure are related but distinct
*/
@@ -433,6 +434,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
StatusWith<RemoteCommandOnAnyResponse> swr) {
invariant(swr.isOK());
auto rs = std::move(swr.getValue());
+
LOGV2_DEBUG(22597,
2,
"Request {cmdState_requestOnAny_id} finished with response: "
@@ -496,6 +498,11 @@ void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyRe
void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn,
size_t idx) noexcept {
+ trySend(std::move(swConn), {cmdState->requestOnAny, idx});
+}
+
+void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn,
+ RemoteCommandRequest remoteCommandRequest) noexcept {
// Our connection wasn't any good
if (!swConn.isOK()) {
if (!connFinishLine.arriveWeakly()) {
@@ -524,8 +531,8 @@ void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::Connec
}
// We have a connection and the command hasn't already been attempted
- request.emplace(cmdState->requestOnAny, idx);
- host = cmdState->requestOnAny.target[idx];
+ request.emplace(remoteCommandRequest);
+ host = request.get().target;
conn = std::move(swConn.getValue());
networkInterfaceDiscardCommandsAfterAcquireConn.pauseWhileSet();
@@ -602,27 +609,110 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
if (it == _inProgress.end()) {
return;
}
- auto state = it->second.lock();
- if (!state) {
+ auto cmdStateToCancel = it->second.lock();
+ if (!cmdStateToCancel) {
return;
}
_inProgress.erase(it);
lk.unlock();
- if (!state->finishLine.arriveStrongly()) {
+ if (!cmdStateToCancel->finishLine.arriveStrongly()) {
// If we didn't cross the command finishLine first, the promise is already fulfilled
return;
}
- // Satisfy the promise locally
- LOGV2_DEBUG(22599,
+ auto requestStateToCancel = cmdStateToCancel->requestStatePtr.lock();
+
+ // If we didn't cross the connection finishLine first, the command must have acquired a
+ // connection.
+ auto hasAcquiredConn =
+ requestStateToCancel && !requestStateToCancel->connFinishLine.arriveStrongly();
+
+ // Only kill the command if it has an operation key and was attempted.
+ bool shouldKillOp =
+ cmdStateToCancel->operationKey && hasAcquiredConn && requestStateToCancel->request;
+
+ if (!shouldKillOp) {
+ // Satisfy the promise locally immediately.
+ LOGV2_DEBUG(22599,
+ 2,
+ "Canceling operation; original request was: {cmdStateToCancel_requestOnAny}",
+ "cmdStateToCancel_requestOnAny"_attr =
+ redact(cmdStateToCancel->requestOnAny.toString()));
+ cmdStateToCancel->promise.setError(
+ {ErrorCodes::CallbackCanceled,
+ str::stream() << "Command canceled; original request was: "
+ << redact(cmdStateToCancel->requestOnAny.toString())});
+ return;
+ }
+
+ _killOperation(requestStateToCancel);
+}
+
+void NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestStateToKill) {
+ auto [target, sslMode] = [&] {
+ invariant(requestStateToKill->request);
+ auto request = requestStateToKill->request.get();
+ return std::make_pair(request.target, request.sslMode);
+ }();
+ auto cmdStateToKill = requestStateToKill->cmdState;
+ auto operationKey = cmdStateToKill->operationKey.get();
+
+ // Make a request state for _killOperations.
+ executor::RemoteCommandRequest killOpRequest(
+ target,
+ "admin",
+ BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(operationKey)),
+ nullptr,
+ kCancelCommandTimeout);
+
+ auto cbHandle = executor::TaskExecutor::CallbackHandle();
+ auto [killOpCmdState, future] = CommandState::make(this, killOpRequest, cbHandle);
+ killOpCmdState->deadline = killOpCmdState->stopwatch.start() + killOpRequest.timeout;
+
+ std::move(future).getAsync(
+ [this, operationKey, cmdStateToKill](StatusWith<RemoteCommandOnAnyResponse> swr) {
+ invariant(swr.isOK());
+ auto rs = std::move(swr.getValue());
+ LOGV2_DEBUG(
+ 51813,
2,
- "Canceling operation; original request was: {state_requestOnAny}",
- "state_requestOnAny"_attr = redact(state->requestOnAny.toString()));
- state->promise.setError({ErrorCodes::CallbackCanceled,
- str::stream() << "Command canceled; original request was: "
- << redact(state->requestOnAny.toString())});
+ "Remote _killOperations request to cancel command with operationKey {operationKey}"
+ " finished with response: {rsdata_or_status}",
+ "operationKey"_attr = operationKey,
+ "rsdata_or_status"_attr =
+ redact(rs.isOK() ? rs.data.toString() : rs.status.toString()));
+
+ // Satisfy the promise locally.
+ if (rs.isOK()) {
+ // _killOperations succeeded but the operation interrupted error is expected to be
+ // ignored in resolve() since cancelCommand() crossed the command finishLine first.
+ cmdStateToKill->promise.setError(
+ {ErrorCodes::CallbackCanceled,
+ str::stream() << "cancelCommand successfully issued remote interruption"});
+ } else {
+ // _killOperations timed out or failed due to other errors.
+ rs.status.addContext("operation's client canceled by cancelCommand");
+ cmdStateToKill->promise.setError(std::move(rs.status));
+ }
+ });
+
+ auto killOpRequestState = std::make_shared<RequestState>(killOpCmdState);
+ killOpCmdState->requestStatePtr = killOpRequestState;
+
+ // Send the _killOperations request.
+ auto connFuture = _pool->get(target, sslMode, killOpRequest.kNoTimeout);
+ if (connFuture.isReady()) {
+ killOpRequestState->trySend(std::move(connFuture).getNoThrow(), killOpRequest);
+ return;
+ }
+
+ std::move(connFuture)
+ .thenRunOn(_reactor)
+ .getAsync([this, killOpRequestState, killOpRequest](auto swConn) {
+ killOpRequestState->trySend(std::move(swConn), killOpRequest);
+ });
}
Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index c8ed6ff4ba4..8957cd875c6 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -56,6 +56,8 @@ public:
std::unique_ptr<NetworkConnectionHook> onConnectHook,
std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
+ constexpr static Milliseconds kCancelCommandTimeout{1000};
+
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
std::string getHostName() override;
@@ -149,6 +151,8 @@ private:
StrongWeakFinishLine finishLine;
Promise<RemoteCommandOnAnyResponse> promise;
+
+ boost::optional<UUID> operationKey;
};
struct RequestState final : public std::enable_shared_from_this<RequestState> {
@@ -182,6 +186,8 @@ private:
* Attempt to send a request using the given connection
*/
void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept;
+ void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn,
+ RemoteCommandRequest remoteCommandRequest) noexcept;
/**
* Resolve an eventual response
@@ -225,14 +231,7 @@ private:
void _run();
- /**
- * Structure a future chain based upon a CommandState that has received a good connection
- *
- * This command starts on the reactor to launch the command and its future chain must end on the
- * reactor to return the connection. The internal future chain essentially starts with sending
- * the RemoteCommandRequest and ends with receiving the RemoteCommandResponse.
- */
- void _onAcquireConn(std::shared_ptr<CommandState> state) noexcept;
+ void _killOperation(std::shared_ptr<RequestState> requestStateToKill);
std::string _instanceName;
ServiceContext* _svcCtx = nullptr;
diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp
index ab3df29383d..770b067c7f4 100644
--- a/src/mongo/executor/remote_command_request.cpp
+++ b/src/mongo/executor/remote_command_request.cpp
@@ -80,11 +80,17 @@ RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId,
cmdObj = theCmdObj;
}
+ if (hedgeOptions) {
+ operationKey.emplace(UUID::gen());
+ cmdObj = cmdObj.addField(BSON("clientOperationKey" << operationKey.get()).firstElement());
+ }
+
timeout = opCtx ? std::min<Milliseconds>(opCtx->getRemainingMaxTimeMillis(), timeoutMillis)
: timeoutMillis;
}
-RemoteCommandRequestBase::RemoteCommandRequestBase() : id(requestIdCounter.addAndFetch(1)) {}
+RemoteCommandRequestBase::RemoteCommandRequestBase()
+ : id(requestIdCounter.addAndFetch(1)), operationKey(UUID::gen()) {}
template <typename T>
RemoteCommandRequestImpl<T>::RemoteCommandRequestImpl() = default;
@@ -150,7 +156,9 @@ std::string RemoteCommandRequestImpl<T>::toString() const {
}
if (hedgeOptions) {
+ invariant(operationKey);
out << " hedgeOptions.count: " << hedgeOptions->count;
+ out << " operationKey: " << operationKey.get();
}
out << " cmd:" << cmdObj.toString();
diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h
index d217462fb09..83608bf3ebd 100644
--- a/src/mongo/executor/remote_command_request.h
+++ b/src/mongo/executor/remote_command_request.h
@@ -83,6 +83,8 @@ struct RemoteCommandRequestBase {
boost::optional<HedgeOptions> hedgeOptions;
+ boost::optional<UUID> operationKey;
+
Milliseconds timeout = kNoTimeout;
// Deadline by when the request must be completed