diff options
author | Waley Chen <waleycz@gmail.com> | 2016-08-03 15:54:57 -0400 |
---|---|---|
committer | Waley Chen <waleycz@gmail.com> | 2016-08-03 15:54:57 -0400 |
commit | 05e1c33649e08ec3736121254da7b29a73934788 (patch) | |
tree | 591de8a744ed1e31535038474247102e540c209d /src | |
parent | 1aeb9f04c0cdaaa4832ada812797b50456986baf (diff) | |
download | mongo-05e1c33649e08ec3736121254da7b29a73934788.tar.gz |
SERVER-24067 TaskExecutor RemoteCommandCallbackArgs should include elapsedMS and metadata
Diffstat (limited to 'src')
53 files changed, 708 insertions, 439 deletions
diff --git a/src/mongo/client/authenticate.cpp b/src/mongo/client/authenticate.cpp index 224517e00be..7d45da9ef08 100644 --- a/src/mongo/client/authenticate.cpp +++ b/src/mongo/client/authenticate.cpp @@ -159,7 +159,7 @@ void authMongoCR(RunCommandHook runCommand, const BSONObj& params, AuthCompletio // Ensure response was valid std::string nonce; - BSONObj nonceResponse = response.getValue().data; + BSONObj nonceResponse = response.data; auto valid = bsonExtractStringField(nonceResponse, "nonce", &nonce); if (!valid.isOK()) return handler({ErrorCodes::AuthenticationFailed, @@ -235,7 +235,8 @@ void authX509(RunCommandHook runCommand, // bool isFailedAuthOk(const AuthResponse& response) { - return (response == ErrorCodes::AuthenticationFailed && serverGlobalParams.transitionToAuth); + return (response.status == ErrorCodes::AuthenticationFailed && + serverGlobalParams.transitionToAuth); } void auth(RunCommandHook runCommand, @@ -305,9 +306,9 @@ void authenticateClient(const BSONObj& params, // NOTE: this assumes that runCommand executes synchronously. asyncAuth(runCommand, params, hostname, clientName, [](AuthResponse response) { // DBClient expects us to throw in case of an auth error. - uassertStatusOK(response); + uassertStatusOK(response.status); - auto serverResponse = response.getValue().data; + auto serverResponse = response.data; uassert(ErrorCodes::AuthenticationFailed, serverResponse["errmsg"].str(), isOk(serverResponse)); diff --git a/src/mongo/client/authenticate.h b/src/mongo/client/authenticate.h index 65c649c49c2..6ad596477bf 100644 --- a/src/mongo/client/authenticate.h +++ b/src/mongo/client/authenticate.h @@ -43,7 +43,7 @@ class BSONObj; namespace auth { -using AuthResponse = StatusWith<executor::RemoteCommandResponse>; +using AuthResponse = executor::RemoteCommandResponse; using AuthCompletionHandler = stdx::function<void(AuthResponse)>; using RunCommandResultHandler = AuthCompletionHandler; using RunCommandHook = diff --git a/src/mongo/client/authenticate_test.cpp b/src/mongo/client/authenticate_test.cpp index 3044da9f6cd..97b876cf814 100644 --- a/src/mongo/client/authenticate_test.cpp +++ b/src/mongo/client/authenticate_test.cpp @@ -92,7 +92,7 @@ public: // Then pop a response and call the handler ASSERT(!_responses.empty()); - handler(StatusWith<RemoteCommandResponse>(_responses.front())); + handler(_responses.front()); _responses.pop(); } diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 254a537c255..cbf72c4ae7d 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -445,8 +445,7 @@ void DBClientWithCommands::_auth(const BSONObj& params) { Milliseconds millis(Date_t::now() - start); // Hand control back to authenticateClient() - handler(StatusWith<RemoteCommandResponse>( - RemoteCommandResponse(data, metadata, millis))); + handler({data, metadata, millis}); } catch (...) { handler(exceptionToStatus()); @@ -713,7 +712,7 @@ private: /** * Initializes the wire version of conn, and returns the isMaster reply. */ -StatusWith<executor::RemoteCommandResponse> initWireVersion(DBClientConnection* conn) { +executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn) { try { // We need to force the usage of OP_QUERY on this command, even if we have previously // detected support for OP_COMMAND on a connection. This is necessary to handle the case @@ -773,16 +772,16 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) { auto swIsMasterReply = initWireVersion(this); if (!swIsMasterReply.isOK()) { _failed = true; - return swIsMasterReply.getStatus(); + return swIsMasterReply.status; } // Ensure that the isMaster response is "ok:1". - auto isMasterStatus = getStatusFromCommandResult(swIsMasterReply.getValue().data); + auto isMasterStatus = getStatusFromCommandResult(swIsMasterReply.data); if (!isMasterStatus.isOK()) { return isMasterStatus; } - auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue().data); + auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.data); if (!swProtocolSet.isOK()) { return swProtocolSet.getStatus(); } @@ -799,7 +798,7 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) { } if (_hook) { - auto validationStatus = _hook(swIsMasterReply.getValue()); + auto validationStatus = _hook(swIsMasterReply); if (!validationStatus.isOK()) { // Disconnect and mark failed. _failed = true; diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 5aa46e549cd..e2a8bdf2402 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -301,7 +301,7 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) { void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batchFieldName) { if (!rcbd.response.isOK()) { - _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.status), nullptr, nullptr); _finishCallback(); return; } @@ -312,7 +312,7 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch return; } - const BSONObj& queryResponseObj = rcbd.response.getValue().data; + const BSONObj& queryResponseObj = rcbd.response.data; Status status = getStatusFromCommandResult(queryResponseObj); if (!status.isOK()) { _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); @@ -328,8 +328,8 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch return; } - batchData.otherFields.metadata = std::move(rcbd.response.getValue().metadata); - batchData.elapsedMillis = rcbd.response.getValue().elapsedMillis; + batchData.otherFields.metadata = std::move(rcbd.response.metadata); + batchData.elapsedMillis = rcbd.response.elapsedMillis.value_or(Milliseconds{0}); { stdx::lock_guard<stdx::mutex> lk(_mutex); batchData.first = _first; @@ -380,10 +380,10 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) { if (id) { auto logKillCursorsResult = [](const RemoteCommandCallbackArgs& args) { if (!args.response.isOK()) { - warning() << "killCursors command task failed: " << args.response.getStatus(); + warning() << "killCursors command task failed: " << args.response.status; return; } - auto status = getStatusFromCommandResult(args.response.getValue().data); + auto status = getStatusFromCommandResult(args.response.data); if (!status.isOK()) { warning() << "killCursors command failed: " << status; } diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index b20cb9a099f..26778806b6c 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -44,6 +44,8 @@ using namespace mongo; using executor::NetworkInterfaceMock; using executor::TaskExecutor; +using ResponseStatus = TaskExecutor::ResponseStatus; + const HostAndPort source("localhost", -1); const BSONObj findCmdObj = BSON("find" << "coll"); @@ -61,12 +63,11 @@ public: void processNetworkResponse(const BSONObj& obj, ReadyQueueState readyQueueStateAfterProcessing, FetcherState fetcherStateAfterProcessing); - void processNetworkResponse(const BSONObj& obj, - Milliseconds elapsed, + void processNetworkResponse(const ResponseStatus, ReadyQueueState readyQueueStateAfterProcessing, FetcherState fetcherStateAfterProcessing); - void processNetworkResponse(ErrorCodes::Error code, - const std::string& reason, + void processNetworkResponse(const BSONObj& obj, + Milliseconds elapsed, ReadyQueueState readyQueueStateAfterProcessing, FetcherState fetcherStateAfterProcessing); @@ -148,12 +149,11 @@ void FetcherTest::processNetworkResponse(const BSONObj& obj, finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing); } -void FetcherTest::processNetworkResponse(ErrorCodes::Error code, - const std::string& reason, +void FetcherTest::processNetworkResponse(ResponseStatus rs, ReadyQueueState readyQueueStateAfterProcessing, FetcherState fetcherStateAfterProcessing) { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - getNet()->scheduleErrorResponse({code, reason}); + getNet()->scheduleErrorResponse(rs); finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing); } @@ -373,8 +373,8 @@ TEST_F(FetcherTest, ScheduleButShutdown) { TEST_F(FetcherTest, FindCommandFailed1) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse( - ErrorCodes::BadValue, "bad hint", ReadyQueueState::kEmpty, FetcherState::kInactive); + auto rs = ResponseStatus(ErrorCodes::BadValue, "bad hint", Milliseconds(0)); + processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); ASSERT_EQUALS("bad hint", status.reason()); ASSERT_FALSE(fetcher->inShutdown_forTest()); @@ -939,7 +939,7 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { ASSERT_EQUALS(cursorId, cursors.front().numberLong()); // Failed killCursors command response should be logged. - getNet()->scheduleSuccessfulResponse(noi, {BSON("ok" << false), {}}); + getNet()->scheduleSuccessfulResponse(noi, {BSON("ok" << false), {}, Milliseconds(0)}); getNet()->runReadyNetworkOperations(); } @@ -1049,12 +1049,10 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques // Retry policy is applied to find command. const BSONObj doc = BSON("_id" << 1); - processNetworkResponse( - ErrorCodes::BadValue, "first", ReadyQueueState::kHasReadyRequests, FetcherState::kActive); - processNetworkResponse(ErrorCodes::InternalError, - "second", - ReadyQueueState::kHasReadyRequests, - FetcherState::kActive); + auto rs = ResponseStatus(ErrorCodes::BadValue, "first", Milliseconds(0)); + processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive); + rs = ResponseStatus(ErrorCodes::InternalError, "second", Milliseconds(0)); + processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive); processNetworkResponse(BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" << "firstBatch" @@ -1070,11 +1068,9 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); + rs = ResponseStatus(ErrorCodes::OperationFailed, "getMore failed", Milliseconds(0)); // No retry policy for subsequent getMore commands. - processNetworkResponse(ErrorCodes::OperationFailed, - "getMore failed", - ReadyQueueState::kEmpty, - FetcherState::kInactive); + processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_FALSE(fetcher->inShutdown_forTest()); } diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp index 8c20d0af925..ded4a10d964 100644 --- a/src/mongo/client/remote_command_retry_scheduler.cpp +++ b/src/mongo/client/remote_command_retry_scheduler.cpp @@ -207,7 +207,7 @@ Status RemoteCommandRetryScheduler::_schedule_inlock(std::size_t requestCount) { void RemoteCommandRetryScheduler::_remoteCommandCallback( const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba, std::size_t requestCount) { - auto status = rcba.response.getStatus(); + auto status = rcba.response.status; if (status.isOK() || status == ErrorCodes::CallbackCanceled || requestCount == _retryPolicy->getMaximumAttempts() || diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index 6e2c70fe9dc..705324f94c5 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -48,6 +48,7 @@ namespace { using namespace mongo; +using ResponseStatus = executor::TaskExecutor::ResponseStatus; class CallbackResponseSaver; @@ -56,8 +57,8 @@ public: void start(RemoteCommandRetryScheduler* scheduler); void checkCompletionStatus(RemoteCommandRetryScheduler* scheduler, const CallbackResponseSaver& callbackResponseSaver, - const executor::TaskExecutor::ResponseStatus& response); - void processNetworkResponse(const executor::TaskExecutor::ResponseStatus& response); + const ResponseStatus& response); + void processNetworkResponse(const ResponseStatus& response); void runReadyNetworkOperations(); protected: @@ -76,10 +77,10 @@ public: */ void operator()(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba); - std::vector<StatusWith<executor::RemoteCommandResponse>> getResponses() const; + std::vector<ResponseStatus> getResponses() const; private: - std::vector<StatusWith<executor::RemoteCommandResponse>> _responses; + std::vector<ResponseStatus> _responses; }; /** @@ -119,20 +120,19 @@ void RemoteCommandRetrySchedulerTest::start(RemoteCommandRetryScheduler* schedul void RemoteCommandRetrySchedulerTest::checkCompletionStatus( RemoteCommandRetryScheduler* scheduler, const CallbackResponseSaver& callbackResponseSaver, - const executor::TaskExecutor::ResponseStatus& response) { + const ResponseStatus& response) { ASSERT_FALSE(scheduler->isActive()); auto responses = callbackResponseSaver.getResponses(); ASSERT_EQUALS(1U, responses.size()); if (response.isOK()) { - ASSERT_OK(responses.front().getStatus()); - ASSERT_EQUALS(response.getValue(), responses.front().getValue()); + ASSERT_OK(responses.front().status); + ASSERT_EQUALS(response, responses.front()); } else { - ASSERT_EQUALS(response.getStatus(), responses.front().getStatus()); + ASSERT_EQUALS(response.status, responses.front().status); } } -void RemoteCommandRetrySchedulerTest::processNetworkResponse( - const executor::TaskExecutor::ResponseStatus& response) { +void RemoteCommandRetrySchedulerTest::processNetworkResponse(const ResponseStatus& response) { auto net = getNet(); executor::NetworkInterfaceMock::InNetworkGuard guard(net); ASSERT_TRUE(net->hasReadyRequests()); @@ -163,8 +163,7 @@ void CallbackResponseSaver::operator()( _responses.push_back(rcba.response); } -std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver::getResponses() - const { +std::vector<ResponseStatus> CallbackResponseSaver::getResponses() const { return _responses; } @@ -327,7 +326,6 @@ TEST_F(RemoteCommandRetrySchedulerTest, &scheduler, callback, {ErrorCodes::CallbackCanceled, "executor shutdown"}); } - TEST_F(RemoteCommandRetrySchedulerTest, ShuttingDownSchedulerAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) { CallbackResponseSaver callback; @@ -353,10 +351,9 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableEr start(&scheduler); // This should match one of the non-retryable error codes in the policy. - Status response(ErrorCodes::OperationFailed, "injected error"); - - processNetworkResponse(response); - checkCompletionStatus(&scheduler, callback, response); + ResponseStatus rs(ErrorCodes::OperationFailed, "injected error", Milliseconds(0)); + processNetworkResponse(rs); + checkCompletionStatus(&scheduler, callback, rs); } TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfulResponse) { @@ -368,8 +365,7 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfu start(&scheduler); // Elapsed time in response is ignored on successful responses. - executor::RemoteCommandResponse response( - BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); + ResponseStatus response(BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); processNetworkResponse(response); checkCompletionStatus(&scheduler, callback, response); @@ -386,11 +382,10 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfu // Scheduler does not parse document in a successful response for embedded errors. // This is the case with some commands (e.g. find) which do not always return errors using the // wire protocol. - executor::RemoteCommandResponse response( - BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg" - << "injected error"), - BSON("z" << 456), - Milliseconds(100)); + ResponseStatus response(BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg" + << "injected error"), + BSON("z" << 456), + Milliseconds(100)); processNetworkResponse(response); checkCompletionStatus(&scheduler, callback, response); @@ -406,14 +401,15 @@ TEST_F(RemoteCommandRetrySchedulerTest, &badExecutor, request, stdx::ref(callback), std::move(policy)); start(&scheduler); - processNetworkResponse({ErrorCodes::HostNotFound, "first"}); + processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)}); // scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send // third remote command request after processing second failed response. badExecutor.scheduleRemoteCommandFailPoint = true; - processNetworkResponse({ErrorCodes::HostNotFound, "second"}); + processNetworkResponse({ErrorCodes::HostNotFound, "second", Milliseconds(0)}); - checkCompletionStatus(&scheduler, callback, {ErrorCodes::ShutdownInProgress, ""}); + checkCompletionStatus( + &scheduler, callback, {ErrorCodes::ShutdownInProgress, "", Milliseconds(0)}); } TEST_F(RemoteCommandRetrySchedulerTest, @@ -427,10 +423,10 @@ TEST_F(RemoteCommandRetrySchedulerTest, &getExecutor(), request, stdx::ref(callback), std::move(policy)); start(&scheduler); - processNetworkResponse({ErrorCodes::HostNotFound, "first"}); - processNetworkResponse({ErrorCodes::HostUnreachable, "second"}); + processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)}); + processNetworkResponse({ErrorCodes::HostUnreachable, "second", Milliseconds(0)}); - Status response(ErrorCodes::NetworkTimeout, "last"); + ResponseStatus response(ErrorCodes::NetworkTimeout, "last", Milliseconds(0)); processNetworkResponse(response); checkCompletionStatus(&scheduler, callback, response); } @@ -443,10 +439,9 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulRespo &getExecutor(), request, stdx::ref(callback), std::move(policy)); start(&scheduler); - processNetworkResponse({ErrorCodes::HostNotFound, "first"}); + processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)}); - executor::RemoteCommandResponse response( - BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); + ResponseStatus response(BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); processNetworkResponse(response); checkCompletionStatus(&scheduler, callback, response); } diff --git a/src/mongo/client/remote_command_runner_impl.cpp b/src/mongo/client/remote_command_runner_impl.cpp new file mode 100644 index 00000000000..2eef602d1f9 --- /dev/null +++ b/src/mongo/client/remote_command_runner_impl.cpp @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_runner_impl.h" + +#include "mongo/base/status_with.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/executor/downconvert_find_and_getmore_commands.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/protocol.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + +using executor::RemoteCommandRequest; +using ResponseStatus = executor::RemoteCommandResponse; + +/** + * Calculates the timeout for a network operation expiring at "expDate", given + * that it is now "nowDate". + * + * Returns 0ms to indicate no expiration date, a number of milliseconds until "expDate", or + * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate". + */ +StatusWith<Milliseconds> getTimeoutMillis(const Date_t expDate, const Date_t nowDate) { + if (expDate == RemoteCommandRequest::kNoExpirationDate) { + return Milliseconds(0); + } + if (expDate <= nowDate) { + return {ErrorCodes::ExceededTimeLimit, + str::stream() << "Went to run command, but it was too late. " + "Expiration was set to " + << dateToISOStringUTC(expDate)}; + } + return expDate - nowDate; +} + +/** + * Peeks at error in cursor. If an error has occurred, converts the {$err: "...", code: N} + * cursor error to a Status. + */ +Status getStatusFromCursorResult(DBClientCursor& cursor) { + BSONObj error; + if (!cursor.peekError(&error)) { + return Status::OK(); + } + + BSONElement e = error.getField("code"); + return Status(e.isNumber() ? ErrorCodes::fromInt(e.numberInt()) : ErrorCodes::UnknownError, + getErrField(error).valuestrsafe()); +} + +using RequestDownconverter = StatusWith<Message> (*)(const RemoteCommandRequest&); +using ReplyUpconverter = ResponseStatus (*)(std::int32_t requestId, + StringData cursorNamespace, + const Message& response); + +template <RequestDownconverter downconvertRequest, ReplyUpconverter upconvertReply> +ResponseStatus runDownconvertedCommand(DBClientConnection* conn, + const RemoteCommandRequest& request) { + auto swDownconvertedRequest = downconvertRequest(request); + if (!swDownconvertedRequest.isOK()) { + return swDownconvertedRequest.getStatus(); + } + + Message requestMsg{std::move(swDownconvertedRequest.getValue())}; + Message responseMsg; + + try { + conn->call(requestMsg, responseMsg, true, nullptr); + } catch (...) { + return exceptionToStatus(); + } + + auto messageId = requestMsg.header().getId(); + + return upconvertReply(messageId, DbMessage(requestMsg).getns(), responseMsg); +} + +/** + * Downconverts the specified find command to a find protocol operation and sends it to the + * server on the specified connection. + */ +ResponseStatus runDownconvertedFindCommand(DBClientConnection* conn, + const RemoteCommandRequest& request) { + return runDownconvertedCommand<executor::downconvertFindCommandRequest, + executor::upconvertLegacyQueryResponse>(conn, request); +} + +/** + * Downconverts the specified getMore command to legacy getMore operation and sends it to the + * server on the specified connection. + */ +ResponseStatus runDownconvertedGetMoreCommand(DBClientConnection* conn, + const RemoteCommandRequest& request) { + return runDownconvertedCommand<executor::downconvertGetMoreCommandRequest, + executor::upconvertLegacyGetMoreResponse>(conn, request); +} + +} // namespace + +RemoteCommandRunnerImpl::RemoteCommandRunnerImpl( + int messagingTags, std::unique_ptr<executor::NetworkConnectionHook> hook) + : _connPool(messagingTags, std::move(hook)) {} + +RemoteCommandRunnerImpl::~RemoteCommandRunnerImpl() { + invariant(!_active); +} + +void RemoteCommandRunnerImpl::startup() { + _active = true; +} + +void RemoteCommandRunnerImpl::shutdown() { + if (!_active) { + return; + } + + _connPool.closeAllInUseConnections(); + _active = false; +} + +ResponseStatus RemoteCommandRunnerImpl::runCommand(const RemoteCommandRequest& request) { + try { + const Date_t requestStartDate = Date_t::now(); + const auto timeoutMillis = getTimeoutMillis(request.expirationDate, requestStartDate); + if (!timeoutMillis.isOK()) { + return {timeoutMillis.getStatus()}; + } + + ConnectionPool::ConnectionPtr conn( + &_connPool, request.target, requestStartDate, timeoutMillis.getValue()); + + BSONObj output; + BSONObj metadata; + + // If remote server does not support either find or getMore commands, down convert + // to using DBClientInterface::query()/getMore(). + // Perform down conversion based on wire protocol version. + + // 'commandName' will be an empty string if the command object is an empty BSON + // document. + StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); + const auto isFindCmd = commandName == QueryRequest::kFindCommandName; + const auto isGetMoreCmd = commandName == GetMoreRequest::kGetMoreCommandName; + const auto isFindOrGetMoreCmd = isFindCmd || isGetMoreCmd; + + // We are using the wire version to check if we need to downconverting find/getMore + // requests because coincidentally, the find/getMore command is only supported by + // servers that also accept OP_COMMAND. + bool supportsFindAndGetMoreCommands = rpc::supportsWireVersionForOpCommandInMongod( + conn.get()->getMinWireVersion(), conn.get()->getMaxWireVersion()); + + if (!isFindOrGetMoreCmd || supportsFindAndGetMoreCommands) { + rpc::UniqueReply commandResponse = + conn.get()->runCommandWithMetadata(request.dbname, + request.cmdObj.firstElementFieldName(), + request.metadata, + request.cmdObj); + + output = commandResponse->getCommandReply().getOwned(); + metadata = commandResponse->getMetadata().getOwned(); + } else if (isFindCmd) { + return runDownconvertedFindCommand(conn.get(), request); + } else if (isGetMoreCmd) { + return runDownconvertedGetMoreCommand(conn.get(), request); + } + + const Date_t requestFinishDate = Date_t::now(); + conn.done(requestFinishDate); + + return {std::move(output), + std::move(metadata), + Milliseconds(requestFinishDate - requestStartDate)}; + } catch (const DBException& ex) { + return {ex.toStatus()}; + } catch (const std::exception& ex) { + return {ErrorCodes::UnknownError, + str::stream() << "Sending command " << request.cmdObj << " on database " + << request.dbname + << " over network to " + << request.target.toString() + << " received exception " + << ex.what()}; + } +} + +} // namespace mongo diff --git a/src/mongo/client/sasl_client_authenticate_impl.cpp b/src/mongo/client/sasl_client_authenticate_impl.cpp index aa4d7e64838..8a06acedb31 100644 --- a/src/mongo/client/sasl_client_authenticate_impl.cpp +++ b/src/mongo/client/sasl_client_authenticate_impl.cpp @@ -210,7 +210,7 @@ void asyncSaslConversation(auth::RunCommandHook runCommand, return handler(std::move(response)); } - auto serverResponse = response.getValue().data.getOwned(); + auto serverResponse = response.data.getOwned(); auto code = getStatusFromCommandResult(serverResponse).code(); // Server versions 2.3.2 and earlier may return "ok: 1" with a non-zero diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index ac926addcd9..399be148782 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -146,7 +146,7 @@ void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, auto net = getNet(); executor::TaskExecutor::ResponseStatus responseStatus(code, reason); log() << "Scheduling error response to request:" << noi->getDiagnosticString() - << " -- status:" << responseStatus.getStatus().toString(); + << " -- status:" << responseStatus.status.toString(); net->scheduleResponse(noi, net->now(), responseStatus); } diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index bd3740ed410..423ea40cc94 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -181,12 +181,12 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque ++_numResponses; if (!response.isOK()) { warning() << "Failed to complete heartbeat request to " << request.target << "; " - << response.getStatus(); - _badResponses.push_back(std::make_pair(request.target, response.getStatus())); + << response.status; + _badResponses.push_back(std::make_pair(request.target, response.status)); return; } - BSONObj resBSON = response.getValue().data; + BSONObj resBSON = response.data; ReplSetHeartbeatResponse hbResp; Status hbStatus = hbResp.initialize(resBSON, 0); @@ -219,7 +219,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque if (_rsConfig->hasReplicaSetId()) { StatusWith<rpc::ReplSetMetadata> replMetadata = - rpc::ReplSetMetadata::readFromMetadata(response.getValue().metadata); + rpc::ReplSetMetadata::readFromMetadata(response.metadata); if (replMetadata.isOK() && replMetadata.getValue().getReplicaSetId().isSet() && _rsConfig->getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) { std::string message = str::stream() diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index 86527def28a..610dbad4cf7 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -203,7 +203,7 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSeveralDownNodes) { for (int i = 0; i < numCommandsExpected; ++i) { _net->scheduleResponse(_net->getNextReadyRequest(), startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No reply")); + {ErrorCodes::NoSuchKey, "No reply"}); } _net->runUntil(startDate + Milliseconds(10)); _net->exitNetwork(); @@ -320,9 +320,8 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToOneDownNode) { ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); if (request.target == HostAndPort("h2", 1)) { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + _net->scheduleResponse( + noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } else { _net->scheduleResponse( noi, @@ -766,9 +765,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToIncompatibleSetName) { ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8)))); } else { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + _net->scheduleResponse( + noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } _net->runUntil(startDate + Milliseconds(10)); @@ -832,9 +830,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) { startDate + Milliseconds(10), ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } else { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + _net->scheduleResponse( + noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } _net->runUntil(startDate + Milliseconds(10)); @@ -894,9 +891,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) { startDate + Milliseconds(10), ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8)))); } else { - _net->scheduleResponse(noi, - startDate + Milliseconds(10), - ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + _net->scheduleResponse( + noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"}); } } _net->runUntil(startDate + Milliseconds(10)); diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index 3b01a07e139..cbc8d763490 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -191,9 +191,9 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedu } void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackArgs& cbd) { - Status respStatus = cbd.response.getStatus(); + Status respStatus = cbd.response.status; if (respStatus.isOK()) { - respStatus = getStatusFromCommandResult(cbd.response.getValue().data); + respStatus = getStatusFromCommandResult(cbd.response.data); } UniqueLock lk(_mutex); @@ -204,7 +204,7 @@ void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackArgs& cbd) { return; } - const auto respBSON = cbd.response.getValue().data; + const auto respBSON = cbd.response.data; // There should not be any cloners yet invariant(_databaseCloners.size() == 0); const auto dbsElem = respBSON["databases"].Obj(); diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index afe28960c1f..907f396c23c 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -108,7 +108,7 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ ++_actualResponses; if (response.isOK()) { - BSONObj res = response.getValue().data; + BSONObj res = response.data; log() << "received " << res["vote"] << " votes from " << request.target; LOG(1) << "full elect res: " << res.toString(); BSONElement vote(res["vote"]); @@ -121,7 +121,7 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ _receivedVotes += vote._numberInt(); } else { - warning() << "elect command to " << request.target << " failed: " << response.getStatus(); + warning() << "elect command to " << request.target << " failed: " << response.status; } } diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index d3927c33102..1b74c3810b5 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -128,8 +128,7 @@ void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& re Status status = Status::OK(); - if (!response.isOK() || - !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) { + if (!response.isOK() || !((status = getStatusFromCommandResult(response.data)).isOK())) { if (votingMember) { ++_failedVoterResponses; if (hadTooManyFailedVoterResponses()) { @@ -145,7 +144,7 @@ void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& re return; } - const BSONObj res = response.getValue().data; + const BSONObj res = response.data; LOG(2) << "FreshnessChecker: Got response from " << request.target << " of " << res; diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp index 87182309a39..2623fb32b88 100644 --- a/src/mongo/db/repl/freshness_scanner.cpp +++ b/src/mongo/db/repl/freshness_scanner.cpp @@ -74,9 +74,9 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re _responsesProcessed++; if (!response.isOK()) { // failed response LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": " - << response.getStatus(); + << response.status; } else { - BSONObj opTimesObj = response.getValue().data.getObjectField("optimes"); + BSONObj opTimesObj = response.data.getObjectField("optimes"); OpTime lastOpTime; Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime); if (!status.isOK()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index e56fa347fcd..d0fe0aa4e58 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -127,7 +127,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( // Parse and validate the response. At the end of this step, if responseStatus is OK then // hbResponse is valid. - Status responseStatus = cbData.response.getStatus(); + Status responseStatus = cbData.response.status; if (responseStatus == ErrorCodes::CallbackCanceled) { return; } @@ -136,10 +136,10 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( ReplSetHeartbeatResponse hbResponse; BSONObj resp; if (responseStatus.isOK()) { - resp = cbData.response.getValue().data; + resp = cbData.response.data; responseStatus = hbResponse.initialize(resp, _topCoord->getTerm()); StatusWith<rpc::ReplSetMetadata> replMetadata = - rpc::ReplSetMetadata::readFromMetadata(cbData.response.getValue().metadata); + rpc::ReplSetMetadata::readFromMetadata(cbData.response.metadata); // Reject heartbeat responses (and metadata) from nodes with mismatched replica set IDs. // It is problematic to perform this check in the heartbeat reconfiguring logic because it @@ -170,7 +170,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( StatusWith<ReplSetHeartbeatResponse> hbStatusResponse(hbResponse); if (responseStatus.isOK()) { - networkTime = cbData.response.getValue().elapsedMillis; + networkTime = cbData.response.elapsedMillis.value_or(Milliseconds{0}); // TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this // and update tests. _updateTerm_incallback(hbStatusResponse.getValue().getTerm()); @@ -288,17 +288,17 @@ namespace { * This callback is purely for logging and has no effect on any other operations */ void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) { - const Status status = cbData.response.getStatus(); + const Status status = cbData.response.status; if (status == ErrorCodes::CallbackCanceled) { return; } if (status.isOK()) { LOG(1) << "stepdown of primary(" << cbData.request.target << ") succeeded with response -- " - << cbData.response.getValue().data; + << cbData.response.data; } else { warning() << "stepdown of primary(" << cbData.request.target << ") failed due to " - << cbData.response.getStatus(); + << cbData.response.status; } } } // namespace diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 6a076aff54d..dd077167b0e 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -317,7 +317,7 @@ void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& reque } LOG(4) << "Received remote response: " - << (response.isOK() ? response.getValue().toString() : response.getStatus().toString()); + << (response.isOK() ? response.toString() : response.status.toString()); callback->_callbackFn = stdx::bind(remoteCommandFinished, stdx::placeholders::_1, cb, request, response); diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index ffb90fb78ea..be231372c8a 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -263,7 +263,7 @@ private: void finishShutdown(); void _finishRemoteCommand(const executor::RemoteCommandRequest& request, - const StatusWith<executor::RemoteCommandResponse>& response, + const executor::RemoteCommandResponse& response, const CallbackHandle& cbHandle, const uint64_t expectedHandleGeneration, const RemoteCommandCallbackFn& cb); diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index 2e19e804108..64b7dd27e12 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -257,14 +257,14 @@ void Reporter::_processResponseCallback( return; } - _status = rcbd.response.getStatus(); + _status = rcbd.response.status; if (!_status.isOK()) { _onShutdown_inlock(); return; } // Override _status with the one embedded in the command result. - const auto& commandResult = rcbd.response.getValue().data; + const auto& commandResult = rcbd.response.data; _status = getStatusFromCommandResult(commandResult); // Some error types are OK and should not cause the reporter to stop sending updates to the diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 9d89e00bf95..42ee28c6aa5 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -46,6 +46,8 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using ResponseStatus = mongo::executor::TaskExecutor::ResponseStatus; + class MockProgressManager { public: void updateMap(int memberId, const OpTime& lastDurableOpTime, const OpTime& lastAppliedOpTime) { @@ -120,8 +122,7 @@ public: */ BSONObj processNetworkResponse(const BSONObj& obj, bool expectReadyRequestsAfterProcessing = false); - BSONObj processNetworkResponse(ErrorCodes::Error code, - const std::string& reason, + BSONObj processNetworkResponse(const ResponseStatus rs, bool expectReadyRequestsAfterProcessing = false); void runUntil(Date_t when, bool expectReadyRequestsAfterAdvancingClock = false); @@ -210,12 +211,11 @@ BSONObj ReporterTest::processNetworkResponse(const BSONObj& obj, return cmdObj; } -BSONObj ReporterTest::processNetworkResponse(ErrorCodes::Error code, - const std::string& reason, +BSONObj ReporterTest::processNetworkResponse(const ResponseStatus rs, bool expectReadyRequestsAfterProcessing) { auto net = getNet(); net->enterNetwork(); - auto cmdObj = net->scheduleErrorResponse({code, reason}).cmdObj; + auto cmdObj = net->scheduleErrorResponse(rs).cmdObj; net->runReadyNetworkOperations(); ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); net->exitNetwork(); @@ -320,7 +320,7 @@ TEST_F(ReporterTest, TaskExecutorAndNetworkErrorsStopTheReporter) { ASSERT_TRUE(reporter->isActive()); ASSERT_TRUE(reporter->isWaitingToSendReport()); - processNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); + processNetworkResponse({ErrorCodes::NoSuchKey, "waaaah", Milliseconds(0)}); ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->join()); assertReporterDone(); @@ -561,7 +561,7 @@ TEST_F(ReporterTest, } TEST_F(ReporterTest, FailedUpdateShouldNotRescheduleUpdate) { - processNetworkResponse(ErrorCodes::OperationFailed, "update failed"); + processNetworkResponse({ErrorCodes::OperationFailed, "update failed", Milliseconds(0)}); ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); assertReporterDone(); @@ -576,7 +576,7 @@ TEST_F(ReporterTest, SuccessfulUpdateShouldRescheduleUpdate) { runUntil(until, true); - processNetworkResponse(ErrorCodes::OperationFailed, "update failed"); + processNetworkResponse({ErrorCodes::OperationFailed, "update failed", Milliseconds(0)}); ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); assertReporterDone(); diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp index 2a1685d51ab..ffca29bc211 100644 --- a/src/mongo/db/repl/rollback_checker.cpp +++ b/src/mongo/db/repl/rollback_checker.cpp @@ -51,14 +51,14 @@ RollbackChecker::~RollbackChecker() {} RollbackChecker::CallbackHandle RollbackChecker::checkForRollback(const CallbackFn& nextAction) { return _scheduleGetRollbackId( [this, nextAction](const RemoteCommandCallbackArgs& args) { - if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { + if (args.response.status == ErrorCodes::CallbackCanceled) { return; } if (!args.response.isOK()) { - nextAction(args.response.getStatus()); + nextAction(args.response.status); return; } - if (auto rbidElement = args.response.getValue().data["rbid"]) { + if (auto rbidElement = args.response.data["rbid"]) { int remoteRBID = rbidElement.numberInt(); UniqueLock lk(_mutex); @@ -97,14 +97,14 @@ StatusWith<bool> RollbackChecker::hasHadRollback() { RollbackChecker::CallbackHandle RollbackChecker::reset(const CallbackFn& nextAction) { return _scheduleGetRollbackId( [this, nextAction](const RemoteCommandCallbackArgs& args) { - if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { + if (args.response.status == ErrorCodes::CallbackCanceled) { return; } if (!args.response.isOK()) { - nextAction(args.response.getStatus()); + nextAction(args.response.status); return; } - if (auto rbidElement = args.response.getValue().data["rbid"]) { + if (auto rbidElement = args.response.data["rbid"]) { int newRBID = rbidElement.numberInt(); UniqueLock lk(_mutex); diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index 90a16ea470a..597239179e0 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -138,7 +138,7 @@ void ScatterGatherRunner::RunnerImpl::processResponse( std::swap(*iter, _callbacks.back()); _callbacks.pop_back(); - if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { + if (cbData.response.status == ErrorCodes::CallbackCanceled) { return; } diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index b02b3bb7c0c..4f710827884 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -93,24 +93,23 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque _responsesProcessed++; if (!response.isOK()) { // failed response log() << "VoteRequester: Got failed response from " << request.target << ": " - << response.getStatus(); + << response.status; } else { _responders.insert(request.target); ReplSetRequestVotesResponse voteResponse; - const auto status = voteResponse.initialize(response.getValue().data); + const auto status = voteResponse.initialize(response.data); if (!status.isOK()) { log() << "VoteRequester: Got error processing response with status: " << status - << ", resp:" << response.getValue().data; + << ", resp:" << response.data; } if (voteResponse.getVoteGranted()) { LOG(3) << "VoteRequester: Got yes vote from " << request.target - << ", resp:" << response.getValue().data; + << ", resp:" << response.data; _votes++; } else { log() << "VoteRequester: Got no vote from " << request.target - << " because: " << voteResponse.getReason() - << ", resp:" << response.getValue().data; + << " because: " << voteResponse.getReason() << ", resp:" << response.data; } if (voteResponse.getTerm() > _term) { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 500b11e57e7..08721b06419 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -467,7 +467,7 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { } StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONObj& cmdObj) { - StatusWith<executor::RemoteCommandResponse> responseStatus( + executor::RemoteCommandResponse responseStatus( Status{ErrorCodes::InternalError, "Uninitialized value"}); auto executor = grid.getExecutorPool()->getArbitraryExecutor(); @@ -485,15 +485,15 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO executor->wait(scheduleStatus.getValue()); if (!responseStatus.isOK()) { - return responseStatus.getStatus(); + return responseStatus.status; } - Status commandStatus = getStatusFromCommandResult(responseStatus.getValue().data); + Status commandStatus = getStatusFromCommandResult(responseStatus.data); if (!commandStatus.isOK()) { return commandStatus; } - return responseStatus.getValue().data.getOwned(); + return responseStatus.data.getOwned(); } Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn) { diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 99e4fadb135..0d13314ff03 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -160,9 +160,9 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC BSON("isMaster" << 1), BSONObj(), nullptr}, - [conn](const TaskExecutor::ResponseStatus& status) { + [conn](const TaskExecutor::ResponseStatus& rs) { auto cb = std::move(conn->_setupCallback); - cb(conn, status.isOK() ? Status::OK() : status.getStatus()); + cb(conn, rs.status); }, conn->_global->now()); } @@ -244,10 +244,10 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { // need to intercept those calls so we can capture them. This will get cleared out when we // fill // in the real onFinish in startCommand. - op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) { + op->setOnFinish([this](RemoteCommandResponse failedResponse) { invariant(!failedResponse.isOK()); auto cb = std::move(_refreshCallback); - cb(this, failedResponse.getStatus()); + cb(this, failedResponse.status); }); op->_inRefresh = true; diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp index 04c39348ee6..fb503fe8502 100644 --- a/src/mongo/executor/connection_pool_asio_integration_test.cpp +++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp @@ -103,16 +103,16 @@ TEST(ConnectionPoolASIO, TestPing) { for (auto& thread : threads) { thread = stdx::thread([&net, &fixture]() { auto status = Status::OK(); - Deferred<StatusWith<RemoteCommandResponse>> deferred; + Deferred<RemoteCommandResponse> deferred; RemoteCommandRequest request{ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; net.startCommand( - makeCallbackHandle(), request, [&deferred](StatusWith<RemoteCommandResponse> resp) { + makeCallbackHandle(), request, [&deferred](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }); - ASSERT_OK(deferred.get().getStatus()); + ASSERT_OK(deferred.get().status); }); } @@ -140,15 +140,14 @@ TEST(ConnectionPoolASIO, TestHostTimeoutRace) { auto guard = MakeGuard([&] { net.shutdown(); }); for (int i = 0; i < 1000; i++) { - Deferred<StatusWith<RemoteCommandResponse>> deferred; + Deferred<RemoteCommandResponse> deferred; RemoteCommandRequest request{ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand( - makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { - deferred.emplace(std::move(resp)); - }); + net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) { + deferred.emplace(std::move(resp)); + }); - ASSERT_OK(deferred.get().getStatus()); + ASSERT_OK(deferred.get().status); sleepmillis(1); } } @@ -169,14 +168,14 @@ TEST(ConnectionPoolASIO, ConnSetupTimeout) { net.startup(); auto guard = MakeGuard([&] { net.shutdown(); }); - Deferred<StatusWith<RemoteCommandResponse>> deferred; + Deferred<RemoteCommandResponse> deferred; RemoteCommandRequest request{ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }); - ASSERT_EQ(deferred.get().getStatus().code(), ErrorCodes::ExceededTimeLimit); + ASSERT_EQ(deferred.get().status.code(), ErrorCodes::ExceededTimeLimit); } /** @@ -195,7 +194,7 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) { net.startup(); auto guard = MakeGuard([&] { net.shutdown(); }); - std::array<Deferred<StatusWith<RemoteCommandResponse>>, 10> deferreds; + std::array<Deferred<RemoteCommandResponse>, 10> deferreds; RemoteCommandRequest request{fixture.getServers()[0], "admin", @@ -207,10 +206,9 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) { nullptr}; for (auto& deferred : deferreds) { - net.startCommand( - makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { - deferred.emplace(std::move(resp)); - }); + net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) { + deferred.emplace(std::move(resp)); + }); } for (auto& deferred : deferreds) { @@ -241,11 +239,11 @@ TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) { net.startup(); auto guard = MakeGuard([&] { net.shutdown(); }); - Deferred<StatusWith<RemoteCommandResponse>> deferred; + Deferred<RemoteCommandResponse> deferred; RemoteCommandRequest request{ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }); diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp index df3492d15f1..50a59c4d81f 100644 --- a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp +++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp @@ -208,12 +208,12 @@ StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& re return {std::move(message)}; } -StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::int32_t requestId, - StringData cursorNamespace, - const Message& response) { +RemoteCommandResponse upconvertLegacyQueryResponse(std::int32_t requestId, + StringData cursorNamespace, + const Message& response) { auto swBatch = getBatchFromReply(requestId, response); if (!swBatch.isOK()) { - return swBatch.getStatus(); + return {swBatch.getStatus()}; } BSONArray batch; @@ -252,12 +252,12 @@ StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& return {std::move(m)}; } -StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::int32_t requestId, - StringData cursorNamespace, - const Message& response) { +RemoteCommandResponse upconvertLegacyGetMoreResponse(std::int32_t requestId, + StringData cursorNamespace, + const Message& response) { auto swBatch = getBatchFromReply(requestId, response); if (!swBatch.isOK()) { - return swBatch.getStatus(); + return {swBatch.getStatus()}; } BSONArray batch; diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.h b/src/mongo/executor/downconvert_find_and_getmore_commands.h index d3f24f7331d..dd991358e78 100644 --- a/src/mongo/executor/downconvert_find_and_getmore_commands.h +++ b/src/mongo/executor/downconvert_find_and_getmore_commands.h @@ -56,9 +56,9 @@ StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& re * find command response. The 'requestId' parameter is the messageId of the original OP_QUERY, and * the 'cursorNamespace' is the full namespace of the collection the query ran on. */ -StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::int32_t requestId, - StringData cursorNamespace, - const Message& response); +RemoteCommandResponse upconvertLegacyQueryResponse(std::int32_t requestId, + StringData cursorNamespace, + const Message& response); /** * Downconverts a getMore command request to the legacy OP_GET_MORE format. The returned message @@ -73,9 +73,9 @@ StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& * getMore command response. The 'requestId' parameter is the messageId of the original OP_GET_MORE, * and the 'curesorNamespace' is the full namespace of the collection the original query ran on. */ -StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::int32_t requestId, - StringData cursorNamespace, - const Message& response); +RemoteCommandResponse upconvertLegacyGetMoreResponse(std::int32_t requestId, + StringData cursorNamespace, + const Message& response); } // namespace mongo } // namespace executor diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 8617ce6e20c..e9732dbc125 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -276,9 +276,10 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb wasPreviouslyCanceled = _inGetConnection.erase(cbHandle) == 0; } - onFinish(wasPreviouslyCanceled - ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") - : swConn.getStatus()); + Status status = wasPreviouslyCanceled + ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") + : swConn.getStatus(); + onFinish({status, now() - getConnectionStartTime}); signalWorkAvailable(); return; } @@ -295,7 +296,9 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb if (eraseCount == 0) { lk.unlock(); - onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); + onFinish({ErrorCodes::CallbackCanceled, + "Callback canceled", + now() - getConnectionStartTime}); // Though we were canceled, we know that the stream is fine, so indicate success. conn->indicateSuccess(); @@ -342,7 +345,9 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb std::stringstream msg; msg << "Remote command timed out while waiting to get a connection from the " << "pool, took " << getConnectionDuration; - return _completeOperation(op, {ErrorCodes::ExceededTimeLimit, msg.str()}); + auto rs = ResponseStatus( + ErrorCodes::ExceededTimeLimit, msg.str(), getConnectionDuration); + return _completeOperation(op, rs); } // The above conditional guarantees that the adjusted timeout will never underflow. diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 6e80c436abc..b0a8d946b1e 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -95,6 +95,7 @@ class NetworkInterfaceASIO final : public NetworkInterface { friend class connection_pool_asio::ASIOConnection; friend class connection_pool_asio::ASIOTimer; friend class connection_pool_asio::ASIOImpl; + class AsyncOp; public: struct Options { @@ -207,7 +208,8 @@ private: Message& toRecv(); MSGHEADER::Value& header(); - ResponseStatus response(rpc::Protocol protocol, + ResponseStatus response(AsyncOp* op, + rpc::Protocol protocol, Date_t now, rpc::EgressMetadataHook* metadataHook = nullptr); @@ -314,6 +316,9 @@ private: void setOperationProtocol(rpc::Protocol proto); + void setResponseMetadata(BSONObj m); + BSONObj getResponseMetadata(); + void reset(); void clearStateTransitions(); @@ -416,6 +421,8 @@ private: * Must be holding the access control's lock to edit. */ std::array<State, kMaxStateTransitions> _states; + + BSONObj _responseMetadata{}; }; void _startCommand(AsyncOp* op); @@ -430,16 +437,17 @@ private: */ template <typename Handler> void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) { - if (op->canceled()) - return _completeOperation(op, - Status(ErrorCodes::CallbackCanceled, "Callback canceled")); - if (op->timedOut()) { + if (op->canceled()) { + auto rs = ResponseStatus( + ErrorCodes::CallbackCanceled, "Callback canceled", now() - op->start()); + return _completeOperation(op, rs); + } else if (op->timedOut()) { str::stream msg; msg << "Operation timed out" << ", request was " << op->_request.toString(); - return _completeOperation(op, Status(ErrorCodes::ExceededTimeLimit, msg)); - } - if (ec) + auto rs = ResponseStatus(ErrorCodes::ExceededTimeLimit, msg, now() - op->start()); + return _completeOperation(op, rs); + } else if (ec) return _networkErrorCallback(op, ec); handler(); @@ -459,7 +467,7 @@ private: void _beginCommunication(AsyncOp* op); void _completedOpCallback(AsyncOp* op); void _networkErrorCallback(AsyncOp* op, const std::error_code& ec); - void _completeOperation(AsyncOp* op, const TaskExecutor::ResponseStatus& resp); + void _completeOperation(AsyncOp* op, TaskExecutor::ResponseStatus resp); void _signalWorkAvailable_inlock(); diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 9bd83daca84..5f5fe523f9c 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -85,12 +85,12 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { // Callback to parse protocol information out of received ismaster response auto parseIsMaster = [this, op]() { - auto swCommandReply = op->command()->response(rpc::Protocol::kOpQuery, now()); + auto swCommandReply = op->command()->response(op, rpc::Protocol::kOpQuery, now()); if (!swCommandReply.isOK()) { - return _completeOperation(op, swCommandReply.getStatus()); + return _completeOperation(op, swCommandReply); } - auto commandReply = std::move(swCommandReply.getValue()); + auto commandReply = std::move(swCommandReply); // Ensure that the isMaster response is "ok:1". auto commandStatus = getStatusFromCommandResult(commandReply.data); @@ -178,7 +178,8 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { } auto callAuthCompletionHandler = [this, op, handler]() { - auto authResponse = op->command()->response(op->operationProtocol(), now(), nullptr); + auto authResponse = + op->command()->response(op, op->operationProtocol(), now(), nullptr); handler(authResponse); }; diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index c0aa346fd6e..0ce9a1505de 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -142,15 +142,16 @@ ResponseStatus decodeRPC(Message* received, if (reply->getProtocol() != protocol) { auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol)); if (!requestProtocol.isOK()) - return requestProtocol.getStatus(); - - return Status(ErrorCodes::RPCProtocolNegotiationFailed, - str::stream() << "Mismatched RPC protocols - request was '" - << requestProtocol.getValue().toString() - << "' '" - << " but reply was '" - << networkOpToString(received->operation()) - << "'"); + return {requestProtocol.getStatus(), elapsed}; + + return {ErrorCodes::RPCProtocolNegotiationFailed, + str::stream() << "Mismatched RPC protocols - request was '" + << requestProtocol.getValue().toString() + << "' '" + << " but reply was '" + << networkOpToString(received->operation()) + << "'", + elapsed}; } auto commandReply = reply->getCommandReply(); auto replyMetadata = reply->getMetadata(); @@ -160,14 +161,14 @@ ResponseStatus decodeRPC(Message* received, auto listenStatus = callNoexcept( *metadataHook, &rpc::EgressMetadataHook::readReplyMetadata, source, replyMetadata); if (!listenStatus.isOK()) { - return listenStatus; + return {listenStatus, elapsed}; } } return {RemoteCommandResponse( std::move(*received), std::move(commandReply), std::move(replyMetadata), elapsed)}; } catch (...) { - return exceptionToStatus(); + return {exceptionToStatus(), elapsed}; } } @@ -198,13 +199,17 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() { return _header; } -ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol, +ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op, + rpc::Protocol protocol, Date_t now, rpc::EgressMetadataHook* metadataHook) { auto& received = _toRecv; switch (_type) { case CommandType::kRPC: { - return decodeRPC(&received, protocol, now - _start, _target, metadataHook); + auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook); + if (rs.isOK()) + op->setResponseMetadata(rs.metadata); + return rs; } case CommandType::kDownConvertedFind: { auto ns = DbMessage(_toSend).getns(); @@ -257,30 +262,33 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { } void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { - auto response = op->command()->response(op->operationProtocol(), now(), _metadataHook.get()); + auto response = + op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get()); _completeOperation(op, response); } void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) { - if (ec.category() == mongoErrorCategory()) { - // If we get a Mongo error code, we can preserve it. - _completeOperation(op, Status(ErrorCodes::fromInt(ec.value()), ec.message())); - } else { - // If we get an asio or system error, we just convert it to a network error. - _completeOperation(op, Status(ErrorCodes::HostUnreachable, ec.message())); - } + ErrorCodes::Error errorCode = (ec.category() == mongoErrorCategory()) + ? ErrorCodes::fromInt(ec.value()) + : ErrorCodes::HostUnreachable; + _completeOperation(op, {errorCode, ec.message(), Milliseconds(now() - op->_start)}); } // NOTE: This method may only be called by ASIO threads // (do not call from methods entered by TaskExecutor threads) -void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& resp) { +void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) { + auto metadata = op->getResponseMetadata(); + if (!metadata.isEmpty()) { + resp.metadata = metadata; + } + // Cancel this operation's timeout. Note that the timeout callback may already be running, // may have run, or may have already been scheduled to run in the near future. if (op->_timeoutAlarm) { op->_timeoutAlarm->cancel(); } - if (resp.getStatus().code() == ErrorCodes::ExceededTimeLimit) { + if (resp.status.code() == ErrorCodes::ExceededTimeLimit) { _numTimedOutOps.fetchAndAdd(1); } @@ -289,7 +297,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& MONGO_ASIO_INVARIANT(!resp.isOK(), "Failed to connect in setup", op); // If we fail during connection, we won't be able to access any of op's members after // calling finish(), so we return here. - log() << "Failed to connect to " << op->request().target << " - " << resp.getStatus(); + log() << "Failed to connect to " << op->request().target << " - " << resp.status; _numFailedOps.fetchAndAdd(1); op->finish(resp); return; @@ -301,7 +309,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& // If we fail during heartbeating, we won't be able to access any of op's members after // calling finish(), so we return here. log() << "Failed asio heartbeat to " << op->request().target << " - " - << redact(resp.getStatus()); + << redact(resp.status); _numFailedOps.fetchAndAdd(1); op->finish(resp); return; @@ -312,9 +320,9 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& // that // we got from the pool to execute a command, but it failed for some reason. LOG(2) << "Failed to execute command: " << redact(op->request().toString()) - << " reason: " << redact(resp.getStatus()); + << " reason: " << redact(resp.status); - if (resp.getStatus().code() != ErrorCodes::CallbackCanceled) { + if (resp.status.code() != ErrorCodes::CallbackCanceled) { _numFailedOps.fetchAndAdd(1); } } else { @@ -357,7 +365,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& asioConn->bindAsyncOp(std::move(ownedOp)); if (!resp.isOK()) { - asioConn->indicateFailure(resp.getStatus()); + asioConn->indicateFailure(resp.status); } else { asioConn->indicateUsed(); asioConn->indicateSuccess(); @@ -451,16 +459,14 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { auto finishHook = [this, op]() { auto response = - op->command()->response(op->operationProtocol(), now(), _metadataHook.get()); + op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get()); if (!response.isOK()) { - return _completeOperation(op, response.getStatus()); + return _completeOperation(op, response); } - auto handleStatus = callNoexcept(*_hook, - &NetworkConnectionHook::handleReply, - op->request().target, - std::move(response.getValue())); + auto handleStatus = callNoexcept( + *_hook, &NetworkConnectionHook::handleReply, op->request().target, std::move(response)); if (!handleStatus.isOK()) { return _completeOperation(op, handleStatus); diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index 8b39dd5c992..b9d2ebb375e 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -87,23 +87,22 @@ public: return _rng; } - Deferred<StatusWith<RemoteCommandResponse>> runCommand( - const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { - Deferred<StatusWith<RemoteCommandResponse>> deferred; - net().startCommand( - cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable { - deferred.emplace(std::move(resp)); - }); + Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request) { + Deferred<RemoteCommandResponse> deferred; + net().startCommand(cbHandle, request, [deferred](RemoteCommandResponse resp) mutable { + deferred.emplace(std::move(resp)); + }); return deferred; } - StatusWith<RemoteCommandResponse> runCommandSync(RemoteCommandRequest& request) { + RemoteCommandResponse runCommandSync(RemoteCommandRequest& request) { auto deferred = runCommand(makeCallbackHandle(), request); auto& res = deferred.get(); if (res.isOK()) { - log() << "got command result: " << res.getValue().toString(); + log() << "got command result: " << res.toString(); } else { - log() << "command failed: " << res.getStatus(); + log() << "command failed: " << res.status; } return res; } @@ -113,7 +112,8 @@ public: Milliseconds timeoutMillis = Milliseconds(-1)) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; - auto res = unittest::assertGet(runCommandSync(request)); + auto res = runCommandSync(request); + ASSERT_OK(res.status); ASSERT_OK(getStatusFromCommandResult(res.data)); } @@ -123,8 +123,8 @@ public: ErrorCodes::Error reason) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; - auto clientStatus = runCommandSync(request); - ASSERT_TRUE(clientStatus == reason); + auto res = runCommandSync(request); + ASSERT_EQ(reason, res.status.code()); } void assertCommandFailsOnServer(StringData db, @@ -133,9 +133,10 @@ public: ErrorCodes::Error reason) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; - auto res = unittest::assertGet(runCommandSync(request)); + auto res = runCommandSync(request); + ASSERT_OK(res.status); auto serverStatus = getStatusFromCommandResult(res.data); - ASSERT_TRUE(serverStatus == reason); + ASSERT_EQ(reason, serverStatus); } private: @@ -185,10 +186,10 @@ public: nullptr, timeout}; auto out = fixture->runCommand(cb, request) - .then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status { + .then(pool, [self](RemoteCommandResponse resp) -> Status { auto status = resp.isOK() - ? getStatusFromCommandResult(resp.getValue().data) - : resp.getStatus(); + ? getStatusFromCommandResult(resp.data) + : resp.status; return status == self._expected ? Status::OK() diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index a05dba95fc0..a359099e004 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -217,16 +217,15 @@ NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() { return _command.get_ptr(); } -void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& status) { +void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& rs) { // We never hold the access lock when we call finish from NetworkInterfaceASIO. _transitionToState(AsyncOp::State::kFinished); - LOG(2) << "Request " << _request.id << " finished with response: " - << redact(status.getStatus().isOK() ? status.getValue().data.toString() - : status.getStatus().toString()); + LOG(2) << "Request " << _request.id + << " finished with response: " << redact(rs.isOK() ? rs.data.toString() : rs.status.toString()); // Calling the completion handler may invalidate state in this op, so do it last. - _onFinish(status); + _onFinish(rs); } const RemoteCommandRequest& NetworkInterfaceASIO::AsyncOp::request() const { @@ -253,6 +252,14 @@ void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) { _operationProtocol = proto; } +void NetworkInterfaceASIO::AsyncOp::setResponseMetadata(BSONObj m) { + _responseMetadata = m; +} + +BSONObj NetworkInterfaceASIO::AsyncOp::getResponseMetadata() { + return _responseMetadata; +} + void NetworkInterfaceASIO::AsyncOp::reset() { // We don't reset owner as it never changes _cbHandle = {}; diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp index 1b52f951017..1c2f3a6dee3 100644 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ b/src/mongo/executor/network_interface_asio_test.cpp @@ -52,6 +52,8 @@ namespace mongo { namespace executor { namespace { +using ResponseStatus = TaskExecutor::ResponseStatus; + HostAndPort testHost{"localhost", 20000}; void initWireSpecMongoD() { @@ -101,20 +103,18 @@ public: } } - Deferred<StatusWith<RemoteCommandResponse>> startCommand( + Deferred<RemoteCommandResponse> startCommand( const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { - Deferred<StatusWith<RemoteCommandResponse>> deferredResponse; + Deferred<RemoteCommandResponse> deferredResponse; ASSERT_OK(net().startCommand( - cbHandle, - request, - [deferredResponse](StatusWith<RemoteCommandResponse> response) mutable { + cbHandle, request, [deferredResponse](ResponseStatus response) mutable { deferredResponse.emplace(std::move(response)); })); return deferredResponse; } // Helper to run startCommand and wait for it - StatusWith<RemoteCommandResponse> startCommandSync(RemoteCommandRequest& request) { + RemoteCommandResponse startCommandSync(RemoteCommandRequest& request) { auto deferred = startCommand(makeCallbackHandle(), request); // wait for the operation to complete @@ -180,7 +180,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) { // Wait for op to complete, assert that it was canceled. auto& result = deferred.get(); - ASSERT(result == ErrorCodes::CallbackCanceled); + ASSERT(result.status == ErrorCodes::CallbackCanceled); + ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } @@ -202,7 +203,8 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { }); auto& result = deferred.get(); - ASSERT(result == ErrorCodes::CallbackCanceled); + ASSERT(result.status == ErrorCodes::CallbackCanceled); + ASSERT(result.elapsedMillis); // expect 0 completed ops because the op was canceled before getting a connection assertNumOps(1u, 0u, 0u, 0u); } @@ -230,9 +232,11 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) { }); // Allow to complete, then cancel, nothing should happen. - deferred.get(); + auto& result = deferred.get(); net().cancelCommand(cbh); + ASSERT(result.isOK()); + ASSERT(result.elapsedMillis); assertNumOps(0u, 0u, 0u, 1u); } @@ -262,7 +266,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { // Wait for op to complete, assert that cancellation error had precedence. auto& result = deferred.get(); - ASSERT(result == ErrorCodes::CallbackCanceled); + ASSERT(result.status == ErrorCodes::CallbackCanceled); + ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } @@ -290,7 +295,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { // Wait for op to complete, assert that cancellation error had precedence. auto& result = deferred.get(); - ASSERT(result == ErrorCodes::CallbackCanceled); + ASSERT(result.status == ErrorCodes::CallbackCanceled); + ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } @@ -319,7 +325,8 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { // Wait for op to complete, assert that timeout had precedence. auto& result = deferred.get(); - ASSERT(result == ErrorCodes::ExceededTimeLimit); + ASSERT(result.status == ErrorCodes::ExceededTimeLimit); + ASSERT(result.elapsedMillis); assertNumOps(0u, 1u, 1u, 0u); } @@ -349,7 +356,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) { // Wait for op to complete, assert that the cancellation had precedence. auto& result = deferred.get(); - ASSERT(result == ErrorCodes::CallbackCanceled); + ASSERT(result.status == ErrorCodes::CallbackCanceled); + ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } @@ -386,7 +394,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { } auto& result = deferred.get(); - ASSERT(result == ErrorCodes::ExceededTimeLimit); + ASSERT(result.status == ErrorCodes::ExceededTimeLimit); + ASSERT(result.elapsedMillis); assertNumOps(0u, 1u, 1u, 0u); } @@ -425,10 +434,10 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) { }); auto& res = deferred.get(); - - auto response = uassertStatusOK(res); - ASSERT_EQ(response.data, expectedCommandReply); - ASSERT_EQ(response.metadata, expectedMetadata); + ASSERT(res.elapsedMillis); + uassertStatusOK(res.status); + ASSERT_EQ(res.data, expectedCommandReply); + ASSERT_EQ(res.metadata, expectedMetadata); assertNumOps(0u, 0u, 0u, 1u); } @@ -442,7 +451,7 @@ TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) { net().shutdown(); RemoteCommandRequest request; ASSERT_NOT_OK(net().startCommand( - makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {})); + makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {})); } class MalformedMessageTest : public NetworkInterfaceASIOTest { @@ -502,7 +511,8 @@ public: } auto& response = deferred.get(); - ASSERT(response == code); + ASSERT(response.status == code); + ASSERT(response.elapsedMillis); assertNumOps(0u, 0u, 1u, 0u); } }; @@ -614,8 +624,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) { // we should stop here. auto& res = deferred.get(); - - ASSERT(res == validationFailedStatus); + ASSERT(res.status == validationFailedStatus); + ASSERT(res.elapsedMillis); assertNumOps(0u, 0u, 1u, 0u); } @@ -671,9 +681,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) { // we should stop here. auto& res = deferred.get(); - - // auto result = uassertStatusOK(res); - ASSERT(res == validationFailedStatus); + ASSERT(res.status == validationFailedStatus); + ASSERT(res.elapsedMillis); ASSERT(validateCalled); ASSERT(hostCorrect); ASSERT(isMasterReplyCorrect); @@ -721,7 +730,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { // We should stop here. auto& res = deferred.get(); - ASSERT(res == makeRequestError); + ASSERT(res.status == makeRequestError); + ASSERT(res.elapsedMillis); ASSERT(makeRequestCalled); ASSERT(!handleReplyCalled); assertNumOps(0u, 0u, 1u, 0u); @@ -782,8 +792,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { auto& result = deferred.get(); ASSERT(result.isOK()); - ASSERT(result.getValue().data == commandReply); - ASSERT(result.getValue().metadata == metadata); + ASSERT(result.data == commandReply); + ASSERT(result.elapsedMillis); + ASSERT(result.metadata == metadata); assertNumOps(0u, 0u, 0u, 1u); } @@ -849,8 +860,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { }); auto& result = deferred.get(); - - ASSERT(result == handleReplyError); + ASSERT(result.status == handleReplyError); + ASSERT(result.elapsedMillis); ASSERT(makeRequestCalled); ASSERT(handleReplyCalled); ASSERT(handleReplyArgumentCorrect); @@ -954,7 +965,8 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) { return response; }); - deferred.get(); + auto& res = deferred.get(); + ASSERT(res.elapsedMillis); ASSERT(wroteRequestMetadata); ASSERT(gotReplyMetadata); assertNumOps(0u, 0u, 0u, 1u); diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 2ebd1244e47..2c84a34071a 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -149,8 +149,8 @@ void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); - ResponseStatus response(ErrorCodes::CallbackCanceled, "Network operation canceled"); - _cancelCommand_inlock(cbHandle, response); + ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0)); + _cancelCommand_inlock(cbHandle, rs); } @@ -216,7 +216,7 @@ void NetworkInterfaceMock::shutdown() { lk.unlock(); for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { iter->setResponse( - now, ResponseStatus(ErrorCodes::ShutdownInProgress, "Shutting down mock network")); + now, {ErrorCodes::ShutdownInProgress, "Shutting down mock network", Milliseconds(0)}); iter->finishResponse(); } lk.lock(); @@ -298,7 +298,7 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, // If no RemoteCommandResponse was returned (for example, on a simulated network error), then // do not attempt to run the metadata hook, since there is no returned metadata. if (_metadataHook && response.isOK()) { - _metadataHook->readReplyMetadata(noi->getRequest().target, response.getValue().metadata); + _metadataHook->readReplyMetadata(noi->getRequest().target, response.metadata); } noi->setResponse(when, response); @@ -307,7 +307,7 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(const BSONObj& response) { BSONObj metadata; - return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata)); + return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata, Milliseconds(0))); } RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse( @@ -330,6 +330,12 @@ RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const Status& r return scheduleErrorResponse(getNextReadyRequest(), response); } +RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const ResponseStatus response) { + auto noi = getNextReadyRequest(); + scheduleResponse(noi, now(), response); + return noi->getRequest(); +} + RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi, const Status& response) { return scheduleErrorResponse(noi, now(), response); @@ -426,9 +432,9 @@ void NetworkInterfaceMock::_enqueueOperation_inlock( if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) { invariant(op.getRequest().timeout >= Milliseconds(0)); - ResponseStatus response(ErrorCodes::NetworkTimeout, "Network timeout"); + ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0)); auto action = stdx::bind( - &NetworkInterfaceMock::_cancelCommand_inlock, this, op.getCallbackHandle(), response); + &NetworkInterfaceMock::_cancelCommand_inlock, this, op.getCallbackHandle(), rs); _alarms.emplace(_now_inlock() + op.getRequest().timeout, action); } } @@ -470,17 +476,15 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort } // The completion handler for the postconnect command schedules the original command. - auto postconnectCompletionHandler = [this, - op](StatusWith<RemoteCommandResponse> response) mutable { + auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!response.isOK()) { - op.setResponse(_now_inlock(), response.getStatus()); + if (!rs.isOK()) { + op.setResponse(_now_inlock(), rs); op.finishResponse(); return; } - auto handleStatus = - _hook->handleReply(op.getRequest().target, std::move(response.getValue())); + auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(rs)); if (!handleStatus.isOK()) { op.setResponse(_now_inlock(), handleStatus); @@ -585,8 +589,8 @@ bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() { return _waitingToRunMask & kExecutorThread; } -static const StatusWith<RemoteCommandResponse> kUnsetResponse( - ErrorCodes::InternalError, "NetworkOperation::_response never set"); +static const ResponseStatus kUnsetResponse(ErrorCodes::InternalError, + "NetworkOperation::_response never set"); NetworkInterfaceMock::NetworkOperation::NetworkOperation() : _requestDate(), @@ -612,9 +616,8 @@ NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {} std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const { return str::stream() << "NetworkOperation -- request:'" << _request.toString() - << "', responseStatus: '" << _response.getStatus().toString() - << "', responseBody: '" - << (_response.getStatus().isOK() ? _response.getValue().toString() : "") + << "', responseStatus: '" << _response.status.toString() + << "', responseBody: '" << (_response.isOK() ? _response.toString() : "") << "', reqDate: " << _requestDate.toString() << ", nextConsiderDate: " << _nextConsiderationDate.toString() << ", respDate: " << _responseDate.toString(); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index df972ca17f6..de4a224ac91 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -49,6 +49,7 @@ class BSONObj; namespace executor { +using ResponseStatus = TaskExecutor::ResponseStatus; class NetworkConnectionHook; /** @@ -173,7 +174,7 @@ public: */ void scheduleResponse(NetworkOperationIterator noi, Date_t when, - const TaskExecutor::ResponseStatus& response); + const ResponseStatus& response); /** * Schedules a successful "response" to "noi" at virtual time "when". @@ -195,6 +196,7 @@ public: * "when" defaults to now(). */ RemoteCommandRequest scheduleErrorResponse(const Status& response); + RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response); RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, const Status& response); RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi, @@ -247,7 +249,7 @@ public: * Cancel a command with specified response, e.g. NetworkTimeout or CallbackCanceled errors. */ void _cancelCommand_inlock(const TaskExecutor::CallbackHandle& cbHandle, - const TaskExecutor::ResponseStatus& response); + const ResponseStatus& response); private: /** @@ -412,7 +414,7 @@ public: /** * Sets the response and thet virtual time at which it will be delivered. */ - void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response); + void setResponse(Date_t responseDate, const ResponseStatus& response); /** * Predicate that returns true if cbHandle equals the executor's handle for this network @@ -472,7 +474,7 @@ private: Date_t _responseDate; TaskExecutor::CallbackHandle _cbHandle; RemoteCommandRequest _request; - TaskExecutor::ResponseStatus _response; + ResponseStatus _response; RemoteCommandCompletionFn _onFinish; }; diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index 74d5701a55f..e6907dc6769 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -166,14 +166,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { BSONObj(), Milliseconds(0)}; - ASSERT_OK( - net().startCommand(cb, actualCommandExpected, [&](StatusWith<RemoteCommandResponse> resp) { - commandFinished = true; - if (resp.isOK()) { - gotCorrectCommandReply = - (actualResponseExpected.toString() == resp.getValue().toString()); - } - })); + ASSERT_OK(net().startCommand(cb, actualCommandExpected, [&](RemoteCommandResponse resp) { + commandFinished = true; + if (resp.isOK()) { + gotCorrectCommandReply = (actualResponseExpected.toString() == resp.toString()); + } + })); // At this point validate and makeRequest should have been called. ASSERT(validateCalled); @@ -239,10 +237,10 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { bool statusPropagated = false; RemoteCommandRequest request; - ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; - statusPropagated = resp.getStatus().code() == ErrorCodes::ConflictingOperationInProgress; + statusPropagated = resp.status.code() == ErrorCodes::ConflictingOperationInProgress; })); { @@ -280,7 +278,7 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { RemoteCommandRequest request; ASSERT_OK(net().startCommand( - cb, request, [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; })); + cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; })); { net().enterNetwork(); @@ -316,9 +314,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { bool errorPropagated = false; RemoteCommandRequest request; - ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; - errorPropagated = resp.getStatus().code() == ErrorCodes::InvalidSyncSource; + errorPropagated = resp.status.code() == ErrorCodes::InvalidSyncSource; })); { @@ -353,9 +351,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { bool errorPropagated = false; RemoteCommandRequest request; - ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; - errorPropagated = resp.getStatus().code() == ErrorCodes::CappedPositionLost; + errorPropagated = resp.status.code() == ErrorCodes::CappedPositionLost; })); ASSERT(!handleReplyCalled); @@ -387,7 +385,7 @@ TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) { TaskExecutor::CallbackHandle cb{}; RemoteCommandRequest request; - ASSERT_NOT_OK(net().startCommand(cb, request, [](StatusWith<RemoteCommandResponse> resp) {})); + ASSERT_NOT_OK(net().startCommand(cb, request, [](RemoteCommandResponse resp) {})); } TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { @@ -404,9 +402,7 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) { request.timeout = Milliseconds(2000); ErrorCodes::Error statusPropagated = ErrorCodes::OK; - auto finishFn = [&](StatusWith<RemoteCommandResponse> resp) { - statusPropagated = resp.getStatus().code(); - }; + auto finishFn = [&](RemoteCommandResponse resp) { statusPropagated = resp.status.code(); }; // // Command times out. @@ -435,8 +431,7 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) { ASSERT_EQUALS(start + Milliseconds(1000), net().now()); ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated); // Reply with a successful response. - StatusWith<RemoteCommandResponse> responseStatus(RemoteCommandResponse{}); - net().scheduleResponse(noi, net().now(), responseStatus); + net().scheduleResponse(noi, net().now(), {}); net().runReadyNetworkOperations(); net().exitNetwork(); ASSERT_EQUALS(ErrorCodes::OK, statusPropagated); diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp index 8cbd529d91e..abb94cf7014 100644 --- a/src/mongo/executor/network_interface_perf_test.cpp +++ b/src/mongo/executor/network_interface_perf_test.cpp @@ -75,8 +75,8 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) { const auto bsonObjPing = BSON("ping" << 1); - const auto callback = [&](StatusWith<RemoteCommandResponse> resp) { - uassertStatusOK(resp); + const auto callback = [&](RemoteCommandResponse resp) { + uassertStatusOK(resp.status); if (--remainingOps) { return func(); } diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp index fae46697d5e..2d5ec28ebc3 100644 --- a/src/mongo/executor/network_test_env.cpp +++ b/src/mongo/executor/network_test_env.cpp @@ -58,7 +58,8 @@ void NetworkTestEnv::onCommand(OnCommandFunction func) { _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response); } else { - _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), resultStatus.getStatus()); + _mockNetwork->scheduleResponse( + noi, _mockNetwork->now(), {resultStatus.getStatus(), Milliseconds(0)}); } _mockNetwork->runReadyNetworkOperations(); @@ -72,18 +73,18 @@ void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) { const RemoteCommandRequest& request = noi->getRequest(); const auto cmdResponseStatus = func(request); - const auto cmdResponse = cmdResponseStatus.getValue(); BSONObjBuilder result; if (cmdResponseStatus.isOK()) { - result.appendElements(cmdResponse.data); - Command::appendCommandStatus(result, cmdResponseStatus.getStatus()); - const RemoteCommandResponse response(result.obj(), cmdResponse.metadata, Milliseconds(1)); + result.appendElements(cmdResponseStatus.data); + Command::appendCommandStatus(result, cmdResponseStatus.status); + const RemoteCommandResponse response( + result.obj(), cmdResponseStatus.metadata, Milliseconds(1)); _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response); } else { - _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), cmdResponseStatus.getStatus()); + _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), cmdResponseStatus.status); } _mockNetwork->runReadyNetworkOperations(); @@ -113,30 +114,29 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) { } void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func) { - onCommandWithMetadata( - [&func](const RemoteCommandRequest& request) -> StatusWith<RemoteCommandResponse> { - const auto& resultStatus = func(request); - - if (!resultStatus.isOK()) { - return resultStatus.getStatus(); - } - - std::vector<BSONObj> result; - BSONObj metadata; - std::tie(result, metadata) = resultStatus.getValue(); - - BSONArrayBuilder arr; - for (const auto& obj : result) { - arr.append(obj); - } - - const NamespaceString nss = - NamespaceString(request.dbname, request.cmdObj.firstElement().String()); - BSONObjBuilder resultBuilder; - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder); - - return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1)); - }); + onCommandWithMetadata([&func](const RemoteCommandRequest& request) -> RemoteCommandResponse { + const auto& resultStatus = func(request); + + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } + + std::vector<BSONObj> result; + BSONObj metadata; + std::tie(result, metadata) = resultStatus.getValue(); + + BSONArrayBuilder arr; + for (const auto& obj : result) { + arr.append(obj); + } + + const NamespaceString nss = + NamespaceString(request.dbname, request.cmdObj.firstElement().String()); + BSONObjBuilder resultBuilder; + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder); + + return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1)); + }); } } // namespace executor diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h index 7e09a18f0fb..4f662f5723e 100644 --- a/src/mongo/executor/network_test_env.h +++ b/src/mongo/executor/network_test_env.h @@ -128,7 +128,7 @@ public: using OnCommandFunction = stdx::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>; using OnCommandWithMetadataFunction = - stdx::function<StatusWith<RemoteCommandResponse>(const RemoteCommandRequest&)>; + stdx::function<RemoteCommandResponse(const RemoteCommandRequest&)>; using OnFindCommandFunction = stdx::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>; diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp index ab67febd598..e611ea0bc2b 100644 --- a/src/mongo/executor/remote_command_response.cpp +++ b/src/mongo/executor/remote_command_response.cpp @@ -36,6 +36,49 @@ namespace mongo { namespace executor { +RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code, std::string reason) + : status(code, reason){}; + +RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code, + std::string reason, + Milliseconds millis) + : elapsedMillis(millis), status(code, reason) {} + +RemoteCommandResponse::RemoteCommandResponse(Status s) : status(std::move(s)) { + invariant(!isOK()); +}; + +RemoteCommandResponse::RemoteCommandResponse(Status s, Milliseconds millis) + : elapsedMillis(millis), status(std::move(s)) { + invariant(!isOK()); +}; + +RemoteCommandResponse::RemoteCommandResponse(BSONObj dataObj, + BSONObj metadataObj, + Milliseconds millis) + : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) { + // The buffer backing the default empty BSONObj has static duration so it is effectively + // owned. + invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); + invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata()); +}; + +RemoteCommandResponse::RemoteCommandResponse(Message messageArg, + BSONObj dataObj, + BSONObj metadataObj, + Milliseconds millis) + : message(std::make_shared<const Message>(std::move(messageArg))), + data(std::move(dataObj)), + metadata(std::move(metadataObj)), + elapsedMillis(millis) { + if (!data.isOwned()) { + data.shareOwnershipWith(message->sharedBuffer()); + } + if (!metadata.isOwned()) { + metadata.shareOwnershipWith(message->sharedBuffer()); + } +} + // TODO(amidvidy): we currently discard output docs when we use this constructor. We should // have RCR hold those too, but we need more machinery before that is possible. RemoteCommandResponse::RemoteCommandResponse(const rpc::ReplyInterface& rpcReply, @@ -43,6 +86,10 @@ RemoteCommandResponse::RemoteCommandResponse(const rpc::ReplyInterface& rpcReply : RemoteCommandResponse(rpcReply.getCommandReply(), rpcReply.getMetadata(), std::move(millis)) { } +bool RemoteCommandResponse::isOK() const { + return status.isOK(); +} + std::string RemoteCommandResponse::toString() const { return str::stream() << "RemoteResponse -- " << " cmd:" << data.toString(); diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index 3408fe01b6c..ebfc0434067 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -28,10 +28,12 @@ #pragma once +#include <boost/optional.hpp> #include <iosfwd> #include <memory> #include <string> +#include "mongo/base/status.h" #include "mongo/db/jsobj.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" @@ -51,35 +53,25 @@ namespace executor { struct RemoteCommandResponse { RemoteCommandResponse() = default; - RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj) - : RemoteCommandResponse(dataObj, metadataObj, Milliseconds(0)) {} + RemoteCommandResponse(ErrorCodes::Error code, std::string reason); - RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis) - : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) { - // The buffer backing the default empty BSONObj has static duration so it is effectively - // owned. - invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); - invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata()); - } + RemoteCommandResponse(ErrorCodes::Error code, std::string reason, Milliseconds millis); + + RemoteCommandResponse(Status s); + + RemoteCommandResponse(Status s, Milliseconds millis); + + RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis); RemoteCommandResponse(Message messageArg, BSONObj dataObj, BSONObj metadataObj, - Milliseconds millis) - : message(std::make_shared<const Message>(std::move(messageArg))), - data(std::move(dataObj)), - metadata(std::move(metadataObj)), - elapsedMillis(millis) { - if (!data.isOwned()) { - data.shareOwnershipWith(message->sharedBuffer()); - } - if (!metadata.isOwned()) { - metadata.shareOwnershipWith(message->sharedBuffer()); - } - } + Milliseconds millis); RemoteCommandResponse(const rpc::ReplyInterface& rpcReply, Milliseconds millis); + bool isOK() const; + std::string toString() const; bool operator==(const RemoteCommandResponse& rhs) const; @@ -88,7 +80,8 @@ struct RemoteCommandResponse { std::shared_ptr<const Message> message; // May be null. BSONObj data; // Always owned. May point into message. BSONObj metadata; // Always owned. May point into message. - Milliseconds elapsedMillis = {}; + boost::optional<Milliseconds> elapsedMillis; + Status status = Status::OK(); }; } // namespace executor diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index fd630e527b7..abf5d5d221a 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -82,7 +82,7 @@ public: class EventState; class EventHandle; - using ResponseStatus = StatusWith<RemoteCommandResponse>; + using ResponseStatus = RemoteCommandResponse; /** * Type of a regular callback function. @@ -404,12 +404,12 @@ struct TaskExecutor::RemoteCommandCallbackArgs { RemoteCommandCallbackArgs(TaskExecutor* theExecutor, const CallbackHandle& theHandle, const RemoteCommandRequest& theRequest, - const StatusWith<RemoteCommandResponse>& theResponse); + const ResponseStatus& theResponse); TaskExecutor* executor; CallbackHandle myHandle; RemoteCommandRequest request; - StatusWith<RemoteCommandResponse> response; + ResponseStatus response; }; } // namespace executor diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 3212e15a376..57c12813250 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -367,7 +367,7 @@ static void setStatusOnRemoteCommandCompletion( << getRequestDescription(expectedRequest)); return; } - *outStatus = cbData.response.getStatus(); + *outStatus = cbData.response.status; } COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { @@ -386,8 +386,7 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { net->enterNetwork(); ASSERT(net->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, net->now(), TaskExecutor::ResponseStatus(ErrorCodes::NoSuchKey, "I'm missing")); + net->scheduleResponse(noi, net->now(), {ErrorCodes::NoSuchKey, "I'm missing"}); net->runReadyNetworkOperations(); ASSERT(!net->hasReadyRequests()); net->exitNetwork(); @@ -434,8 +433,7 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) { ASSERT(net->hasReadyRequests()); const Date_t startTime = net->now(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - net->scheduleResponse( - noi, startTime + Milliseconds(2), TaskExecutor::ResponseStatus(RemoteCommandResponse{})); + net->scheduleResponse(noi, startTime + Milliseconds(2), {}); net->runUntil(startTime + Milliseconds(2)); ASSERT_EQUALS(startTime + Milliseconds(2), net->now()); net->exitNetwork(); diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 1f60031ca5d..26606d8f0ee 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -282,35 +282,29 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( namespace { +using ResponseStatus = TaskExecutor::ResponseStatus; + // If the request received a connection from the pool but failed in its execution, -// convert the raw Status in cbData to a StatusWith<RemoteCommandResponse> so that the callback, -// which expects a StatusWith<RemoteCommandResponse> as part of RemoteCommandCallbackArgs, +// convert the raw Status in cbData to a RemoteCommandResponse so that the callback, +// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs, // can be run despite a RemoteCommandResponse never having been created. void remoteCommandFinished(const TaskExecutor::CallbackArgs& cbData, const TaskExecutor::RemoteCommandCallbackFn& cb, const RemoteCommandRequest& request, - const TaskExecutor::ResponseStatus& response) { - using ResponseStatus = TaskExecutor::ResponseStatus; - if (cbData.status.isOK()) { - cb(TaskExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, response)); - } else { - cb(TaskExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); - } + const ResponseStatus& rs) { + cb(TaskExecutor::RemoteCommandCallbackArgs(cbData.executor, cbData.myHandle, request, rs)); } // If the request failed to receive a connection from the pool, -// convert the raw Status in cbData to a StatusWith<RemoteCommandResponse> so that the callback, -// which expects a StatusWith<RemoteCommandResponse> as part of RemoteCommandCallbackArgs, +// convert the raw Status in cbData to a RemoteCommandResponse so that the callback, +// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs, // can be run despite a RemoteCommandResponse never having been created. void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData, const TaskExecutor::RemoteCommandCallbackFn& cb, const RemoteCommandRequest& request) { - using ResponseStatus = TaskExecutor::ResponseStatus; invariant(!cbData.status.isOK()); cb(TaskExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); + cbData.executor, cbData.myHandle, request, {cbData.status})); } } // namespace @@ -349,8 +343,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC return; } LOG(3) << "Received remote response: " - << redact(response.isOK() ? response.getValue().toString() - : response.getStatus().toString()); + << redact(response.isOK() ? response.toString() : response.status.toString()); swap(cbState->callback, newCb); scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); }); diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp index 473ba9ce439..8e1480cc1d3 100644 --- a/src/mongo/s/balancer/migration_manager.cpp +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -203,9 +203,6 @@ void MigrationManager::_executeMigrations(OperationContext* txn, RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn); - StatusWith<RemoteCommandResponse> remoteCommandResponse( - Status{ErrorCodes::InternalError, "Uninitialized value"}); - executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus = @@ -259,18 +256,15 @@ void MigrationManager::_checkMigrationCallback( OperationContext* txn, Migration* migration, MigrationStatuses* migrationStatuses) { - const auto& remoteCommandResponseWithStatus = callbackArgs.response; + const auto& remoteCommandResponse = callbackArgs.response; - if (!remoteCommandResponseWithStatus.isOK()) { + if (!remoteCommandResponse.isOK()) { stdx::lock_guard<stdx::mutex> lk(_mutex); - migrationStatuses->insert( - MigrationStatuses::value_type(migration->chunkInfo.migrateInfo.getName(), - std::move(remoteCommandResponseWithStatus.getStatus()))); + migrationStatuses->insert(MigrationStatuses::value_type( + migration->chunkInfo.migrateInfo.getName(), std::move(remoteCommandResponse.status))); return; } - const auto& remoteCommandResponse = callbackArgs.response.getValue(); - Status commandStatus = getStatusFromCommandResult(remoteCommandResponse.data); if (commandStatus == ErrorCodes::LockBusy && !migration->oldShard) { diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index d8e507496a3..80bd6c81da8 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -268,7 +268,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd executor::RemoteCommandRequest request( host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); - StatusWith<executor::RemoteCommandResponse> swResponse = + executor::RemoteCommandResponse swResponse = Status(ErrorCodes::InternalError, "Internal error running command"); auto callStatus = _executorForAddShard->scheduleRemoteCommand( @@ -283,14 +283,14 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd _executorForAddShard->wait(callStatus.getValue()); if (!swResponse.isOK()) { - if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { - LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus(); + if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) { + LOG(0) << "Operation for addShard timed out with status " << swResponse.status; } - return swResponse.getStatus(); + return swResponse.status; } - BSONObj responseObj = swResponse.getValue().data.getOwned(); - BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + BSONObj responseObj = swResponse.data.getOwned(); + BSONObj responseMetadata = swResponse.metadata.getOwned(); Status commandStatus = getStatusFromCommandResult(responseObj); Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); @@ -1247,7 +1247,7 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( std::shared_ptr<RemoteCommandTargeter> targeter) { stdx::unique_lock<stdx::mutex> lk(_addShardHandlesMutex); - Status responseStatus = cbArgs.response.getStatus(); + Status responseStatus = cbArgs.response.status; if (responseStatus == ErrorCodes::CallbackCanceled) { return; } @@ -1266,12 +1266,12 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( warning() << "Failed to upsert shardIdentity document during addShard into shard " << shardType.getName() << "(" << shardType.getHost() << "). The shardIdentity upsert will continue to be retried. " - << causedBy(swResponse.getStatus()); + << causedBy(swResponse.status); rescheduleTask = true; } else { // Create a CommandResponse object in order to use processBatchWriteResponse. - BSONObj responseObj = swResponse.getValue().data.getOwned(); - BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + BSONObj responseObj = swResponse.data.getOwned(); + BSONObj responseMetadata = swResponse.metadata.getOwned(); Status commandStatus = getStatusFromCommandResult(responseObj); Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); Shard::CommandResponse commandResponse(std::move(responseObj), diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 098bbae7692..5fae69ea753 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -191,8 +191,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, txn, requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout); - StatusWith<RemoteCommandResponse> swResponse = - Status(ErrorCodes::InternalError, "Internal error running command"); + RemoteCommandResponse swResponse = Status(ErrorCodes::InternalError, "Internal error running command"); TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); auto callStatus = executor->scheduleRemoteCommand( @@ -205,17 +204,17 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, // Block until the command is carried out executor->wait(callStatus.getValue()); - updateReplSetMonitor(host.getValue(), swResponse.getStatus()); + updateReplSetMonitor(host.getValue(), swResponse.status); if (!swResponse.isOK()) { - if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { - LOG(0) << "Operation timed out with status " << redact(swResponse.getStatus()); + if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) { + LOG(0) << "Operation timed out with status " << redact(swResponse.status); } - return Shard::HostWithResponse(host.getValue(), swResponse.getStatus()); + return Shard::HostWithResponse(host.getValue(), swResponse.status); } - BSONObj responseObj = swResponse.getValue().data.getOwned(); - BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + BSONObj responseObj = swResponse.data.getOwned(); + BSONObj responseMetadata = swResponse.metadata.getOwned(); Status commandStatus = getStatusFromCommandResult(responseObj); Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 31d9337b10f..85cc8454dd4 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -403,7 +403,7 @@ void AsyncResultsMerger::handleBatchResponse( // Make a best effort to parse the response and retrieve the cursor id. We need the cursor // id in order to issue a killCursors command against it. if (cbData.response.isOK()) { - auto cursorResponse = parseCursorResponse(cbData.response.getValue().data, remote); + auto cursorResponse = parseCursorResponse(cbData.response.data, remote); if (cursorResponse.isOK()) { remote.cursorId = cursorResponse.getValue().getCursorId(); } @@ -432,8 +432,8 @@ void AsyncResultsMerger::handleBatchResponse( ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this); StatusWith<CursorResponse> cursorResponseStatus( - cbData.response.isOK() ? parseCursorResponse(cbData.response.getValue().data, remote) - : cbData.response.getStatus()); + cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote) + : cbData.response.status); if (!cursorResponseStatus.isOK()) { // In the case a read is performed against a view, the shard primary can return an error @@ -443,7 +443,7 @@ void AsyncResultsMerger::handleBatchResponse( // collection. if (cursorResponseStatus.getStatus() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { - auto& responseObj = cbData.response.getValue().data; + auto& responseObj = cbData.response.data; if (!responseObj.hasField("resolvedView")) { remote.status = Status(ErrorCodes::InternalError, str::stream() << "Missing field 'resolvedView' in document: " diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index a5179f16c68..e4d609cc88d 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -54,6 +54,8 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using ResponseStatus = executor::TaskExecutor::ResponseStatus; + const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); const std::vector<ShardId> kTestShardIds = { ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; @@ -197,12 +199,13 @@ protected: return retRequest; } - void scheduleErrorResponse(Status status) { - invariant(!status.isOK()); + void scheduleErrorResponse(ResponseStatus rs) { + invariant(!rs.isOK()); + rs.elapsedMillis = Milliseconds(0); executor::NetworkInterfaceMock* net = network(); net->enterNetwork(); ASSERT_TRUE(net->hasReadyRequests()); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), status); + net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); net->runReadyNetworkOperations(); net->exitNetwork(); } |