summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/executor/network_interface_asio.h3
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp5
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp45
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp9
4 files changed, 32 insertions, 30 deletions
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 62f9f6a4829..8e464754bfa 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -289,7 +289,8 @@ private:
// has to form the command manually (e.g. to use a specific requestBuilder).
Status beginCommand(Message&& newCommand, const HostAndPort& target);
- AsyncCommand* command();
+ AsyncCommand& command();
+ bool commandIsInitialized() const;
void finish(TaskExecutor::ResponseStatus&& status);
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index 3070de706c5..3ed78167d0a 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -92,7 +92,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
// Callback to parse protocol information out of received ismaster response
auto parseIsMaster = [this, op]() {
- auto swCommandReply = op->command()->response(op, rpc::Protocol::kOpQuery, now());
+ auto swCommandReply = op->command().response(op, rpc::Protocol::kOpQuery, now());
if (!swCommandReply.isOK()) {
return _completeOperation(op, swCommandReply);
}
@@ -195,8 +195,7 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
}
auto callAuthCompletionHandler = [this, op, handler]() {
- auto authResponse =
- op->command()->response(op, op->operationProtocol(), now(), nullptr);
+ auto authResponse = op->command().response(op, op->operationProtocol(), now(), nullptr);
handler(authResponse);
};
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index 8102ed93a5f..157173358a1 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -264,8 +264,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
}
void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
- auto response =
- op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get());
+ auto response = op->command().response(op, op->operationProtocol(), now(), _metadataHook.get());
_completeOperation(op, response);
}
@@ -311,8 +310,8 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp)
// If we fail during heartbeating, we won't be able to access any of op's members after
// calling finish(), so we return here.
log() << "Failed asio heartbeat to "
- << (op->command() ? op->command()->target().toString() : "unknown"s) << " - "
- << redact(resp.status);
+ << (op->commandIsInitialized() ? op->command().target().toString() : "unknown"s)
+ << " - " << redact(resp.status);
_numFailedOps.fetchAndAdd(1);
op->finish(std::move(resp));
return;
@@ -321,7 +320,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp)
if (!resp.isOK()) {
// In the case that resp is not OK, but _inSetup is false, we are using a connection
// that we got from the pool to execute a command, but it failed for some reason.
- if (op->command() && shouldLog(LogstreamBuilder::severityCast(2))) {
+ if (op->commandIsInitialized() && shouldLog(LogstreamBuilder::severityCast(2))) {
const auto performLog = [&resp](Message& message) {
LOG(2) << "Failed to send message. Reason: " << redact(resp.status) << ". Message: "
<< rpc::opMsgRequestFromAnyProtocol(message).body.toString(
@@ -329,12 +328,12 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp)
};
// Message might be compressed, decompress in that case so we can log the body
- Message& maybeCompressed = op->command()->toSend();
+ Message& maybeCompressed = op->command().toSend();
if (maybeCompressed.operation() != dbCompressed) {
performLog(maybeCompressed);
} else {
StatusWith<Message> decompressedMessage =
- op->command()->conn().getCompressorManager().decompressMessage(maybeCompressed);
+ op->command().conn().getCompressorManager().decompressMessage(maybeCompressed);
if (decompressedMessage.isOK()) {
performLog(decompressedMessage.getValue());
} else {
@@ -413,23 +412,23 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle
// 2 - receive a header for the response
// 3 - validate and receive response body
// 4 - advance the state machine by calling handler()
- auto cmd = op->command();
+ auto& cmd = op->command();
// Step 4
- auto recvMessageCallback = [this, cmd, handler](std::error_code ec, size_t bytes) {
+ auto recvMessageCallback = [this, handler](std::error_code ec, size_t bytes) {
// We don't call _validateAndRun here as we assume the caller will.
handler(ec, bytes);
};
// Step 3
- auto recvHeaderCallback = [this, cmd, handler, recvMessageCallback, op](std::error_code ec,
- size_t bytes) {
+ auto recvHeaderCallback = [this, &cmd, handler, recvMessageCallback, op](std::error_code ec,
+ size_t bytes) {
// The operation could have been canceled after starting the command, but before
// receiving the header
- _validateAndRun(op, ec, [this, recvMessageCallback, ec, bytes, cmd, handler] {
+ _validateAndRun(op, ec, [this, recvMessageCallback, ec, bytes, &cmd, handler] {
// validate response id
- uint32_t expectedId = cmd->toSend().header().getId();
- uint32_t actualId = cmd->header().constView().getResponseToMsgId();
+ uint32_t expectedId = cmd.toSend().header().getId();
+ uint32_t actualId = cmd.header().constView().getResponseToMsgId();
if (actualId != expectedId) {
LOG(3) << "got wrong response:"
<< " expected response id: " << expectedId
@@ -437,26 +436,24 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle
return handler(make_error_code(ErrorCodes::ProtocolError), bytes);
}
- asyncRecvMessageBody(cmd->conn().stream(),
- &cmd->header(),
- &cmd->toRecv(),
- std::move(recvMessageCallback));
+ asyncRecvMessageBody(
+ cmd.conn().stream(), &cmd.header(), &cmd.toRecv(), std::move(recvMessageCallback));
});
};
// Step 2
- auto sendMessageCallback = [this, cmd, handler, recvHeaderCallback, op](std::error_code ec,
- size_t bytes) {
- _validateAndRun(op, ec, [this, cmd, recvHeaderCallback] {
+ auto sendMessageCallback = [this, &cmd, handler, recvHeaderCallback, op](std::error_code ec,
+ size_t bytes) {
+ _validateAndRun(op, ec, [this, &cmd, recvHeaderCallback] {
asyncRecvMessageHeader(
- cmd->conn().stream(), &cmd->header(), std::move(recvHeaderCallback));
+ cmd.conn().stream(), &cmd.header(), std::move(recvHeaderCallback));
});
};
// Step 1
- asyncSendMessage(cmd->conn().stream(), &cmd->toSend(), std::move(sendMessageCallback));
+ asyncSendMessage(cmd.conn().stream(), &cmd.toSend(), std::move(sendMessageCallback));
}
void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
@@ -484,7 +481,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
auto finishHook = [this, op]() {
auto response =
- op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get());
+ op->command().response(op, op->operationProtocol(), now(), _metadataHook.get());
if (!response.isOK()) {
return _completeOperation(op, response);
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index fa74d9996f8..6360173dd73 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -171,9 +171,14 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& r
request.target);
}
-NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() {
+NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
MONGO_ASYNC_OP_INVARIANT(_command.is_initialized(), "Command is not yet initialized");
- return _command.get_ptr();
+
+ return *_command;
+}
+
+bool NetworkInterfaceASIO::AsyncOp::commandIsInitialized() const {
+ return _command.is_initialized();
}
void NetworkInterfaceASIO::AsyncOp::finish(ResponseStatus&& rs) {