diff options
author | samantharitter <samantha.ritter@10gen.com> | 2015-08-13 13:29:57 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2015-08-14 13:50:27 -0400 |
commit | 3211eea8dbfe317ad3e1434abf1a4cd7190a1b1c (patch) | |
tree | d366da062186d3e9b19b7bd6de86575e422d9927 /src/mongo | |
parent | 35cb5b7a665e8dddb885ee126e08dcc8f765f378 (diff) | |
download | mongo-3211eea8dbfe317ad3e1434abf1a4cd7190a1b1c.tar.gz |
SERVER-19697 AsyncCommand responses return proper elapsed time
Diffstat (limited to 'src/mongo')
4 files changed, 40 insertions, 55 deletions
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 9b972aaacd0..4cfee95ef72 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -120,20 +120,16 @@ private: */ class AsyncCommand { public: - AsyncCommand(AsyncConnection* conn); - - // This method resets the Messages and associated information held inside - // an AsyncCommand so that it may be reused to run a new network roundtrip. - void reset(); + AsyncCommand(AsyncConnection* conn, Message&& command, Date_t now); NetworkInterfaceASIO::AsyncConnection& conn(); Message& toSend(); - void setToSend(Message&& message); - Message& toRecv(); MSGHEADER::Value& header(); + ResponseStatus response(rpc::Protocol protocol, Date_t now); + private: NetworkInterfaceASIO::AsyncConnection* const _conn; @@ -142,6 +138,8 @@ private: // TODO: Investigate efficiency of storing header separately. MSGHEADER::Value _header; + + const Date_t _start; }; /** @@ -166,8 +164,10 @@ private: // AsyncOp may run multiple commands over its lifetime (for example, an ismaster // command, the command provided to the NetworkInterface via startCommand(), etc.) // Calling beginCommand() resets internal state to prepare to run newCommand. - AsyncCommand& beginCommand(const RemoteCommandRequest& request, rpc::Protocol protocol); - AsyncCommand& beginCommand(Message&& newCommand); + AsyncCommand& beginCommand(const RemoteCommandRequest& request, + rpc::Protocol protocol, + Date_t now); + AsyncCommand& beginCommand(Message&& newCommand, Date_t now); AsyncCommand& command(); void finish(const TaskExecutor::ResponseStatus& status); @@ -232,8 +232,6 @@ private: handler(); } - ResponseStatus _responseFromMessage(const Message& received, rpc::Protocol protocol); - // Connection void _connect(AsyncOp* op); diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 361304f44e3..a6f1fdcfa38 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -58,7 +58,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { requestBuilder.setCommandArgs(BSON("isMaster" << 1)); // Set current command to ismaster request and run - auto& cmd = op->beginCommand(std::move(*(requestBuilder.done()))); + auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())), now()); // Callback to parse protocol information out of received ismaster response auto parseIsMaster = [this, op]() { @@ -119,11 +119,10 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { // authenticateClient will use this to run auth-related commands over our connection. auto runCommandHook = [this, op](executor::RemoteCommandRequest request, auth::AuthCompletionHandler handler) { - auto& cmd = op->beginCommand(request, op->operationProtocol()); + auto& cmd = op->beginCommand(request, op->operationProtocol(), now()); auto callAuthCompletionHandler = [this, op, handler]() { - auto authResponse = - _responseFromMessage(op->command().toRecv(), op->operationProtocol()); + auto authResponse = op->command().response(op->operationProtocol(), now()); handler(authResponse); }; diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 1f3e2494b91..8b64ed87e09 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -124,12 +124,11 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream, } // namespace -NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn) : _conn(conn) {} - -void NetworkInterfaceASIO::AsyncCommand::reset() { - // TODO: Optimize reuse of Messages to be more space-efficient. - _toSend.reset(); - _toRecv.reset(); +NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn, + Message&& command, + Date_t now) + : _conn(conn), _toSend(std::move(command)), _start(now) { + _toSend.header().setResponseTo(0); } NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncCommand::conn() { @@ -140,10 +139,6 @@ Message& NetworkInterfaceASIO::AsyncCommand::toSend() { return _toSend; } -void NetworkInterfaceASIO::AsyncCommand::setToSend(Message&& message) { - _toSend = std::move(message); -} - Message& NetworkInterfaceASIO::AsyncCommand::toRecv() { return _toRecv; } @@ -152,22 +147,9 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() { return _header; } -void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { - LOG(3) << "running command " << op->request().cmdObj << " against database " - << op->request().dbname << " across network to " << op->request().target.toString(); - if (inShutdown()) { - return; - } - - // _connect() will continue the state machine. - _connect(op); -} - -ResponseStatus NetworkInterfaceASIO::_responseFromMessage(const Message& received, - rpc::Protocol protocol) { +ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol, Date_t now) { + auto& received = _toRecv; try { - // TODO: elapsed isn't going to be correct here, SERVER-19697 - auto start = now(); auto reply = rpc::makeReply(&received); if (reply->getProtocol() != protocol) { @@ -185,16 +167,27 @@ ResponseStatus NetworkInterfaceASIO::_responseFromMessage(const Message& receive // unavoidable copy auto ownedCommandReply = reply->getCommandReply().getOwned(); auto ownedReplyMetadata = reply->getMetadata().getOwned(); - return ResponseStatus( - RemoteCommandResponse(ownedCommandReply, ownedReplyMetadata, now() - start)); + return ResponseStatus(RemoteCommandResponse( + std::move(ownedCommandReply), std::move(ownedReplyMetadata), now - _start)); } catch (...) { // makeReply can throw if the reply was invalid. return exceptionToStatus(); } } +void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { + LOG(3) << "running command " << op->request().cmdObj << " against database " + << op->request().dbname << " across network to " << op->request().target.toString(); + if (inShutdown()) { + return; + } + + // _connect() will continue the state machine. + _connect(op); +} + void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { - auto& cmd = op->beginCommand(op->request(), op->operationProtocol()); + auto& cmd = op->beginCommand(op->request(), op->operationProtocol(), now()); _asyncRunCommand(&cmd, [this, op](std::error_code ec, size_t bytes) { @@ -204,7 +197,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { // TODO: handle metadata readers. - auto response = _responseFromMessage(op->command().toRecv(), op->operationProtocol()); + auto response = op->command().response(op->operationProtocol(), now()); _completeOperation(op, response); } diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index c79cb72b2bd..087534ee936 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -92,25 +92,20 @@ void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) { } NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand( - Message&& newCommand) { + Message&& newCommand, Date_t now) { // NOTE: We operate based on the assumption that AsyncOp's // AsyncConnection does not change over its lifetime. invariant(_connection.is_initialized()); - if (_command.is_initialized()) { - // We can just reset our state if initialized. - _command->reset(); - } else { - _command.emplace(_connection.get_ptr()); - } - newCommand.header().setResponseTo(0); - _command->setToSend(std::move(newCommand)); + + // Construct a new AsyncCommand object for each command. + _command.emplace(_connection.get_ptr(), std::move(newCommand), now); return _command.get(); } NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand( - const RemoteCommandRequest& request, rpc::Protocol protocol) { + const RemoteCommandRequest& request, rpc::Protocol protocol, Date_t now) { auto newCommand = messageFromRequest(request, protocol); - return beginCommand(std::move(*newCommand)); + return beginCommand(std::move(*newCommand), now); } NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() { |