summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/authenticate.cpp9
-rw-r--r--src/mongo/client/authenticate.h2
-rw-r--r--src/mongo/client/authenticate_test.cpp2
-rw-r--r--src/mongo/client/dbclient.cpp13
-rw-r--r--src/mongo/client/fetcher.cpp12
-rw-r--r--src/mongo/client/fetcher_test.cpp36
-rw-r--r--src/mongo/client/remote_command_retry_scheduler.cpp2
-rw-r--r--src/mongo/client/remote_command_retry_scheduler_test.cpp61
-rw-r--r--src/mongo/client/remote_command_runner_impl.cpp219
-rw-r--r--src/mongo/client/sasl_client_authenticate_impl.cpp2
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp8
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change_test.cpp22
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp6
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp4
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp5
-rw-r--r--src/mongo/db/repl/freshness_scanner.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp14
-rw-r--r--src/mongo/db/repl/replication_executor.cpp2
-rw-r--r--src/mongo/db/repl/replication_executor.h2
-rw-r--r--src/mongo/db/repl/reporter.cpp4
-rw-r--r--src/mongo/db/repl/reporter_test.cpp16
-rw-r--r--src/mongo/db/repl/rollback_checker.cpp12
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.cpp2
-rw-r--r--src/mongo/db/repl/vote_requester.cpp11
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp8
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp8
-rw-r--r--src/mongo/executor/connection_pool_asio_integration_test.cpp36
-rw-r--r--src/mongo/executor/downconvert_find_and_getmore_commands.cpp16
-rw-r--r--src/mongo/executor/downconvert_find_and_getmore_commands.h12
-rw-r--r--src/mongo/executor/network_interface_asio.cpp15
-rw-r--r--src/mongo/executor/network_interface_asio.h26
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp9
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp74
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp37
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp17
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp74
-rw-r--r--src/mongo/executor/network_interface_mock.cpp39
-rw-r--r--src/mongo/executor/network_interface_mock.h10
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp37
-rw-r--r--src/mongo/executor/network_interface_perf_test.cpp4
-rw-r--r--src/mongo/executor/network_test_env.cpp60
-rw-r--r--src/mongo/executor/network_test_env.h2
-rw-r--r--src/mongo/executor/remote_command_response.cpp47
-rw-r--r--src/mongo/executor/remote_command_response.h37
-rw-r--r--src/mongo/executor/task_executor.h6
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp8
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp27
-rw-r--r--src/mongo/s/balancer/migration_manager.cpp14
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp20
-rw-r--r--src/mongo/s/client/shard_remote.cpp15
-rw-r--r--src/mongo/s/query/async_results_merger.cpp8
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp9
53 files changed, 708 insertions, 439 deletions
diff --git a/src/mongo/client/authenticate.cpp b/src/mongo/client/authenticate.cpp
index 224517e00be..7d45da9ef08 100644
--- a/src/mongo/client/authenticate.cpp
+++ b/src/mongo/client/authenticate.cpp
@@ -159,7 +159,7 @@ void authMongoCR(RunCommandHook runCommand, const BSONObj& params, AuthCompletio
// Ensure response was valid
std::string nonce;
- BSONObj nonceResponse = response.getValue().data;
+ BSONObj nonceResponse = response.data;
auto valid = bsonExtractStringField(nonceResponse, "nonce", &nonce);
if (!valid.isOK())
return handler({ErrorCodes::AuthenticationFailed,
@@ -235,7 +235,8 @@ void authX509(RunCommandHook runCommand,
//
bool isFailedAuthOk(const AuthResponse& response) {
- return (response == ErrorCodes::AuthenticationFailed && serverGlobalParams.transitionToAuth);
+ return (response.status == ErrorCodes::AuthenticationFailed &&
+ serverGlobalParams.transitionToAuth);
}
void auth(RunCommandHook runCommand,
@@ -305,9 +306,9 @@ void authenticateClient(const BSONObj& params,
// NOTE: this assumes that runCommand executes synchronously.
asyncAuth(runCommand, params, hostname, clientName, [](AuthResponse response) {
// DBClient expects us to throw in case of an auth error.
- uassertStatusOK(response);
+ uassertStatusOK(response.status);
- auto serverResponse = response.getValue().data;
+ auto serverResponse = response.data;
uassert(ErrorCodes::AuthenticationFailed,
serverResponse["errmsg"].str(),
isOk(serverResponse));
diff --git a/src/mongo/client/authenticate.h b/src/mongo/client/authenticate.h
index 65c649c49c2..6ad596477bf 100644
--- a/src/mongo/client/authenticate.h
+++ b/src/mongo/client/authenticate.h
@@ -43,7 +43,7 @@ class BSONObj;
namespace auth {
-using AuthResponse = StatusWith<executor::RemoteCommandResponse>;
+using AuthResponse = executor::RemoteCommandResponse;
using AuthCompletionHandler = stdx::function<void(AuthResponse)>;
using RunCommandResultHandler = AuthCompletionHandler;
using RunCommandHook =
diff --git a/src/mongo/client/authenticate_test.cpp b/src/mongo/client/authenticate_test.cpp
index 3044da9f6cd..97b876cf814 100644
--- a/src/mongo/client/authenticate_test.cpp
+++ b/src/mongo/client/authenticate_test.cpp
@@ -92,7 +92,7 @@ public:
// Then pop a response and call the handler
ASSERT(!_responses.empty());
- handler(StatusWith<RemoteCommandResponse>(_responses.front()));
+ handler(_responses.front());
_responses.pop();
}
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 254a537c255..cbf72c4ae7d 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -445,8 +445,7 @@ void DBClientWithCommands::_auth(const BSONObj& params) {
Milliseconds millis(Date_t::now() - start);
// Hand control back to authenticateClient()
- handler(StatusWith<RemoteCommandResponse>(
- RemoteCommandResponse(data, metadata, millis)));
+ handler({data, metadata, millis});
} catch (...) {
handler(exceptionToStatus());
@@ -713,7 +712,7 @@ private:
/**
* Initializes the wire version of conn, and returns the isMaster reply.
*/
-StatusWith<executor::RemoteCommandResponse> initWireVersion(DBClientConnection* conn) {
+executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn) {
try {
// We need to force the usage of OP_QUERY on this command, even if we have previously
// detected support for OP_COMMAND on a connection. This is necessary to handle the case
@@ -773,16 +772,16 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) {
auto swIsMasterReply = initWireVersion(this);
if (!swIsMasterReply.isOK()) {
_failed = true;
- return swIsMasterReply.getStatus();
+ return swIsMasterReply.status;
}
// Ensure that the isMaster response is "ok:1".
- auto isMasterStatus = getStatusFromCommandResult(swIsMasterReply.getValue().data);
+ auto isMasterStatus = getStatusFromCommandResult(swIsMasterReply.data);
if (!isMasterStatus.isOK()) {
return isMasterStatus;
}
- auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue().data);
+ auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.data);
if (!swProtocolSet.isOK()) {
return swProtocolSet.getStatus();
}
@@ -799,7 +798,7 @@ Status DBClientConnection::connect(const HostAndPort& serverAddress) {
}
if (_hook) {
- auto validationStatus = _hook(swIsMasterReply.getValue());
+ auto validationStatus = _hook(swIsMasterReply);
if (!validationStatus.isOK()) {
// Disconnect and mark failed.
_failed = true;
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index 5aa46e549cd..e2a8bdf2402 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -301,7 +301,7 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) {
void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batchFieldName) {
if (!rcbd.response.isOK()) {
- _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.status), nullptr, nullptr);
_finishCallback();
return;
}
@@ -312,7 +312,7 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch
return;
}
- const BSONObj& queryResponseObj = rcbd.response.getValue().data;
+ const BSONObj& queryResponseObj = rcbd.response.data;
Status status = getStatusFromCommandResult(queryResponseObj);
if (!status.isOK()) {
_work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
@@ -328,8 +328,8 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch
return;
}
- batchData.otherFields.metadata = std::move(rcbd.response.getValue().metadata);
- batchData.elapsedMillis = rcbd.response.getValue().elapsedMillis;
+ batchData.otherFields.metadata = std::move(rcbd.response.metadata);
+ batchData.elapsedMillis = rcbd.response.elapsedMillis.value_or(Milliseconds{0});
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
batchData.first = _first;
@@ -380,10 +380,10 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) {
if (id) {
auto logKillCursorsResult = [](const RemoteCommandCallbackArgs& args) {
if (!args.response.isOK()) {
- warning() << "killCursors command task failed: " << args.response.getStatus();
+ warning() << "killCursors command task failed: " << args.response.status;
return;
}
- auto status = getStatusFromCommandResult(args.response.getValue().data);
+ auto status = getStatusFromCommandResult(args.response.data);
if (!status.isOK()) {
warning() << "killCursors command failed: " << status;
}
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp
index b20cb9a099f..26778806b6c 100644
--- a/src/mongo/client/fetcher_test.cpp
+++ b/src/mongo/client/fetcher_test.cpp
@@ -44,6 +44,8 @@ using namespace mongo;
using executor::NetworkInterfaceMock;
using executor::TaskExecutor;
+using ResponseStatus = TaskExecutor::ResponseStatus;
+
const HostAndPort source("localhost", -1);
const BSONObj findCmdObj = BSON("find"
<< "coll");
@@ -61,12 +63,11 @@ public:
void processNetworkResponse(const BSONObj& obj,
ReadyQueueState readyQueueStateAfterProcessing,
FetcherState fetcherStateAfterProcessing);
- void processNetworkResponse(const BSONObj& obj,
- Milliseconds elapsed,
+ void processNetworkResponse(const ResponseStatus,
ReadyQueueState readyQueueStateAfterProcessing,
FetcherState fetcherStateAfterProcessing);
- void processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason,
+ void processNetworkResponse(const BSONObj& obj,
+ Milliseconds elapsed,
ReadyQueueState readyQueueStateAfterProcessing,
FetcherState fetcherStateAfterProcessing);
@@ -148,12 +149,11 @@ void FetcherTest::processNetworkResponse(const BSONObj& obj,
finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
}
-void FetcherTest::processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason,
+void FetcherTest::processNetworkResponse(ResponseStatus rs,
ReadyQueueState readyQueueStateAfterProcessing,
FetcherState fetcherStateAfterProcessing) {
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- getNet()->scheduleErrorResponse({code, reason});
+ getNet()->scheduleErrorResponse(rs);
finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
}
@@ -373,8 +373,8 @@ TEST_F(FetcherTest, ScheduleButShutdown) {
TEST_F(FetcherTest, FindCommandFailed1) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(
- ErrorCodes::BadValue, "bad hint", ReadyQueueState::kEmpty, FetcherState::kInactive);
+ auto rs = ResponseStatus(ErrorCodes::BadValue, "bad hint", Milliseconds(0));
+ processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
ASSERT_FALSE(fetcher->inShutdown_forTest());
@@ -939,7 +939,7 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
ASSERT_EQUALS(cursorId, cursors.front().numberLong());
// Failed killCursors command response should be logged.
- getNet()->scheduleSuccessfulResponse(noi, {BSON("ok" << false), {}});
+ getNet()->scheduleSuccessfulResponse(noi, {BSON("ok" << false), {}, Milliseconds(0)});
getNet()->runReadyNetworkOperations();
}
@@ -1049,12 +1049,10 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques
// Retry policy is applied to find command.
const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(
- ErrorCodes::BadValue, "first", ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
- processNetworkResponse(ErrorCodes::InternalError,
- "second",
- ReadyQueueState::kHasReadyRequests,
- FetcherState::kActive);
+ auto rs = ResponseStatus(ErrorCodes::BadValue, "first", Milliseconds(0));
+ processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
+ rs = ResponseStatus(ErrorCodes::InternalError, "second", Milliseconds(0));
+ processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
processNetworkResponse(BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch"
@@ -1070,11 +1068,9 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
+ rs = ResponseStatus(ErrorCodes::OperationFailed, "getMore failed", Milliseconds(0));
// No retry policy for subsequent getMore commands.
- processNetworkResponse(ErrorCodes::OperationFailed,
- "getMore failed",
- ReadyQueueState::kEmpty,
- FetcherState::kInactive);
+ processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_FALSE(fetcher->inShutdown_forTest());
}
diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp
index 8c20d0af925..ded4a10d964 100644
--- a/src/mongo/client/remote_command_retry_scheduler.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler.cpp
@@ -207,7 +207,7 @@ Status RemoteCommandRetryScheduler::_schedule_inlock(std::size_t requestCount) {
void RemoteCommandRetryScheduler::_remoteCommandCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba, std::size_t requestCount) {
- auto status = rcba.response.getStatus();
+ auto status = rcba.response.status;
if (status.isOK() || status == ErrorCodes::CallbackCanceled ||
requestCount == _retryPolicy->getMaximumAttempts() ||
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index 6e2c70fe9dc..705324f94c5 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -48,6 +48,7 @@
namespace {
using namespace mongo;
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
class CallbackResponseSaver;
@@ -56,8 +57,8 @@ public:
void start(RemoteCommandRetryScheduler* scheduler);
void checkCompletionStatus(RemoteCommandRetryScheduler* scheduler,
const CallbackResponseSaver& callbackResponseSaver,
- const executor::TaskExecutor::ResponseStatus& response);
- void processNetworkResponse(const executor::TaskExecutor::ResponseStatus& response);
+ const ResponseStatus& response);
+ void processNetworkResponse(const ResponseStatus& response);
void runReadyNetworkOperations();
protected:
@@ -76,10 +77,10 @@ public:
*/
void operator()(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba);
- std::vector<StatusWith<executor::RemoteCommandResponse>> getResponses() const;
+ std::vector<ResponseStatus> getResponses() const;
private:
- std::vector<StatusWith<executor::RemoteCommandResponse>> _responses;
+ std::vector<ResponseStatus> _responses;
};
/**
@@ -119,20 +120,19 @@ void RemoteCommandRetrySchedulerTest::start(RemoteCommandRetryScheduler* schedul
void RemoteCommandRetrySchedulerTest::checkCompletionStatus(
RemoteCommandRetryScheduler* scheduler,
const CallbackResponseSaver& callbackResponseSaver,
- const executor::TaskExecutor::ResponseStatus& response) {
+ const ResponseStatus& response) {
ASSERT_FALSE(scheduler->isActive());
auto responses = callbackResponseSaver.getResponses();
ASSERT_EQUALS(1U, responses.size());
if (response.isOK()) {
- ASSERT_OK(responses.front().getStatus());
- ASSERT_EQUALS(response.getValue(), responses.front().getValue());
+ ASSERT_OK(responses.front().status);
+ ASSERT_EQUALS(response, responses.front());
} else {
- ASSERT_EQUALS(response.getStatus(), responses.front().getStatus());
+ ASSERT_EQUALS(response.status, responses.front().status);
}
}
-void RemoteCommandRetrySchedulerTest::processNetworkResponse(
- const executor::TaskExecutor::ResponseStatus& response) {
+void RemoteCommandRetrySchedulerTest::processNetworkResponse(const ResponseStatus& response) {
auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
ASSERT_TRUE(net->hasReadyRequests());
@@ -163,8 +163,7 @@ void CallbackResponseSaver::operator()(
_responses.push_back(rcba.response);
}
-std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver::getResponses()
- const {
+std::vector<ResponseStatus> CallbackResponseSaver::getResponses() const {
return _responses;
}
@@ -327,7 +326,6 @@ TEST_F(RemoteCommandRetrySchedulerTest,
&scheduler, callback, {ErrorCodes::CallbackCanceled, "executor shutdown"});
}
-
TEST_F(RemoteCommandRetrySchedulerTest,
ShuttingDownSchedulerAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) {
CallbackResponseSaver callback;
@@ -353,10 +351,9 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableEr
start(&scheduler);
// This should match one of the non-retryable error codes in the policy.
- Status response(ErrorCodes::OperationFailed, "injected error");
-
- processNetworkResponse(response);
- checkCompletionStatus(&scheduler, callback, response);
+ ResponseStatus rs(ErrorCodes::OperationFailed, "injected error", Milliseconds(0));
+ processNetworkResponse(rs);
+ checkCompletionStatus(&scheduler, callback, rs);
}
TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfulResponse) {
@@ -368,8 +365,7 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfu
start(&scheduler);
// Elapsed time in response is ignored on successful responses.
- executor::RemoteCommandResponse response(
- BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
+ ResponseStatus response(BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
@@ -386,11 +382,10 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfu
// Scheduler does not parse document in a successful response for embedded errors.
// This is the case with some commands (e.g. find) which do not always return errors using the
// wire protocol.
- executor::RemoteCommandResponse response(
- BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg"
- << "injected error"),
- BSON("z" << 456),
- Milliseconds(100));
+ ResponseStatus response(BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg"
+ << "injected error"),
+ BSON("z" << 456),
+ Milliseconds(100));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
@@ -406,14 +401,15 @@ TEST_F(RemoteCommandRetrySchedulerTest,
&badExecutor, request, stdx::ref(callback), std::move(policy));
start(&scheduler);
- processNetworkResponse({ErrorCodes::HostNotFound, "first"});
+ processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
// scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send
// third remote command request after processing second failed response.
badExecutor.scheduleRemoteCommandFailPoint = true;
- processNetworkResponse({ErrorCodes::HostNotFound, "second"});
+ processNetworkResponse({ErrorCodes::HostNotFound, "second", Milliseconds(0)});
- checkCompletionStatus(&scheduler, callback, {ErrorCodes::ShutdownInProgress, ""});
+ checkCompletionStatus(
+ &scheduler, callback, {ErrorCodes::ShutdownInProgress, "", Milliseconds(0)});
}
TEST_F(RemoteCommandRetrySchedulerTest,
@@ -427,10 +423,10 @@ TEST_F(RemoteCommandRetrySchedulerTest,
&getExecutor(), request, stdx::ref(callback), std::move(policy));
start(&scheduler);
- processNetworkResponse({ErrorCodes::HostNotFound, "first"});
- processNetworkResponse({ErrorCodes::HostUnreachable, "second"});
+ processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
+ processNetworkResponse({ErrorCodes::HostUnreachable, "second", Milliseconds(0)});
- Status response(ErrorCodes::NetworkTimeout, "last");
+ ResponseStatus response(ErrorCodes::NetworkTimeout, "last", Milliseconds(0));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
}
@@ -443,10 +439,9 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulRespo
&getExecutor(), request, stdx::ref(callback), std::move(policy));
start(&scheduler);
- processNetworkResponse({ErrorCodes::HostNotFound, "first"});
+ processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
- executor::RemoteCommandResponse response(
- BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
+ ResponseStatus response(BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
}
diff --git a/src/mongo/client/remote_command_runner_impl.cpp b/src/mongo/client/remote_command_runner_impl.cpp
new file mode 100644
index 00000000000..2eef602d1f9
--- /dev/null
+++ b/src/mongo/client/remote_command_runner_impl.cpp
@@ -0,0 +1,219 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_runner_impl.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/executor/downconvert_find_and_getmore_commands.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/protocol.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using ResponseStatus = executor::RemoteCommandResponse;
+
+/**
+ * Calculates the timeout for a network operation expiring at "expDate", given
+ * that it is now "nowDate".
+ *
+ * Returns 0ms to indicate no expiration date, a number of milliseconds until "expDate", or
+ * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate".
+ */
+StatusWith<Milliseconds> getTimeoutMillis(const Date_t expDate, const Date_t nowDate) {
+ if (expDate == RemoteCommandRequest::kNoExpirationDate) {
+ return Milliseconds(0);
+ }
+ if (expDate <= nowDate) {
+ return {ErrorCodes::ExceededTimeLimit,
+ str::stream() << "Went to run command, but it was too late. "
+ "Expiration was set to "
+ << dateToISOStringUTC(expDate)};
+ }
+ return expDate - nowDate;
+}
+
+/**
+ * Peeks at error in cursor. If an error has occurred, converts the {$err: "...", code: N}
+ * cursor error to a Status.
+ */
+Status getStatusFromCursorResult(DBClientCursor& cursor) {
+ BSONObj error;
+ if (!cursor.peekError(&error)) {
+ return Status::OK();
+ }
+
+ BSONElement e = error.getField("code");
+ return Status(e.isNumber() ? ErrorCodes::fromInt(e.numberInt()) : ErrorCodes::UnknownError,
+ getErrField(error).valuestrsafe());
+}
+
+using RequestDownconverter = StatusWith<Message> (*)(const RemoteCommandRequest&);
+using ReplyUpconverter = ResponseStatus (*)(std::int32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
+
+template <RequestDownconverter downconvertRequest, ReplyUpconverter upconvertReply>
+ResponseStatus runDownconvertedCommand(DBClientConnection* conn,
+ const RemoteCommandRequest& request) {
+ auto swDownconvertedRequest = downconvertRequest(request);
+ if (!swDownconvertedRequest.isOK()) {
+ return swDownconvertedRequest.getStatus();
+ }
+
+ Message requestMsg{std::move(swDownconvertedRequest.getValue())};
+ Message responseMsg;
+
+ try {
+ conn->call(requestMsg, responseMsg, true, nullptr);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+
+ auto messageId = requestMsg.header().getId();
+
+ return upconvertReply(messageId, DbMessage(requestMsg).getns(), responseMsg);
+}
+
+/**
+ * Downconverts the specified find command to a find protocol operation and sends it to the
+ * server on the specified connection.
+ */
+ResponseStatus runDownconvertedFindCommand(DBClientConnection* conn,
+ const RemoteCommandRequest& request) {
+ return runDownconvertedCommand<executor::downconvertFindCommandRequest,
+ executor::upconvertLegacyQueryResponse>(conn, request);
+}
+
+/**
+ * Downconverts the specified getMore command to legacy getMore operation and sends it to the
+ * server on the specified connection.
+ */
+ResponseStatus runDownconvertedGetMoreCommand(DBClientConnection* conn,
+ const RemoteCommandRequest& request) {
+ return runDownconvertedCommand<executor::downconvertGetMoreCommandRequest,
+ executor::upconvertLegacyGetMoreResponse>(conn, request);
+}
+
+} // namespace
+
+RemoteCommandRunnerImpl::RemoteCommandRunnerImpl(
+ int messagingTags, std::unique_ptr<executor::NetworkConnectionHook> hook)
+ : _connPool(messagingTags, std::move(hook)) {}
+
+RemoteCommandRunnerImpl::~RemoteCommandRunnerImpl() {
+ invariant(!_active);
+}
+
+void RemoteCommandRunnerImpl::startup() {
+ _active = true;
+}
+
+void RemoteCommandRunnerImpl::shutdown() {
+ if (!_active) {
+ return;
+ }
+
+ _connPool.closeAllInUseConnections();
+ _active = false;
+}
+
+ResponseStatus RemoteCommandRunnerImpl::runCommand(const RemoteCommandRequest& request) {
+ try {
+ const Date_t requestStartDate = Date_t::now();
+ const auto timeoutMillis = getTimeoutMillis(request.expirationDate, requestStartDate);
+ if (!timeoutMillis.isOK()) {
+ return {timeoutMillis.getStatus()};
+ }
+
+ ConnectionPool::ConnectionPtr conn(
+ &_connPool, request.target, requestStartDate, timeoutMillis.getValue());
+
+ BSONObj output;
+ BSONObj metadata;
+
+ // If remote server does not support either find or getMore commands, down convert
+ // to using DBClientInterface::query()/getMore().
+ // Perform down conversion based on wire protocol version.
+
+ // 'commandName' will be an empty string if the command object is an empty BSON
+ // document.
+ StringData commandName = request.cmdObj.firstElement().fieldNameStringData();
+ const auto isFindCmd = commandName == QueryRequest::kFindCommandName;
+ const auto isGetMoreCmd = commandName == GetMoreRequest::kGetMoreCommandName;
+ const auto isFindOrGetMoreCmd = isFindCmd || isGetMoreCmd;
+
+ // We are using the wire version to check if we need to downconverting find/getMore
+ // requests because coincidentally, the find/getMore command is only supported by
+ // servers that also accept OP_COMMAND.
+ bool supportsFindAndGetMoreCommands = rpc::supportsWireVersionForOpCommandInMongod(
+ conn.get()->getMinWireVersion(), conn.get()->getMaxWireVersion());
+
+ if (!isFindOrGetMoreCmd || supportsFindAndGetMoreCommands) {
+ rpc::UniqueReply commandResponse =
+ conn.get()->runCommandWithMetadata(request.dbname,
+ request.cmdObj.firstElementFieldName(),
+ request.metadata,
+ request.cmdObj);
+
+ output = commandResponse->getCommandReply().getOwned();
+ metadata = commandResponse->getMetadata().getOwned();
+ } else if (isFindCmd) {
+ return runDownconvertedFindCommand(conn.get(), request);
+ } else if (isGetMoreCmd) {
+ return runDownconvertedGetMoreCommand(conn.get(), request);
+ }
+
+ const Date_t requestFinishDate = Date_t::now();
+ conn.done(requestFinishDate);
+
+ return {std::move(output),
+ std::move(metadata),
+ Milliseconds(requestFinishDate - requestStartDate)};
+ } catch (const DBException& ex) {
+ return {ex.toStatus()};
+ } catch (const std::exception& ex) {
+ return {ErrorCodes::UnknownError,
+ str::stream() << "Sending command " << request.cmdObj << " on database "
+ << request.dbname
+ << " over network to "
+ << request.target.toString()
+ << " received exception "
+ << ex.what()};
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/client/sasl_client_authenticate_impl.cpp b/src/mongo/client/sasl_client_authenticate_impl.cpp
index aa4d7e64838..8a06acedb31 100644
--- a/src/mongo/client/sasl_client_authenticate_impl.cpp
+++ b/src/mongo/client/sasl_client_authenticate_impl.cpp
@@ -210,7 +210,7 @@ void asyncSaslConversation(auth::RunCommandHook runCommand,
return handler(std::move(response));
}
- auto serverResponse = response.getValue().data.getOwned();
+ auto serverResponse = response.data.getOwned();
auto code = getStatusFromCommandResult(serverResponse).code();
// Server versions 2.3.2 and earlier may return "ok: 1" with a non-zero
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index ac926addcd9..399be148782 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -146,7 +146,7 @@ void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi,
auto net = getNet();
executor::TaskExecutor::ResponseStatus responseStatus(code, reason);
log() << "Scheduling error response to request:" << noi->getDiagnosticString()
- << " -- status:" << responseStatus.getStatus().toString();
+ << " -- status:" << responseStatus.status.toString();
net->scheduleResponse(noi, net->now(), responseStatus);
}
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index bd3740ed410..423ea40cc94 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -181,12 +181,12 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque
++_numResponses;
if (!response.isOK()) {
warning() << "Failed to complete heartbeat request to " << request.target << "; "
- << response.getStatus();
- _badResponses.push_back(std::make_pair(request.target, response.getStatus()));
+ << response.status;
+ _badResponses.push_back(std::make_pair(request.target, response.status));
return;
}
- BSONObj resBSON = response.getValue().data;
+ BSONObj resBSON = response.data;
ReplSetHeartbeatResponse hbResp;
Status hbStatus = hbResp.initialize(resBSON, 0);
@@ -219,7 +219,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque
if (_rsConfig->hasReplicaSetId()) {
StatusWith<rpc::ReplSetMetadata> replMetadata =
- rpc::ReplSetMetadata::readFromMetadata(response.getValue().metadata);
+ rpc::ReplSetMetadata::readFromMetadata(response.metadata);
if (replMetadata.isOK() && replMetadata.getValue().getReplicaSetId().isSet() &&
_rsConfig->getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) {
std::string message = str::stream()
diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
index 86527def28a..610dbad4cf7 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
@@ -203,7 +203,7 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSeveralDownNodes) {
for (int i = 0; i < numCommandsExpected; ++i) {
_net->scheduleResponse(_net->getNextReadyRequest(),
startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No reply"));
+ {ErrorCodes::NoSuchKey, "No reply"});
}
_net->runUntil(startDate + Milliseconds(10));
_net->exitNetwork();
@@ -320,9 +320,8 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToOneDownNode) {
ASSERT(seenHosts.insert(request.target).second) << "Already saw "
<< request.target.toString();
if (request.target == HostAndPort("h2", 1)) {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ _net->scheduleResponse(
+ noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
} else {
_net->scheduleResponse(
noi,
@@ -766,9 +765,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToIncompatibleSetName) {
ResponseStatus(RemoteCommandResponse(
BSON("ok" << 0 << "mismatch" << true), BSONObj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ _net->scheduleResponse(
+ noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
_net->runUntil(startDate + Milliseconds(10));
@@ -832,9 +830,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) {
startDate + Milliseconds(10),
ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ _net->scheduleResponse(
+ noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
_net->runUntil(startDate + Milliseconds(10));
@@ -894,9 +891,8 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) {
startDate + Milliseconds(10),
ResponseStatus(RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(8))));
} else {
- _net->scheduleResponse(noi,
- startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ _net->scheduleResponse(
+ noi, startDate + Milliseconds(10), {ErrorCodes::NoSuchKey, "No response"});
}
}
_net->runUntil(startDate + Milliseconds(10));
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 3b01a07e139..cbc8d763490 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -191,9 +191,9 @@ void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedu
}
void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackArgs& cbd) {
- Status respStatus = cbd.response.getStatus();
+ Status respStatus = cbd.response.status;
if (respStatus.isOK()) {
- respStatus = getStatusFromCommandResult(cbd.response.getValue().data);
+ respStatus = getStatusFromCommandResult(cbd.response.data);
}
UniqueLock lk(_mutex);
@@ -204,7 +204,7 @@ void DatabasesCloner::_onListDatabaseFinish(const CommandCallbackArgs& cbd) {
return;
}
- const auto respBSON = cbd.response.getValue().data;
+ const auto respBSON = cbd.response.data;
// There should not be any cloners yet
invariant(_databaseCloners.size() == 0);
const auto dbsElem = respBSON["databases"].Obj();
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index afe28960c1f..907f396c23c 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -108,7 +108,7 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ
++_actualResponses;
if (response.isOK()) {
- BSONObj res = response.getValue().data;
+ BSONObj res = response.data;
log() << "received " << res["vote"] << " votes from " << request.target;
LOG(1) << "full elect res: " << res.toString();
BSONElement vote(res["vote"]);
@@ -121,7 +121,7 @@ void ElectCmdRunner::Algorithm::processResponse(const RemoteCommandRequest& requ
_receivedVotes += vote._numberInt();
} else {
- warning() << "elect command to " << request.target << " failed: " << response.getStatus();
+ warning() << "elect command to " << request.target << " failed: " << response.status;
}
}
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index d3927c33102..1b74c3810b5 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -128,8 +128,7 @@ void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& re
Status status = Status::OK();
- if (!response.isOK() ||
- !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) {
+ if (!response.isOK() || !((status = getStatusFromCommandResult(response.data)).isOK())) {
if (votingMember) {
++_failedVoterResponses;
if (hadTooManyFailedVoterResponses()) {
@@ -145,7 +144,7 @@ void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& re
return;
}
- const BSONObj res = response.getValue().data;
+ const BSONObj res = response.data;
LOG(2) << "FreshnessChecker: Got response from " << request.target << " of " << res;
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
index 87182309a39..2623fb32b88 100644
--- a/src/mongo/db/repl/freshness_scanner.cpp
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -74,9 +74,9 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re
_responsesProcessed++;
if (!response.isOK()) { // failed response
LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": "
- << response.getStatus();
+ << response.status;
} else {
- BSONObj opTimesObj = response.getValue().data.getObjectField("optimes");
+ BSONObj opTimesObj = response.data.getObjectField("optimes");
OpTime lastOpTime;
Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime);
if (!status.isOK()) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index e56fa347fcd..d0fe0aa4e58 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -127,7 +127,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
// Parse and validate the response. At the end of this step, if responseStatus is OK then
// hbResponse is valid.
- Status responseStatus = cbData.response.getStatus();
+ Status responseStatus = cbData.response.status;
if (responseStatus == ErrorCodes::CallbackCanceled) {
return;
}
@@ -136,10 +136,10 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
ReplSetHeartbeatResponse hbResponse;
BSONObj resp;
if (responseStatus.isOK()) {
- resp = cbData.response.getValue().data;
+ resp = cbData.response.data;
responseStatus = hbResponse.initialize(resp, _topCoord->getTerm());
StatusWith<rpc::ReplSetMetadata> replMetadata =
- rpc::ReplSetMetadata::readFromMetadata(cbData.response.getValue().metadata);
+ rpc::ReplSetMetadata::readFromMetadata(cbData.response.metadata);
// Reject heartbeat responses (and metadata) from nodes with mismatched replica set IDs.
// It is problematic to perform this check in the heartbeat reconfiguring logic because it
@@ -170,7 +170,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
StatusWith<ReplSetHeartbeatResponse> hbStatusResponse(hbResponse);
if (responseStatus.isOK()) {
- networkTime = cbData.response.getValue().elapsedMillis;
+ networkTime = cbData.response.elapsedMillis.value_or(Milliseconds{0});
// TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this
// and update tests.
_updateTerm_incallback(hbStatusResponse.getValue().getTerm());
@@ -288,17 +288,17 @@ namespace {
* This callback is purely for logging and has no effect on any other operations
*/
void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) {
- const Status status = cbData.response.getStatus();
+ const Status status = cbData.response.status;
if (status == ErrorCodes::CallbackCanceled) {
return;
}
if (status.isOK()) {
LOG(1) << "stepdown of primary(" << cbData.request.target << ") succeeded with response -- "
- << cbData.response.getValue().data;
+ << cbData.response.data;
} else {
warning() << "stepdown of primary(" << cbData.request.target << ") failed due to "
- << cbData.response.getStatus();
+ << cbData.response.status;
}
}
} // namespace
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index 6a076aff54d..dd077167b0e 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -317,7 +317,7 @@ void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& reque
}
LOG(4) << "Received remote response: "
- << (response.isOK() ? response.getValue().toString() : response.getStatus().toString());
+ << (response.isOK() ? response.toString() : response.status.toString());
callback->_callbackFn =
stdx::bind(remoteCommandFinished, stdx::placeholders::_1, cb, request, response);
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index ffb90fb78ea..be231372c8a 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -263,7 +263,7 @@ private:
void finishShutdown();
void _finishRemoteCommand(const executor::RemoteCommandRequest& request,
- const StatusWith<executor::RemoteCommandResponse>& response,
+ const executor::RemoteCommandResponse& response,
const CallbackHandle& cbHandle,
const uint64_t expectedHandleGeneration,
const RemoteCommandCallbackFn& cb);
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index 2e19e804108..64b7dd27e12 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -257,14 +257,14 @@ void Reporter::_processResponseCallback(
return;
}
- _status = rcbd.response.getStatus();
+ _status = rcbd.response.status;
if (!_status.isOK()) {
_onShutdown_inlock();
return;
}
// Override _status with the one embedded in the command result.
- const auto& commandResult = rcbd.response.getValue().data;
+ const auto& commandResult = rcbd.response.data;
_status = getStatusFromCommandResult(commandResult);
// Some error types are OK and should not cause the reporter to stop sending updates to the
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index 9d89e00bf95..42ee28c6aa5 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -46,6 +46,8 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using ResponseStatus = mongo::executor::TaskExecutor::ResponseStatus;
+
class MockProgressManager {
public:
void updateMap(int memberId, const OpTime& lastDurableOpTime, const OpTime& lastAppliedOpTime) {
@@ -120,8 +122,7 @@ public:
*/
BSONObj processNetworkResponse(const BSONObj& obj,
bool expectReadyRequestsAfterProcessing = false);
- BSONObj processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason,
+ BSONObj processNetworkResponse(const ResponseStatus rs,
bool expectReadyRequestsAfterProcessing = false);
void runUntil(Date_t when, bool expectReadyRequestsAfterAdvancingClock = false);
@@ -210,12 +211,11 @@ BSONObj ReporterTest::processNetworkResponse(const BSONObj& obj,
return cmdObj;
}
-BSONObj ReporterTest::processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason,
+BSONObj ReporterTest::processNetworkResponse(const ResponseStatus rs,
bool expectReadyRequestsAfterProcessing) {
auto net = getNet();
net->enterNetwork();
- auto cmdObj = net->scheduleErrorResponse({code, reason}).cmdObj;
+ auto cmdObj = net->scheduleErrorResponse(rs).cmdObj;
net->runReadyNetworkOperations();
ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
net->exitNetwork();
@@ -320,7 +320,7 @@ TEST_F(ReporterTest, TaskExecutorAndNetworkErrorsStopTheReporter) {
ASSERT_TRUE(reporter->isActive());
ASSERT_TRUE(reporter->isWaitingToSendReport());
- processNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
+ processNetworkResponse({ErrorCodes::NoSuchKey, "waaaah", Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->join());
assertReporterDone();
@@ -561,7 +561,7 @@ TEST_F(ReporterTest,
}
TEST_F(ReporterTest, FailedUpdateShouldNotRescheduleUpdate) {
- processNetworkResponse(ErrorCodes::OperationFailed, "update failed");
+ processNetworkResponse({ErrorCodes::OperationFailed, "update failed", Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
assertReporterDone();
@@ -576,7 +576,7 @@ TEST_F(ReporterTest, SuccessfulUpdateShouldRescheduleUpdate) {
runUntil(until, true);
- processNetworkResponse(ErrorCodes::OperationFailed, "update failed");
+ processNetworkResponse({ErrorCodes::OperationFailed, "update failed", Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
assertReporterDone();
diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp
index 2a1685d51ab..ffca29bc211 100644
--- a/src/mongo/db/repl/rollback_checker.cpp
+++ b/src/mongo/db/repl/rollback_checker.cpp
@@ -51,14 +51,14 @@ RollbackChecker::~RollbackChecker() {}
RollbackChecker::CallbackHandle RollbackChecker::checkForRollback(const CallbackFn& nextAction) {
return _scheduleGetRollbackId(
[this, nextAction](const RemoteCommandCallbackArgs& args) {
- if (args.response.getStatus() == ErrorCodes::CallbackCanceled) {
+ if (args.response.status == ErrorCodes::CallbackCanceled) {
return;
}
if (!args.response.isOK()) {
- nextAction(args.response.getStatus());
+ nextAction(args.response.status);
return;
}
- if (auto rbidElement = args.response.getValue().data["rbid"]) {
+ if (auto rbidElement = args.response.data["rbid"]) {
int remoteRBID = rbidElement.numberInt();
UniqueLock lk(_mutex);
@@ -97,14 +97,14 @@ StatusWith<bool> RollbackChecker::hasHadRollback() {
RollbackChecker::CallbackHandle RollbackChecker::reset(const CallbackFn& nextAction) {
return _scheduleGetRollbackId(
[this, nextAction](const RemoteCommandCallbackArgs& args) {
- if (args.response.getStatus() == ErrorCodes::CallbackCanceled) {
+ if (args.response.status == ErrorCodes::CallbackCanceled) {
return;
}
if (!args.response.isOK()) {
- nextAction(args.response.getStatus());
+ nextAction(args.response.status);
return;
}
- if (auto rbidElement = args.response.getValue().data["rbid"]) {
+ if (auto rbidElement = args.response.data["rbid"]) {
int newRBID = rbidElement.numberInt();
UniqueLock lk(_mutex);
diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp
index 90a16ea470a..597239179e0 100644
--- a/src/mongo/db/repl/scatter_gather_runner.cpp
+++ b/src/mongo/db/repl/scatter_gather_runner.cpp
@@ -138,7 +138,7 @@ void ScatterGatherRunner::RunnerImpl::processResponse(
std::swap(*iter, _callbacks.back());
_callbacks.pop_back();
- if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
+ if (cbData.response.status == ErrorCodes::CallbackCanceled) {
return;
}
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index b02b3bb7c0c..4f710827884 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -93,24 +93,23 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque
_responsesProcessed++;
if (!response.isOK()) { // failed response
log() << "VoteRequester: Got failed response from " << request.target << ": "
- << response.getStatus();
+ << response.status;
} else {
_responders.insert(request.target);
ReplSetRequestVotesResponse voteResponse;
- const auto status = voteResponse.initialize(response.getValue().data);
+ const auto status = voteResponse.initialize(response.data);
if (!status.isOK()) {
log() << "VoteRequester: Got error processing response with status: " << status
- << ", resp:" << response.getValue().data;
+ << ", resp:" << response.data;
}
if (voteResponse.getVoteGranted()) {
LOG(3) << "VoteRequester: Got yes vote from " << request.target
- << ", resp:" << response.getValue().data;
+ << ", resp:" << response.data;
_votes++;
} else {
log() << "VoteRequester: Got no vote from " << request.target
- << " because: " << voteResponse.getReason()
- << ", resp:" << response.getValue().data;
+ << " because: " << voteResponse.getReason() << ", resp:" << response.data;
}
if (voteResponse.getTerm() > _term) {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 500b11e57e7..08721b06419 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -467,7 +467,7 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) {
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONObj& cmdObj) {
- StatusWith<executor::RemoteCommandResponse> responseStatus(
+ executor::RemoteCommandResponse responseStatus(
Status{ErrorCodes::InternalError, "Uninitialized value"});
auto executor = grid.getExecutorPool()->getArbitraryExecutor();
@@ -485,15 +485,15 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
executor->wait(scheduleStatus.getValue());
if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
+ return responseStatus.status;
}
- Status commandStatus = getStatusFromCommandResult(responseStatus.getValue().data);
+ Status commandStatus = getStatusFromCommandResult(responseStatus.data);
if (!commandStatus.isOK()) {
return commandStatus;
}
- return responseStatus.getValue().data.getOwned();
+ return responseStatus.data.getOwned();
}
Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn) {
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index 99e4fadb135..0d13314ff03 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -160,9 +160,9 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC
BSON("isMaster" << 1),
BSONObj(),
nullptr},
- [conn](const TaskExecutor::ResponseStatus& status) {
+ [conn](const TaskExecutor::ResponseStatus& rs) {
auto cb = std::move(conn->_setupCallback);
- cb(conn, status.isOK() ? Status::OK() : status.getStatus());
+ cb(conn, rs.status);
},
conn->_global->now());
}
@@ -244,10 +244,10 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
// need to intercept those calls so we can capture them. This will get cleared out when we
// fill
// in the real onFinish in startCommand.
- op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) {
+ op->setOnFinish([this](RemoteCommandResponse failedResponse) {
invariant(!failedResponse.isOK());
auto cb = std::move(_refreshCallback);
- cb(this, failedResponse.getStatus());
+ cb(this, failedResponse.status);
});
op->_inRefresh = true;
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
index 04c39348ee6..fb503fe8502 100644
--- a/src/mongo/executor/connection_pool_asio_integration_test.cpp
+++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp
@@ -103,16 +103,16 @@ TEST(ConnectionPoolASIO, TestPing) {
for (auto& thread : threads) {
thread = stdx::thread([&net, &fixture]() {
auto status = Status::OK();
- Deferred<StatusWith<RemoteCommandResponse>> deferred;
+ Deferred<RemoteCommandResponse> deferred;
RemoteCommandRequest request{
fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
net.startCommand(
- makeCallbackHandle(), request, [&deferred](StatusWith<RemoteCommandResponse> resp) {
+ makeCallbackHandle(), request, [&deferred](RemoteCommandResponse resp) {
deferred.emplace(std::move(resp));
});
- ASSERT_OK(deferred.get().getStatus());
+ ASSERT_OK(deferred.get().status);
});
}
@@ -140,15 +140,14 @@ TEST(ConnectionPoolASIO, TestHostTimeoutRace) {
auto guard = MakeGuard([&] { net.shutdown(); });
for (int i = 0; i < 1000; i++) {
- Deferred<StatusWith<RemoteCommandResponse>> deferred;
+ Deferred<RemoteCommandResponse> deferred;
RemoteCommandRequest request{
fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(
- makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
- deferred.emplace(std::move(resp));
- });
+ net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {
+ deferred.emplace(std::move(resp));
+ });
- ASSERT_OK(deferred.get().getStatus());
+ ASSERT_OK(deferred.get().status);
sleepmillis(1);
}
}
@@ -169,14 +168,14 @@ TEST(ConnectionPoolASIO, ConnSetupTimeout) {
net.startup();
auto guard = MakeGuard([&] { net.shutdown(); });
- Deferred<StatusWith<RemoteCommandResponse>> deferred;
+ Deferred<RemoteCommandResponse> deferred;
RemoteCommandRequest request{
fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {
deferred.emplace(std::move(resp));
});
- ASSERT_EQ(deferred.get().getStatus().code(), ErrorCodes::ExceededTimeLimit);
+ ASSERT_EQ(deferred.get().status.code(), ErrorCodes::ExceededTimeLimit);
}
/**
@@ -195,7 +194,7 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) {
net.startup();
auto guard = MakeGuard([&] { net.shutdown(); });
- std::array<Deferred<StatusWith<RemoteCommandResponse>>, 10> deferreds;
+ std::array<Deferred<RemoteCommandResponse>, 10> deferreds;
RemoteCommandRequest request{fixture.getServers()[0],
"admin",
@@ -207,10 +206,9 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) {
nullptr};
for (auto& deferred : deferreds) {
- net.startCommand(
- makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
- deferred.emplace(std::move(resp));
- });
+ net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {
+ deferred.emplace(std::move(resp));
+ });
}
for (auto& deferred : deferreds) {
@@ -241,11 +239,11 @@ TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) {
net.startup();
auto guard = MakeGuard([&] { net.shutdown(); });
- Deferred<StatusWith<RemoteCommandResponse>> deferred;
+ Deferred<RemoteCommandResponse> deferred;
RemoteCommandRequest request{
fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ net.startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {
deferred.emplace(std::move(resp));
});
diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
index df3492d15f1..50a59c4d81f 100644
--- a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
+++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
@@ -208,12 +208,12 @@ StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& re
return {std::move(message)};
}
-StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::int32_t requestId,
- StringData cursorNamespace,
- const Message& response) {
+RemoteCommandResponse upconvertLegacyQueryResponse(std::int32_t requestId,
+ StringData cursorNamespace,
+ const Message& response) {
auto swBatch = getBatchFromReply(requestId, response);
if (!swBatch.isOK()) {
- return swBatch.getStatus();
+ return {swBatch.getStatus()};
}
BSONArray batch;
@@ -252,12 +252,12 @@ StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest&
return {std::move(m)};
}
-StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::int32_t requestId,
- StringData cursorNamespace,
- const Message& response) {
+RemoteCommandResponse upconvertLegacyGetMoreResponse(std::int32_t requestId,
+ StringData cursorNamespace,
+ const Message& response) {
auto swBatch = getBatchFromReply(requestId, response);
if (!swBatch.isOK()) {
- return swBatch.getStatus();
+ return {swBatch.getStatus()};
}
BSONArray batch;
diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.h b/src/mongo/executor/downconvert_find_and_getmore_commands.h
index d3f24f7331d..dd991358e78 100644
--- a/src/mongo/executor/downconvert_find_and_getmore_commands.h
+++ b/src/mongo/executor/downconvert_find_and_getmore_commands.h
@@ -56,9 +56,9 @@ StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& re
* find command response. The 'requestId' parameter is the messageId of the original OP_QUERY, and
* the 'cursorNamespace' is the full namespace of the collection the query ran on.
*/
-StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::int32_t requestId,
- StringData cursorNamespace,
- const Message& response);
+RemoteCommandResponse upconvertLegacyQueryResponse(std::int32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
/**
* Downconverts a getMore command request to the legacy OP_GET_MORE format. The returned message
@@ -73,9 +73,9 @@ StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest&
* getMore command response. The 'requestId' parameter is the messageId of the original OP_GET_MORE,
* and the 'curesorNamespace' is the full namespace of the collection the original query ran on.
*/
-StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::int32_t requestId,
- StringData cursorNamespace,
- const Message& response);
+RemoteCommandResponse upconvertLegacyGetMoreResponse(std::int32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
} // namespace mongo
} // namespace executor
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 8617ce6e20c..e9732dbc125 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -276,9 +276,10 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
wasPreviouslyCanceled = _inGetConnection.erase(cbHandle) == 0;
}
- onFinish(wasPreviouslyCanceled
- ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
- : swConn.getStatus());
+ Status status = wasPreviouslyCanceled
+ ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
+ : swConn.getStatus();
+ onFinish({status, now() - getConnectionStartTime});
signalWorkAvailable();
return;
}
@@ -295,7 +296,9 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
if (eraseCount == 0) {
lk.unlock();
- onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"});
+ onFinish({ErrorCodes::CallbackCanceled,
+ "Callback canceled",
+ now() - getConnectionStartTime});
// Though we were canceled, we know that the stream is fine, so indicate success.
conn->indicateSuccess();
@@ -342,7 +345,9 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
std::stringstream msg;
msg << "Remote command timed out while waiting to get a connection from the "
<< "pool, took " << getConnectionDuration;
- return _completeOperation(op, {ErrorCodes::ExceededTimeLimit, msg.str()});
+ auto rs = ResponseStatus(
+ ErrorCodes::ExceededTimeLimit, msg.str(), getConnectionDuration);
+ return _completeOperation(op, rs);
}
// The above conditional guarantees that the adjusted timeout will never underflow.
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 6e80c436abc..b0a8d946b1e 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -95,6 +95,7 @@ class NetworkInterfaceASIO final : public NetworkInterface {
friend class connection_pool_asio::ASIOConnection;
friend class connection_pool_asio::ASIOTimer;
friend class connection_pool_asio::ASIOImpl;
+ class AsyncOp;
public:
struct Options {
@@ -207,7 +208,8 @@ private:
Message& toRecv();
MSGHEADER::Value& header();
- ResponseStatus response(rpc::Protocol protocol,
+ ResponseStatus response(AsyncOp* op,
+ rpc::Protocol protocol,
Date_t now,
rpc::EgressMetadataHook* metadataHook = nullptr);
@@ -314,6 +316,9 @@ private:
void setOperationProtocol(rpc::Protocol proto);
+ void setResponseMetadata(BSONObj m);
+ BSONObj getResponseMetadata();
+
void reset();
void clearStateTransitions();
@@ -416,6 +421,8 @@ private:
* Must be holding the access control's lock to edit.
*/
std::array<State, kMaxStateTransitions> _states;
+
+ BSONObj _responseMetadata{};
};
void _startCommand(AsyncOp* op);
@@ -430,16 +437,17 @@ private:
*/
template <typename Handler>
void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) {
- if (op->canceled())
- return _completeOperation(op,
- Status(ErrorCodes::CallbackCanceled, "Callback canceled"));
- if (op->timedOut()) {
+ if (op->canceled()) {
+ auto rs = ResponseStatus(
+ ErrorCodes::CallbackCanceled, "Callback canceled", now() - op->start());
+ return _completeOperation(op, rs);
+ } else if (op->timedOut()) {
str::stream msg;
msg << "Operation timed out"
<< ", request was " << op->_request.toString();
- return _completeOperation(op, Status(ErrorCodes::ExceededTimeLimit, msg));
- }
- if (ec)
+ auto rs = ResponseStatus(ErrorCodes::ExceededTimeLimit, msg, now() - op->start());
+ return _completeOperation(op, rs);
+ } else if (ec)
return _networkErrorCallback(op, ec);
handler();
@@ -459,7 +467,7 @@ private:
void _beginCommunication(AsyncOp* op);
void _completedOpCallback(AsyncOp* op);
void _networkErrorCallback(AsyncOp* op, const std::error_code& ec);
- void _completeOperation(AsyncOp* op, const TaskExecutor::ResponseStatus& resp);
+ void _completeOperation(AsyncOp* op, TaskExecutor::ResponseStatus resp);
void _signalWorkAvailable_inlock();
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index 9bd83daca84..5f5fe523f9c 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -85,12 +85,12 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
// Callback to parse protocol information out of received ismaster response
auto parseIsMaster = [this, op]() {
- auto swCommandReply = op->command()->response(rpc::Protocol::kOpQuery, now());
+ auto swCommandReply = op->command()->response(op, rpc::Protocol::kOpQuery, now());
if (!swCommandReply.isOK()) {
- return _completeOperation(op, swCommandReply.getStatus());
+ return _completeOperation(op, swCommandReply);
}
- auto commandReply = std::move(swCommandReply.getValue());
+ auto commandReply = std::move(swCommandReply);
// Ensure that the isMaster response is "ok:1".
auto commandStatus = getStatusFromCommandResult(commandReply.data);
@@ -178,7 +178,8 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
}
auto callAuthCompletionHandler = [this, op, handler]() {
- auto authResponse = op->command()->response(op->operationProtocol(), now(), nullptr);
+ auto authResponse =
+ op->command()->response(op, op->operationProtocol(), now(), nullptr);
handler(authResponse);
};
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index c0aa346fd6e..0ce9a1505de 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -142,15 +142,16 @@ ResponseStatus decodeRPC(Message* received,
if (reply->getProtocol() != protocol) {
auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol));
if (!requestProtocol.isOK())
- return requestProtocol.getStatus();
-
- return Status(ErrorCodes::RPCProtocolNegotiationFailed,
- str::stream() << "Mismatched RPC protocols - request was '"
- << requestProtocol.getValue().toString()
- << "' '"
- << " but reply was '"
- << networkOpToString(received->operation())
- << "'");
+ return {requestProtocol.getStatus(), elapsed};
+
+ return {ErrorCodes::RPCProtocolNegotiationFailed,
+ str::stream() << "Mismatched RPC protocols - request was '"
+ << requestProtocol.getValue().toString()
+ << "' '"
+ << " but reply was '"
+ << networkOpToString(received->operation())
+ << "'",
+ elapsed};
}
auto commandReply = reply->getCommandReply();
auto replyMetadata = reply->getMetadata();
@@ -160,14 +161,14 @@ ResponseStatus decodeRPC(Message* received,
auto listenStatus = callNoexcept(
*metadataHook, &rpc::EgressMetadataHook::readReplyMetadata, source, replyMetadata);
if (!listenStatus.isOK()) {
- return listenStatus;
+ return {listenStatus, elapsed};
}
}
return {RemoteCommandResponse(
std::move(*received), std::move(commandReply), std::move(replyMetadata), elapsed)};
} catch (...) {
- return exceptionToStatus();
+ return {exceptionToStatus(), elapsed};
}
}
@@ -198,13 +199,17 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() {
return _header;
}
-ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol,
+ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op,
+ rpc::Protocol protocol,
Date_t now,
rpc::EgressMetadataHook* metadataHook) {
auto& received = _toRecv;
switch (_type) {
case CommandType::kRPC: {
- return decodeRPC(&received, protocol, now - _start, _target, metadataHook);
+ auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook);
+ if (rs.isOK())
+ op->setResponseMetadata(rs.metadata);
+ return rs;
}
case CommandType::kDownConvertedFind: {
auto ns = DbMessage(_toSend).getns();
@@ -257,30 +262,33 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
}
void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
- auto response = op->command()->response(op->operationProtocol(), now(), _metadataHook.get());
+ auto response =
+ op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get());
_completeOperation(op, response);
}
void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) {
- if (ec.category() == mongoErrorCategory()) {
- // If we get a Mongo error code, we can preserve it.
- _completeOperation(op, Status(ErrorCodes::fromInt(ec.value()), ec.message()));
- } else {
- // If we get an asio or system error, we just convert it to a network error.
- _completeOperation(op, Status(ErrorCodes::HostUnreachable, ec.message()));
- }
+ ErrorCodes::Error errorCode = (ec.category() == mongoErrorCategory())
+ ? ErrorCodes::fromInt(ec.value())
+ : ErrorCodes::HostUnreachable;
+ _completeOperation(op, {errorCode, ec.message(), Milliseconds(now() - op->_start)});
}
// NOTE: This method may only be called by ASIO threads
// (do not call from methods entered by TaskExecutor threads)
-void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& resp) {
+void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) {
+ auto metadata = op->getResponseMetadata();
+ if (!metadata.isEmpty()) {
+ resp.metadata = metadata;
+ }
+
// Cancel this operation's timeout. Note that the timeout callback may already be running,
// may have run, or may have already been scheduled to run in the near future.
if (op->_timeoutAlarm) {
op->_timeoutAlarm->cancel();
}
- if (resp.getStatus().code() == ErrorCodes::ExceededTimeLimit) {
+ if (resp.status.code() == ErrorCodes::ExceededTimeLimit) {
_numTimedOutOps.fetchAndAdd(1);
}
@@ -289,7 +297,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus&
MONGO_ASIO_INVARIANT(!resp.isOK(), "Failed to connect in setup", op);
// If we fail during connection, we won't be able to access any of op's members after
// calling finish(), so we return here.
- log() << "Failed to connect to " << op->request().target << " - " << resp.getStatus();
+ log() << "Failed to connect to " << op->request().target << " - " << resp.status;
_numFailedOps.fetchAndAdd(1);
op->finish(resp);
return;
@@ -301,7 +309,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus&
// If we fail during heartbeating, we won't be able to access any of op's members after
// calling finish(), so we return here.
log() << "Failed asio heartbeat to " << op->request().target << " - "
- << redact(resp.getStatus());
+ << redact(resp.status);
_numFailedOps.fetchAndAdd(1);
op->finish(resp);
return;
@@ -312,9 +320,9 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus&
// that
// we got from the pool to execute a command, but it failed for some reason.
LOG(2) << "Failed to execute command: " << redact(op->request().toString())
- << " reason: " << redact(resp.getStatus());
+ << " reason: " << redact(resp.status);
- if (resp.getStatus().code() != ErrorCodes::CallbackCanceled) {
+ if (resp.status.code() != ErrorCodes::CallbackCanceled) {
_numFailedOps.fetchAndAdd(1);
}
} else {
@@ -357,7 +365,7 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus&
asioConn->bindAsyncOp(std::move(ownedOp));
if (!resp.isOK()) {
- asioConn->indicateFailure(resp.getStatus());
+ asioConn->indicateFailure(resp.status);
} else {
asioConn->indicateUsed();
asioConn->indicateSuccess();
@@ -451,16 +459,14 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
auto finishHook = [this, op]() {
auto response =
- op->command()->response(op->operationProtocol(), now(), _metadataHook.get());
+ op->command()->response(op, op->operationProtocol(), now(), _metadataHook.get());
if (!response.isOK()) {
- return _completeOperation(op, response.getStatus());
+ return _completeOperation(op, response);
}
- auto handleStatus = callNoexcept(*_hook,
- &NetworkConnectionHook::handleReply,
- op->request().target,
- std::move(response.getValue()));
+ auto handleStatus = callNoexcept(
+ *_hook, &NetworkConnectionHook::handleReply, op->request().target, std::move(response));
if (!handleStatus.isOK()) {
return _completeOperation(op, handleStatus);
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index 8b39dd5c992..b9d2ebb375e 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -87,23 +87,22 @@ public:
return _rng;
}
- Deferred<StatusWith<RemoteCommandResponse>> runCommand(
- const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) {
- Deferred<StatusWith<RemoteCommandResponse>> deferred;
- net().startCommand(
- cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable {
- deferred.emplace(std::move(resp));
- });
+ Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request) {
+ Deferred<RemoteCommandResponse> deferred;
+ net().startCommand(cbHandle, request, [deferred](RemoteCommandResponse resp) mutable {
+ deferred.emplace(std::move(resp));
+ });
return deferred;
}
- StatusWith<RemoteCommandResponse> runCommandSync(RemoteCommandRequest& request) {
+ RemoteCommandResponse runCommandSync(RemoteCommandRequest& request) {
auto deferred = runCommand(makeCallbackHandle(), request);
auto& res = deferred.get();
if (res.isOK()) {
- log() << "got command result: " << res.getValue().toString();
+ log() << "got command result: " << res.toString();
} else {
- log() << "command failed: " << res.getStatus();
+ log() << "command failed: " << res.status;
}
return res;
}
@@ -113,7 +112,8 @@ public:
Milliseconds timeoutMillis = Milliseconds(-1)) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
- auto res = unittest::assertGet(runCommandSync(request));
+ auto res = runCommandSync(request);
+ ASSERT_OK(res.status);
ASSERT_OK(getStatusFromCommandResult(res.data));
}
@@ -123,8 +123,8 @@ public:
ErrorCodes::Error reason) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
- auto clientStatus = runCommandSync(request);
- ASSERT_TRUE(clientStatus == reason);
+ auto res = runCommandSync(request);
+ ASSERT_EQ(reason, res.status.code());
}
void assertCommandFailsOnServer(StringData db,
@@ -133,9 +133,10 @@ public:
ErrorCodes::Error reason) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
- auto res = unittest::assertGet(runCommandSync(request));
+ auto res = runCommandSync(request);
+ ASSERT_OK(res.status);
auto serverStatus = getStatusFromCommandResult(res.data);
- ASSERT_TRUE(serverStatus == reason);
+ ASSERT_EQ(reason, serverStatus);
}
private:
@@ -185,10 +186,10 @@ public:
nullptr,
timeout};
auto out = fixture->runCommand(cb, request)
- .then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status {
+ .then(pool, [self](RemoteCommandResponse resp) -> Status {
auto status = resp.isOK()
- ? getStatusFromCommandResult(resp.getValue().data)
- : resp.getStatus();
+ ? getStatusFromCommandResult(resp.data)
+ : resp.status;
return status == self._expected
? Status::OK()
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index a05dba95fc0..a359099e004 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -217,16 +217,15 @@ NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() {
return _command.get_ptr();
}
-void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& status) {
+void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& rs) {
// We never hold the access lock when we call finish from NetworkInterfaceASIO.
_transitionToState(AsyncOp::State::kFinished);
- LOG(2) << "Request " << _request.id << " finished with response: "
- << redact(status.getStatus().isOK() ? status.getValue().data.toString()
- : status.getStatus().toString());
+ LOG(2) << "Request " << _request.id
+ << " finished with response: " << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
// Calling the completion handler may invalidate state in this op, so do it last.
- _onFinish(status);
+ _onFinish(rs);
}
const RemoteCommandRequest& NetworkInterfaceASIO::AsyncOp::request() const {
@@ -253,6 +252,14 @@ void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) {
_operationProtocol = proto;
}
+void NetworkInterfaceASIO::AsyncOp::setResponseMetadata(BSONObj m) {
+ _responseMetadata = m;
+}
+
+BSONObj NetworkInterfaceASIO::AsyncOp::getResponseMetadata() {
+ return _responseMetadata;
+}
+
void NetworkInterfaceASIO::AsyncOp::reset() {
// We don't reset owner as it never changes
_cbHandle = {};
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
index 1b52f951017..1c2f3a6dee3 100644
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ b/src/mongo/executor/network_interface_asio_test.cpp
@@ -52,6 +52,8 @@ namespace mongo {
namespace executor {
namespace {
+using ResponseStatus = TaskExecutor::ResponseStatus;
+
HostAndPort testHost{"localhost", 20000};
void initWireSpecMongoD() {
@@ -101,20 +103,18 @@ public:
}
}
- Deferred<StatusWith<RemoteCommandResponse>> startCommand(
+ Deferred<RemoteCommandResponse> startCommand(
const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) {
- Deferred<StatusWith<RemoteCommandResponse>> deferredResponse;
+ Deferred<RemoteCommandResponse> deferredResponse;
ASSERT_OK(net().startCommand(
- cbHandle,
- request,
- [deferredResponse](StatusWith<RemoteCommandResponse> response) mutable {
+ cbHandle, request, [deferredResponse](ResponseStatus response) mutable {
deferredResponse.emplace(std::move(response));
}));
return deferredResponse;
}
// Helper to run startCommand and wait for it
- StatusWith<RemoteCommandResponse> startCommandSync(RemoteCommandRequest& request) {
+ RemoteCommandResponse startCommandSync(RemoteCommandRequest& request) {
auto deferred = startCommand(makeCallbackHandle(), request);
// wait for the operation to complete
@@ -180,7 +180,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) {
// Wait for op to complete, assert that it was canceled.
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::CallbackCanceled);
+ ASSERT(result.status == ErrorCodes::CallbackCanceled);
+ ASSERT(result.elapsedMillis);
assertNumOps(1u, 0u, 0u, 0u);
}
@@ -202,7 +203,8 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) {
});
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::CallbackCanceled);
+ ASSERT(result.status == ErrorCodes::CallbackCanceled);
+ ASSERT(result.elapsedMillis);
// expect 0 completed ops because the op was canceled before getting a connection
assertNumOps(1u, 0u, 0u, 0u);
}
@@ -230,9 +232,11 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) {
});
// Allow to complete, then cancel, nothing should happen.
- deferred.get();
+ auto& result = deferred.get();
net().cancelCommand(cbh);
+ ASSERT(result.isOK());
+ ASSERT(result.elapsedMillis);
assertNumOps(0u, 0u, 0u, 1u);
}
@@ -262,7 +266,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) {
// Wait for op to complete, assert that cancellation error had precedence.
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::CallbackCanceled);
+ ASSERT(result.status == ErrorCodes::CallbackCanceled);
+ ASSERT(result.elapsedMillis);
assertNumOps(1u, 0u, 0u, 0u);
}
@@ -290,7 +295,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) {
// Wait for op to complete, assert that cancellation error had precedence.
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::CallbackCanceled);
+ ASSERT(result.status == ErrorCodes::CallbackCanceled);
+ ASSERT(result.elapsedMillis);
assertNumOps(1u, 0u, 0u, 0u);
}
@@ -319,7 +325,8 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) {
// Wait for op to complete, assert that timeout had precedence.
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::ExceededTimeLimit);
+ ASSERT(result.status == ErrorCodes::ExceededTimeLimit);
+ ASSERT(result.elapsedMillis);
assertNumOps(0u, 1u, 1u, 0u);
}
@@ -349,7 +356,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) {
// Wait for op to complete, assert that the cancellation had precedence.
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::CallbackCanceled);
+ ASSERT(result.status == ErrorCodes::CallbackCanceled);
+ ASSERT(result.elapsedMillis);
assertNumOps(1u, 0u, 0u, 0u);
}
@@ -386,7 +394,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) {
}
auto& result = deferred.get();
- ASSERT(result == ErrorCodes::ExceededTimeLimit);
+ ASSERT(result.status == ErrorCodes::ExceededTimeLimit);
+ ASSERT(result.elapsedMillis);
assertNumOps(0u, 1u, 1u, 0u);
}
@@ -425,10 +434,10 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) {
});
auto& res = deferred.get();
-
- auto response = uassertStatusOK(res);
- ASSERT_EQ(response.data, expectedCommandReply);
- ASSERT_EQ(response.metadata, expectedMetadata);
+ ASSERT(res.elapsedMillis);
+ uassertStatusOK(res.status);
+ ASSERT_EQ(res.data, expectedCommandReply);
+ ASSERT_EQ(res.metadata, expectedMetadata);
assertNumOps(0u, 0u, 0u, 1u);
}
@@ -442,7 +451,7 @@ TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
net().shutdown();
RemoteCommandRequest request;
ASSERT_NOT_OK(net().startCommand(
- makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {}));
+ makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {}));
}
class MalformedMessageTest : public NetworkInterfaceASIOTest {
@@ -502,7 +511,8 @@ public:
}
auto& response = deferred.get();
- ASSERT(response == code);
+ ASSERT(response.status == code);
+ ASSERT(response.elapsedMillis);
assertNumOps(0u, 0u, 1u, 0u);
}
};
@@ -614,8 +624,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) {
// we should stop here.
auto& res = deferred.get();
-
- ASSERT(res == validationFailedStatus);
+ ASSERT(res.status == validationFailedStatus);
+ ASSERT(res.elapsedMillis);
assertNumOps(0u, 0u, 1u, 0u);
}
@@ -671,9 +681,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) {
// we should stop here.
auto& res = deferred.get();
-
- // auto result = uassertStatusOK(res);
- ASSERT(res == validationFailedStatus);
+ ASSERT(res.status == validationFailedStatus);
+ ASSERT(res.elapsedMillis);
ASSERT(validateCalled);
ASSERT(hostCorrect);
ASSERT(isMasterReplyCorrect);
@@ -721,7 +730,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) {
// We should stop here.
auto& res = deferred.get();
- ASSERT(res == makeRequestError);
+ ASSERT(res.status == makeRequestError);
+ ASSERT(res.elapsedMillis);
ASSERT(makeRequestCalled);
ASSERT(!handleReplyCalled);
assertNumOps(0u, 0u, 1u, 0u);
@@ -782,8 +792,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
auto& result = deferred.get();
ASSERT(result.isOK());
- ASSERT(result.getValue().data == commandReply);
- ASSERT(result.getValue().metadata == metadata);
+ ASSERT(result.data == commandReply);
+ ASSERT(result.elapsedMillis);
+ ASSERT(result.metadata == metadata);
assertNumOps(0u, 0u, 0u, 1u);
}
@@ -849,8 +860,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
});
auto& result = deferred.get();
-
- ASSERT(result == handleReplyError);
+ ASSERT(result.status == handleReplyError);
+ ASSERT(result.elapsedMillis);
ASSERT(makeRequestCalled);
ASSERT(handleReplyCalled);
ASSERT(handleReplyArgumentCorrect);
@@ -954,7 +965,8 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) {
return response;
});
- deferred.get();
+ auto& res = deferred.get();
+ ASSERT(res.elapsedMillis);
ASSERT(wroteRequestMetadata);
ASSERT(gotReplyMetadata);
assertNumOps(0u, 0u, 0u, 1u);
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 2ebd1244e47..2c84a34071a 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -149,8 +149,8 @@ void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
- ResponseStatus response(ErrorCodes::CallbackCanceled, "Network operation canceled");
- _cancelCommand_inlock(cbHandle, response);
+ ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0));
+ _cancelCommand_inlock(cbHandle, rs);
}
@@ -216,7 +216,7 @@ void NetworkInterfaceMock::shutdown() {
lk.unlock();
for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
iter->setResponse(
- now, ResponseStatus(ErrorCodes::ShutdownInProgress, "Shutting down mock network"));
+ now, {ErrorCodes::ShutdownInProgress, "Shutting down mock network", Milliseconds(0)});
iter->finishResponse();
}
lk.lock();
@@ -298,7 +298,7 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
// If no RemoteCommandResponse was returned (for example, on a simulated network error), then
// do not attempt to run the metadata hook, since there is no returned metadata.
if (_metadataHook && response.isOK()) {
- _metadataHook->readReplyMetadata(noi->getRequest().target, response.getValue().metadata);
+ _metadataHook->readReplyMetadata(noi->getRequest().target, response.metadata);
}
noi->setResponse(when, response);
@@ -307,7 +307,7 @@ void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(const BSONObj& response) {
BSONObj metadata;
- return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata));
+ return scheduleSuccessfulResponse(RemoteCommandResponse(response, metadata, Milliseconds(0)));
}
RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(
@@ -330,6 +330,12 @@ RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const Status& r
return scheduleErrorResponse(getNextReadyRequest(), response);
}
+RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(const ResponseStatus response) {
+ auto noi = getNextReadyRequest();
+ scheduleResponse(noi, now(), response);
+ return noi->getRequest();
+}
+
RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperationIterator noi,
const Status& response) {
return scheduleErrorResponse(noi, now(), response);
@@ -426,9 +432,9 @@ void NetworkInterfaceMock::_enqueueOperation_inlock(
if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
invariant(op.getRequest().timeout >= Milliseconds(0));
- ResponseStatus response(ErrorCodes::NetworkTimeout, "Network timeout");
+ ResponseStatus rs(ErrorCodes::NetworkTimeout, "Network timeout", Milliseconds(0));
auto action = stdx::bind(
- &NetworkInterfaceMock::_cancelCommand_inlock, this, op.getCallbackHandle(), response);
+ &NetworkInterfaceMock::_cancelCommand_inlock, this, op.getCallbackHandle(), rs);
_alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
}
}
@@ -470,17 +476,15 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
}
// The completion handler for the postconnect command schedules the original command.
- auto postconnectCompletionHandler = [this,
- op](StatusWith<RemoteCommandResponse> response) mutable {
+ auto postconnectCompletionHandler = [this, op](ResponseStatus rs) mutable {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!response.isOK()) {
- op.setResponse(_now_inlock(), response.getStatus());
+ if (!rs.isOK()) {
+ op.setResponse(_now_inlock(), rs);
op.finishResponse();
return;
}
- auto handleStatus =
- _hook->handleReply(op.getRequest().target, std::move(response.getValue()));
+ auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(rs));
if (!handleStatus.isOK()) {
op.setResponse(_now_inlock(), handleStatus);
@@ -585,8 +589,8 @@ bool NetworkInterfaceMock::_isExecutorThreadRunnable_inlock() {
return _waitingToRunMask & kExecutorThread;
}
-static const StatusWith<RemoteCommandResponse> kUnsetResponse(
- ErrorCodes::InternalError, "NetworkOperation::_response never set");
+static const ResponseStatus kUnsetResponse(ErrorCodes::InternalError,
+ "NetworkOperation::_response never set");
NetworkInterfaceMock::NetworkOperation::NetworkOperation()
: _requestDate(),
@@ -612,9 +616,8 @@ NetworkInterfaceMock::NetworkOperation::~NetworkOperation() {}
std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
return str::stream() << "NetworkOperation -- request:'" << _request.toString()
- << "', responseStatus: '" << _response.getStatus().toString()
- << "', responseBody: '"
- << (_response.getStatus().isOK() ? _response.getValue().toString() : "")
+ << "', responseStatus: '" << _response.status.toString()
+ << "', responseBody: '" << (_response.isOK() ? _response.toString() : "")
<< "', reqDate: " << _requestDate.toString()
<< ", nextConsiderDate: " << _nextConsiderationDate.toString()
<< ", respDate: " << _responseDate.toString();
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index df972ca17f6..de4a224ac91 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -49,6 +49,7 @@ class BSONObj;
namespace executor {
+using ResponseStatus = TaskExecutor::ResponseStatus;
class NetworkConnectionHook;
/**
@@ -173,7 +174,7 @@ public:
*/
void scheduleResponse(NetworkOperationIterator noi,
Date_t when,
- const TaskExecutor::ResponseStatus& response);
+ const ResponseStatus& response);
/**
* Schedules a successful "response" to "noi" at virtual time "when".
@@ -195,6 +196,7 @@ public:
* "when" defaults to now().
*/
RemoteCommandRequest scheduleErrorResponse(const Status& response);
+ RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
const Status& response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
@@ -247,7 +249,7 @@ public:
* Cancel a command with specified response, e.g. NetworkTimeout or CallbackCanceled errors.
*/
void _cancelCommand_inlock(const TaskExecutor::CallbackHandle& cbHandle,
- const TaskExecutor::ResponseStatus& response);
+ const ResponseStatus& response);
private:
/**
@@ -412,7 +414,7 @@ public:
/**
* Sets the response and thet virtual time at which it will be delivered.
*/
- void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
+ void setResponse(Date_t responseDate, const ResponseStatus& response);
/**
* Predicate that returns true if cbHandle equals the executor's handle for this network
@@ -472,7 +474,7 @@ private:
Date_t _responseDate;
TaskExecutor::CallbackHandle _cbHandle;
RemoteCommandRequest _request;
- TaskExecutor::ResponseStatus _response;
+ ResponseStatus _response;
RemoteCommandCompletionFn _onFinish;
};
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index 74d5701a55f..e6907dc6769 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -166,14 +166,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
BSONObj(),
Milliseconds(0)};
- ASSERT_OK(
- net().startCommand(cb, actualCommandExpected, [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- if (resp.isOK()) {
- gotCorrectCommandReply =
- (actualResponseExpected.toString() == resp.getValue().toString());
- }
- }));
+ ASSERT_OK(net().startCommand(cb, actualCommandExpected, [&](RemoteCommandResponse resp) {
+ commandFinished = true;
+ if (resp.isOK()) {
+ gotCorrectCommandReply = (actualResponseExpected.toString() == resp.toString());
+ }
+ }));
// At this point validate and makeRequest should have been called.
ASSERT(validateCalled);
@@ -239,10 +237,10 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
bool statusPropagated = false;
RemoteCommandRequest request;
- ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
commandFinished = true;
- statusPropagated = resp.getStatus().code() == ErrorCodes::ConflictingOperationInProgress;
+ statusPropagated = resp.status.code() == ErrorCodes::ConflictingOperationInProgress;
}));
{
@@ -280,7 +278,7 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
RemoteCommandRequest request;
ASSERT_OK(net().startCommand(
- cb, request, [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; }));
+ cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; }));
{
net().enterNetwork();
@@ -316,9 +314,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
bool errorPropagated = false;
RemoteCommandRequest request;
- ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
commandFinished = true;
- errorPropagated = resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
+ errorPropagated = resp.status.code() == ErrorCodes::InvalidSyncSource;
}));
{
@@ -353,9 +351,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
bool errorPropagated = false;
RemoteCommandRequest request;
- ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
commandFinished = true;
- errorPropagated = resp.getStatus().code() == ErrorCodes::CappedPositionLost;
+ errorPropagated = resp.status.code() == ErrorCodes::CappedPositionLost;
}));
ASSERT(!handleReplyCalled);
@@ -387,7 +385,7 @@ TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
TaskExecutor::CallbackHandle cb{};
RemoteCommandRequest request;
- ASSERT_NOT_OK(net().startCommand(cb, request, [](StatusWith<RemoteCommandResponse> resp) {}));
+ ASSERT_NOT_OK(net().startCommand(cb, request, [](RemoteCommandResponse resp) {}));
}
TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
@@ -404,9 +402,7 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) {
request.timeout = Milliseconds(2000);
ErrorCodes::Error statusPropagated = ErrorCodes::OK;
- auto finishFn = [&](StatusWith<RemoteCommandResponse> resp) {
- statusPropagated = resp.getStatus().code();
- };
+ auto finishFn = [&](RemoteCommandResponse resp) { statusPropagated = resp.status.code(); };
//
// Command times out.
@@ -435,8 +431,7 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) {
ASSERT_EQUALS(start + Milliseconds(1000), net().now());
ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated);
// Reply with a successful response.
- StatusWith<RemoteCommandResponse> responseStatus(RemoteCommandResponse{});
- net().scheduleResponse(noi, net().now(), responseStatus);
+ net().scheduleResponse(noi, net().now(), {});
net().runReadyNetworkOperations();
net().exitNetwork();
ASSERT_EQUALS(ErrorCodes::OK, statusPropagated);
diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp
index 8cbd529d91e..abb94cf7014 100644
--- a/src/mongo/executor/network_interface_perf_test.cpp
+++ b/src/mongo/executor/network_interface_perf_test.cpp
@@ -75,8 +75,8 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) {
const auto bsonObjPing = BSON("ping" << 1);
- const auto callback = [&](StatusWith<RemoteCommandResponse> resp) {
- uassertStatusOK(resp);
+ const auto callback = [&](RemoteCommandResponse resp) {
+ uassertStatusOK(resp.status);
if (--remainingOps) {
return func();
}
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp
index fae46697d5e..2d5ec28ebc3 100644
--- a/src/mongo/executor/network_test_env.cpp
+++ b/src/mongo/executor/network_test_env.cpp
@@ -58,7 +58,8 @@ void NetworkTestEnv::onCommand(OnCommandFunction func) {
_mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response);
} else {
- _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), resultStatus.getStatus());
+ _mockNetwork->scheduleResponse(
+ noi, _mockNetwork->now(), {resultStatus.getStatus(), Milliseconds(0)});
}
_mockNetwork->runReadyNetworkOperations();
@@ -72,18 +73,18 @@ void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) {
const RemoteCommandRequest& request = noi->getRequest();
const auto cmdResponseStatus = func(request);
- const auto cmdResponse = cmdResponseStatus.getValue();
BSONObjBuilder result;
if (cmdResponseStatus.isOK()) {
- result.appendElements(cmdResponse.data);
- Command::appendCommandStatus(result, cmdResponseStatus.getStatus());
- const RemoteCommandResponse response(result.obj(), cmdResponse.metadata, Milliseconds(1));
+ result.appendElements(cmdResponseStatus.data);
+ Command::appendCommandStatus(result, cmdResponseStatus.status);
+ const RemoteCommandResponse response(
+ result.obj(), cmdResponseStatus.metadata, Milliseconds(1));
_mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response);
} else {
- _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), cmdResponseStatus.getStatus());
+ _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), cmdResponseStatus.status);
}
_mockNetwork->runReadyNetworkOperations();
@@ -113,30 +114,29 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) {
}
void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func) {
- onCommandWithMetadata(
- [&func](const RemoteCommandRequest& request) -> StatusWith<RemoteCommandResponse> {
- const auto& resultStatus = func(request);
-
- if (!resultStatus.isOK()) {
- return resultStatus.getStatus();
- }
-
- std::vector<BSONObj> result;
- BSONObj metadata;
- std::tie(result, metadata) = resultStatus.getValue();
-
- BSONArrayBuilder arr;
- for (const auto& obj : result) {
- arr.append(obj);
- }
-
- const NamespaceString nss =
- NamespaceString(request.dbname, request.cmdObj.firstElement().String());
- BSONObjBuilder resultBuilder;
- appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder);
-
- return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1));
- });
+ onCommandWithMetadata([&func](const RemoteCommandRequest& request) -> RemoteCommandResponse {
+ const auto& resultStatus = func(request);
+
+ if (!resultStatus.isOK()) {
+ return resultStatus.getStatus();
+ }
+
+ std::vector<BSONObj> result;
+ BSONObj metadata;
+ std::tie(result, metadata) = resultStatus.getValue();
+
+ BSONArrayBuilder arr;
+ for (const auto& obj : result) {
+ arr.append(obj);
+ }
+
+ const NamespaceString nss =
+ NamespaceString(request.dbname, request.cmdObj.firstElement().String());
+ BSONObjBuilder resultBuilder;
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder);
+
+ return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1));
+ });
}
} // namespace executor
diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h
index 7e09a18f0fb..4f662f5723e 100644
--- a/src/mongo/executor/network_test_env.h
+++ b/src/mongo/executor/network_test_env.h
@@ -128,7 +128,7 @@ public:
using OnCommandFunction = stdx::function<StatusWith<BSONObj>(const RemoteCommandRequest&)>;
using OnCommandWithMetadataFunction =
- stdx::function<StatusWith<RemoteCommandResponse>(const RemoteCommandRequest&)>;
+ stdx::function<RemoteCommandResponse(const RemoteCommandRequest&)>;
using OnFindCommandFunction =
stdx::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>;
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index ab67febd598..e611ea0bc2b 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -36,6 +36,49 @@
namespace mongo {
namespace executor {
+RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code, std::string reason)
+    : status(code, reason) {}
+
+RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code,
+ std::string reason,
+ Milliseconds millis)
+ : elapsedMillis(millis), status(code, reason) {}
+
+RemoteCommandResponse::RemoteCommandResponse(Status s) : status(std::move(s)) {
+ invariant(!isOK());
+}
+
+RemoteCommandResponse::RemoteCommandResponse(Status s, Milliseconds millis)
+ : elapsedMillis(millis), status(std::move(s)) {
+ invariant(!isOK());
+}
+
+RemoteCommandResponse::RemoteCommandResponse(BSONObj dataObj,
+ BSONObj metadataObj,
+ Milliseconds millis)
+ : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) {
+ // The buffer backing the default empty BSONObj has static duration so it is effectively
+ // owned.
+ invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
+ invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata());
+}
+
+RemoteCommandResponse::RemoteCommandResponse(Message messageArg,
+ BSONObj dataObj,
+ BSONObj metadataObj,
+ Milliseconds millis)
+ : message(std::make_shared<const Message>(std::move(messageArg))),
+ data(std::move(dataObj)),
+ metadata(std::move(metadataObj)),
+ elapsedMillis(millis) {
+ if (!data.isOwned()) {
+ data.shareOwnershipWith(message->sharedBuffer());
+ }
+ if (!metadata.isOwned()) {
+ metadata.shareOwnershipWith(message->sharedBuffer());
+ }
+}
+
// TODO(amidvidy): we currently discard output docs when we use this constructor. We should
// have RCR hold those too, but we need more machinery before that is possible.
RemoteCommandResponse::RemoteCommandResponse(const rpc::ReplyInterface& rpcReply,
@@ -43,6 +86,10 @@ RemoteCommandResponse::RemoteCommandResponse(const rpc::ReplyInterface& rpcReply
: RemoteCommandResponse(rpcReply.getCommandReply(), rpcReply.getMetadata(), std::move(millis)) {
}
+bool RemoteCommandResponse::isOK() const {
+ return status.isOK();
+}
+
std::string RemoteCommandResponse::toString() const {
return str::stream() << "RemoteResponse -- "
<< " cmd:" << data.toString();
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index 3408fe01b6c..ebfc0434067 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -28,10 +28,12 @@
#pragma once
+#include <boost/optional.hpp>
#include <iosfwd>
#include <memory>
#include <string>
+#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
@@ -51,35 +53,25 @@ namespace executor {
struct RemoteCommandResponse {
RemoteCommandResponse() = default;
- RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj)
- : RemoteCommandResponse(dataObj, metadataObj, Milliseconds(0)) {}
+ RemoteCommandResponse(ErrorCodes::Error code, std::string reason);
- RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis)
- : data(std::move(dataObj)), metadata(std::move(metadataObj)), elapsedMillis(millis) {
- // The buffer backing the default empty BSONObj has static duration so it is effectively
- // owned.
- invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
- invariant(metadata.isOwned() || metadata.objdata() == BSONObj().objdata());
- }
+ RemoteCommandResponse(ErrorCodes::Error code, std::string reason, Milliseconds millis);
+
+ RemoteCommandResponse(Status s);
+
+ RemoteCommandResponse(Status s, Milliseconds millis);
+
+ RemoteCommandResponse(BSONObj dataObj, BSONObj metadataObj, Milliseconds millis);
RemoteCommandResponse(Message messageArg,
BSONObj dataObj,
BSONObj metadataObj,
- Milliseconds millis)
- : message(std::make_shared<const Message>(std::move(messageArg))),
- data(std::move(dataObj)),
- metadata(std::move(metadataObj)),
- elapsedMillis(millis) {
- if (!data.isOwned()) {
- data.shareOwnershipWith(message->sharedBuffer());
- }
- if (!metadata.isOwned()) {
- metadata.shareOwnershipWith(message->sharedBuffer());
- }
- }
+ Milliseconds millis);
RemoteCommandResponse(const rpc::ReplyInterface& rpcReply, Milliseconds millis);
+ bool isOK() const;
+
std::string toString() const;
bool operator==(const RemoteCommandResponse& rhs) const;
@@ -88,7 +80,8 @@ struct RemoteCommandResponse {
std::shared_ptr<const Message> message; // May be null.
BSONObj data; // Always owned. May point into message.
BSONObj metadata; // Always owned. May point into message.
- Milliseconds elapsedMillis = {};
+ boost::optional<Milliseconds> elapsedMillis;
+ Status status = Status::OK();
};
} // namespace executor
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index fd630e527b7..abf5d5d221a 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -82,7 +82,7 @@ public:
class EventState;
class EventHandle;
- using ResponseStatus = StatusWith<RemoteCommandResponse>;
+ using ResponseStatus = RemoteCommandResponse;
/**
* Type of a regular callback function.
@@ -404,12 +404,12 @@ struct TaskExecutor::RemoteCommandCallbackArgs {
RemoteCommandCallbackArgs(TaskExecutor* theExecutor,
const CallbackHandle& theHandle,
const RemoteCommandRequest& theRequest,
- const StatusWith<RemoteCommandResponse>& theResponse);
+ const ResponseStatus& theResponse);
TaskExecutor* executor;
CallbackHandle myHandle;
RemoteCommandRequest request;
- StatusWith<RemoteCommandResponse> response;
+ ResponseStatus response;
};
} // namespace executor
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index 3212e15a376..57c12813250 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -367,7 +367,7 @@ static void setStatusOnRemoteCommandCompletion(
<< getRequestDescription(expectedRequest));
return;
}
- *outStatus = cbData.response.getStatus();
+ *outStatus = cbData.response.status;
}
COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) {
@@ -386,8 +386,7 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) {
net->enterNetwork();
ASSERT(net->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi, net->now(), TaskExecutor::ResponseStatus(ErrorCodes::NoSuchKey, "I'm missing"));
+ net->scheduleResponse(noi, net->now(), {ErrorCodes::NoSuchKey, "I'm missing"});
net->runReadyNetworkOperations();
ASSERT(!net->hasReadyRequests());
net->exitNetwork();
@@ -434,8 +433,7 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) {
ASSERT(net->hasReadyRequests());
const Date_t startTime = net->now();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- net->scheduleResponse(
- noi, startTime + Milliseconds(2), TaskExecutor::ResponseStatus(RemoteCommandResponse{}));
+ net->scheduleResponse(noi, startTime + Milliseconds(2), {});
net->runUntil(startTime + Milliseconds(2));
ASSERT_EQUALS(startTime + Milliseconds(2), net->now());
net->exitNetwork();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 1f60031ca5d..26606d8f0ee 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -282,35 +282,29 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
namespace {
+using ResponseStatus = TaskExecutor::ResponseStatus;
+
// If the request received a connection from the pool but failed in its execution,
-// convert the raw Status in cbData to a StatusWith<RemoteCommandResponse> so that the callback,
-// which expects a StatusWith<RemoteCommandResponse> as part of RemoteCommandCallbackArgs,
+// convert the raw Status in cbData to a RemoteCommandResponse so that the callback,
+// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs,
// can be run despite a RemoteCommandResponse never having been created.
void remoteCommandFinished(const TaskExecutor::CallbackArgs& cbData,
const TaskExecutor::RemoteCommandCallbackFn& cb,
const RemoteCommandRequest& request,
- const TaskExecutor::ResponseStatus& response) {
- using ResponseStatus = TaskExecutor::ResponseStatus;
- if (cbData.status.isOK()) {
- cb(TaskExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, response));
- } else {
- cb(TaskExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
- }
+ const ResponseStatus& rs) {
+ cb(TaskExecutor::RemoteCommandCallbackArgs(cbData.executor, cbData.myHandle, request, rs));
}
// If the request failed to receive a connection from the pool,
-// convert the raw Status in cbData to a StatusWith<RemoteCommandResponse> so that the callback,
-// which expects a StatusWith<RemoteCommandResponse> as part of RemoteCommandCallbackArgs,
+// convert the raw Status in cbData to a RemoteCommandResponse so that the callback,
+// which expects a RemoteCommandResponse as part of RemoteCommandCallbackArgs,
// can be run despite a RemoteCommandResponse never having been created.
void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
const TaskExecutor::RemoteCommandCallbackFn& cb,
const RemoteCommandRequest& request) {
- using ResponseStatus = TaskExecutor::ResponseStatus;
invariant(!cbData.status.isOK());
cb(TaskExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
+ cbData.executor, cbData.myHandle, request, {cbData.status}));
}
} // namespace
@@ -349,8 +343,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
return;
}
LOG(3) << "Received remote response: "
- << redact(response.isOK() ? response.getValue().toString()
- : response.getStatus().toString());
+ << redact(response.isOK() ? response.toString() : response.status.toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
});
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index 473ba9ce439..8e1480cc1d3 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -203,9 +203,6 @@ void MigrationManager::_executeMigrations(OperationContext* txn,
RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn);
- StatusWith<RemoteCommandResponse> remoteCommandResponse(
- Status{ErrorCodes::InternalError, "Uninitialized value"});
-
executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
@@ -259,18 +256,15 @@ void MigrationManager::_checkMigrationCallback(
OperationContext* txn,
Migration* migration,
MigrationStatuses* migrationStatuses) {
- const auto& remoteCommandResponseWithStatus = callbackArgs.response;
+ const auto& remoteCommandResponse = callbackArgs.response;
- if (!remoteCommandResponseWithStatus.isOK()) {
+ if (!remoteCommandResponse.isOK()) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(
- MigrationStatuses::value_type(migration->chunkInfo.migrateInfo.getName(),
- std::move(remoteCommandResponseWithStatus.getStatus())));
+ migrationStatuses->insert(MigrationStatuses::value_type(
+ migration->chunkInfo.migrateInfo.getName(), std::move(remoteCommandResponse.status)));
return;
}
- const auto& remoteCommandResponse = callbackArgs.response.getValue();
-
Status commandStatus = getStatusFromCommandResult(remoteCommandResponse.data);
if (commandStatus == ErrorCodes::LockBusy && !migration->oldShard) {
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index d8e507496a3..80bd6c81da8 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -268,7 +268,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd
executor::RemoteCommandRequest request(
host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30));
- StatusWith<executor::RemoteCommandResponse> swResponse =
+ executor::RemoteCommandResponse swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
auto callStatus = _executorForAddShard->scheduleRemoteCommand(
@@ -283,14 +283,14 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd
_executorForAddShard->wait(callStatus.getValue());
if (!swResponse.isOK()) {
- if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
- LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus();
+ if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) {
+ LOG(0) << "Operation for addShard timed out with status " << swResponse.status;
}
- return swResponse.getStatus();
+ return swResponse.status;
}
- BSONObj responseObj = swResponse.getValue().data.getOwned();
- BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ BSONObj responseObj = swResponse.data.getOwned();
+ BSONObj responseMetadata = swResponse.metadata.getOwned();
Status commandStatus = getStatusFromCommandResult(responseObj);
Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
@@ -1247,7 +1247,7 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
std::shared_ptr<RemoteCommandTargeter> targeter) {
stdx::unique_lock<stdx::mutex> lk(_addShardHandlesMutex);
- Status responseStatus = cbArgs.response.getStatus();
+ Status responseStatus = cbArgs.response.status;
if (responseStatus == ErrorCodes::CallbackCanceled) {
return;
}
@@ -1266,12 +1266,12 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
warning() << "Failed to upsert shardIdentity document during addShard into shard "
<< shardType.getName() << "(" << shardType.getHost()
<< "). The shardIdentity upsert will continue to be retried. "
- << causedBy(swResponse.getStatus());
+ << causedBy(swResponse.status);
rescheduleTask = true;
} else {
// Create a CommandResponse object in order to use processBatchWriteResponse.
- BSONObj responseObj = swResponse.getValue().data.getOwned();
- BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ BSONObj responseObj = swResponse.data.getOwned();
+ BSONObj responseMetadata = swResponse.metadata.getOwned();
Status commandStatus = getStatusFromCommandResult(responseObj);
Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
Shard::CommandResponse commandResponse(std::move(responseObj),
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 098bbae7692..5fae69ea753 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -191,8 +191,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
txn,
requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout);
- StatusWith<RemoteCommandResponse> swResponse =
- Status(ErrorCodes::InternalError, "Internal error running command");
+    RemoteCommandResponse swResponse =
+        Status(ErrorCodes::InternalError, "Internal error running command");
TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
auto callStatus = executor->scheduleRemoteCommand(
@@ -205,17 +204,17 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
// Block until the command is carried out
executor->wait(callStatus.getValue());
- updateReplSetMonitor(host.getValue(), swResponse.getStatus());
+ updateReplSetMonitor(host.getValue(), swResponse.status);
if (!swResponse.isOK()) {
- if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
- LOG(0) << "Operation timed out with status " << redact(swResponse.getStatus());
+ if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) {
+ LOG(0) << "Operation timed out with status " << redact(swResponse.status);
}
- return Shard::HostWithResponse(host.getValue(), swResponse.getStatus());
+ return Shard::HostWithResponse(host.getValue(), swResponse.status);
}
- BSONObj responseObj = swResponse.getValue().data.getOwned();
- BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ BSONObj responseObj = swResponse.data.getOwned();
+ BSONObj responseMetadata = swResponse.metadata.getOwned();
Status commandStatus = getStatusFromCommandResult(responseObj);
Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 31d9337b10f..85cc8454dd4 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -403,7 +403,7 @@ void AsyncResultsMerger::handleBatchResponse(
// Make a best effort to parse the response and retrieve the cursor id. We need the cursor
// id in order to issue a killCursors command against it.
if (cbData.response.isOK()) {
- auto cursorResponse = parseCursorResponse(cbData.response.getValue().data, remote);
+ auto cursorResponse = parseCursorResponse(cbData.response.data, remote);
if (cursorResponse.isOK()) {
remote.cursorId = cursorResponse.getValue().getCursorId();
}
@@ -432,8 +432,8 @@ void AsyncResultsMerger::handleBatchResponse(
ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
StatusWith<CursorResponse> cursorResponseStatus(
- cbData.response.isOK() ? parseCursorResponse(cbData.response.getValue().data, remote)
- : cbData.response.getStatus());
+ cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
+ : cbData.response.status);
if (!cursorResponseStatus.isOK()) {
// In the case a read is performed against a view, the shard primary can return an error
@@ -443,7 +443,7 @@ void AsyncResultsMerger::handleBatchResponse(
// collection.
if (cursorResponseStatus.getStatus() ==
ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
- auto& responseObj = cbData.response.getValue().data;
+ auto& responseObj = cbData.response.data;
if (!responseObj.hasField("resolvedView")) {
remote.status = Status(ErrorCodes::InternalError,
str::stream() << "Missing field 'resolvedView' in document: "
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index a5179f16c68..e4d609cc88d 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -54,6 +54,8 @@ using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
const std::vector<ShardId> kTestShardIds = {
ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
@@ -197,12 +199,13 @@ protected:
return retRequest;
}
- void scheduleErrorResponse(Status status) {
- invariant(!status.isOK());
+ void scheduleErrorResponse(ResponseStatus rs) {
+ invariant(!rs.isOK());
+ rs.elapsedMillis = Milliseconds(0);
executor::NetworkInterfaceMock* net = network();
net->enterNetwork();
ASSERT_TRUE(net->hasReadyRequests());
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), status);
+ net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs);
net->runReadyNetworkOperations();
net->exitNetwork();
}