diff options
4 files changed, 32 insertions, 30 deletions
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 62f9f6a4829..8e464754bfa 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -289,7 +289,8 @@ private: // has to form the command manually (e.g. to use a specific requestBuilder). Status beginCommand(Message&& newCommand, const HostAndPort& target); - AsyncCommand* command(); + AsyncCommand& command(); + bool commandIsInitialized() const; void finish(TaskExecutor::ResponseStatus&& status); diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 3070de706c5..3ed78167d0a 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -92,7 +92,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { // Callback to parse protocol information out of received ismaster response auto parseIsMaster = [this, op]() { - auto swCommandReply = op->command()->response(op, rpc::Protocol::kOpQuery, now()); + auto swCommandReply = op->command().response(op, rpc::Protocol::kOpQuery, now()); if (!swCommandReply.isOK()) { return _completeOperation(op, swCommandReply); } @@ -195,8 +195,7 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { } auto callAuthCompletionHandler = [this, op, handler]() { - auto authResponse = - op->command()->response(op, op->operationProtocol(), now(), nullptr); + auto authResponse = op->command().response(op, op->operationProtocol(), now(), nullptr); handler(authResponse); }; diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 8102ed93a5f..157173358a1 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -264,8 +264,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { } void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { - auto response = - op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get()); + auto response = op->command().response(op, op->operationProtocol(), now(), _metadataHook.get()); _completeOperation(op, response); } @@ -311,8 +310,8 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) // If we fail during heartbeating, we won't be able to access any of op's members after // calling finish(), so we return here. log() << "Failed asio heartbeat to " - << (op->command() ? op->command()->target().toString() : "unknown"s) << " - " - << redact(resp.status); + << (op->commandIsInitialized() ? op->command().target().toString() : "unknown"s) + << " - " << redact(resp.status); _numFailedOps.fetchAndAdd(1); op->finish(std::move(resp)); return; @@ -321,7 +320,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) if (!resp.isOK()) { // In the case that resp is not OK, but _inSetup is false, we are using a connection // that we got from the pool to execute a command, but it failed for some reason. - if (op->command() && shouldLog(LogstreamBuilder::severityCast(2))) { + if (op->commandIsInitialized() && shouldLog(LogstreamBuilder::severityCast(2))) { const auto performLog = [&resp](Message& message) { LOG(2) << "Failed to send message. Reason: " << redact(resp.status) << ". Message: " << rpc::opMsgRequestFromAnyProtocol(message).body.toString( @@ -329,12 +328,12 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) }; // Message might be compressed, decompress in that case so we can log the body - Message& maybeCompressed = op->command()->toSend(); + Message& maybeCompressed = op->command().toSend(); if (maybeCompressed.operation() != dbCompressed) { performLog(maybeCompressed); } else { StatusWith<Message> decompressedMessage = - op->command()->conn().getCompressorManager().decompressMessage(maybeCompressed); + op->command().conn().getCompressorManager().decompressMessage(maybeCompressed); if (decompressedMessage.isOK()) { performLog(decompressedMessage.getValue()); } else { @@ -413,23 +412,23 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle // 2 - receive a header for the response // 3 - validate and receive response body // 4 - advance the state machine by calling handler() - auto cmd = op->command(); + auto& cmd = op->command(); // Step 4 - auto recvMessageCallback = [this, cmd, handler](std::error_code ec, size_t bytes) { + auto recvMessageCallback = [this, handler](std::error_code ec, size_t bytes) { // We don't call _validateAndRun here as we assume the caller will. handler(ec, bytes); }; // Step 3 - auto recvHeaderCallback = [this, cmd, handler, recvMessageCallback, op](std::error_code ec, - size_t bytes) { + auto recvHeaderCallback = [this, &cmd, handler, recvMessageCallback, op](std::error_code ec, + size_t bytes) { // The operation could have been canceled after starting the command, but before // receiving the header - _validateAndRun(op, ec, [this, recvMessageCallback, ec, bytes, cmd, handler] { + _validateAndRun(op, ec, [this, recvMessageCallback, ec, bytes, &cmd, handler] { // validate response id - uint32_t expectedId = cmd->toSend().header().getId(); - uint32_t actualId = cmd->header().constView().getResponseToMsgId(); + uint32_t expectedId = cmd.toSend().header().getId(); + uint32_t actualId = cmd.header().constView().getResponseToMsgId(); if (actualId != expectedId) { LOG(3) << "got wrong response:" << " expected response id: " << expectedId @@ -437,26 +436,24 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle return handler(make_error_code(ErrorCodes::ProtocolError), bytes); } - asyncRecvMessageBody(cmd->conn().stream(), - &cmd->header(), - &cmd->toRecv(), - std::move(recvMessageCallback)); + asyncRecvMessageBody( + cmd.conn().stream(), &cmd.header(), &cmd.toRecv(), std::move(recvMessageCallback)); }); }; // Step 2 - auto sendMessageCallback = [this, cmd, handler, recvHeaderCallback, op](std::error_code ec, - size_t bytes) { - _validateAndRun(op, ec, [this, cmd, recvHeaderCallback] { + auto sendMessageCallback = [this, &cmd, handler, recvHeaderCallback, op](std::error_code ec, + size_t bytes) { + _validateAndRun(op, ec, [this, &cmd, recvHeaderCallback] { asyncRecvMessageHeader( - cmd->conn().stream(), &cmd->header(), std::move(recvHeaderCallback)); + cmd.conn().stream(), &cmd.header(), std::move(recvHeaderCallback)); }); }; // Step 1 - asyncSendMessage(cmd->conn().stream(), &cmd->toSend(), std::move(sendMessageCallback)); + asyncSendMessage(cmd.conn().stream(), &cmd.toSend(), std::move(sendMessageCallback)); } void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { @@ -484,7 +481,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { auto finishHook = [this, op]() { auto response = - op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get()); + op->command().response(op, op->operationProtocol(), now(), _metadataHook.get()); if (!response.isOK()) { return _completeOperation(op, response); diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index fa74d9996f8..6360173dd73 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -171,9 +171,14 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& r request.target); } -NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() { +NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() { MONGO_ASYNC_OP_INVARIANT(_command.is_initialized(), "Command is not yet initialized"); - return _command.get_ptr(); + + return *_command; +} + +bool NetworkInterfaceASIO::AsyncOp::commandIsInitialized() const { + return _command.is_initialized(); } void NetworkInterfaceASIO::AsyncOp::finish(ResponseStatus&& rs) { |