summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2016-07-26 15:23:10 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2016-07-26 15:23:51 -0400
commit1febb4ceb0e59743b0a49af35db10c4c689aa130 (patch)
tree431445a395fd4fbd1e7c94bf1c4151bba714088d /src/mongo
parent7daf57f28f564329e92b8779cf12845776b958b3 (diff)
downloadmongo-1febb4ceb0e59743b0a49af35db10c4c689aa130.tar.gz
SERVER-24615 Add support for OperationContext in EgressMetadataHook
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/authenticate_test.cpp2
-rw-r--r--src/mongo/client/dbclient.cpp4
-rw-r--r--src/mongo/client/dbclientcursor.cpp6
-rw-r--r--src/mongo/client/fetcher.cpp6
-rw-r--r--src/mongo/client/remote_command_retry_scheduler_test.cpp26
-rw-r--r--src/mongo/db/audit.cpp3
-rw-r--r--src/mongo/db/audit.h3
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp1
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp3
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp1
-rw-r--r--src/mongo/db/repl/elect_cmd_runner_test.cpp1
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp1
-rw-r--r--src/mongo/db/repl/freshness_checker_test.cpp1
-rw-r--r--src/mongo/db/repl/freshness_scanner.cpp2
-rw-r--r--src/mongo/db/repl/freshness_scanner_test.cpp1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/reporter.cpp2
-rw-r--r--src/mongo/db/repl/rollback_checker.cpp2
-rw-r--r--src/mongo/db/repl/scatter_gather_test.cpp2
-rw-r--r--src/mongo/db/repl/vote_requester.cpp2
-rw-r--r--src/mongo/db/repl/vote_requester_test.cpp1
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp2
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp2
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp7
-rw-r--r--src/mongo/executor/connection_pool_asio_integration_test.cpp53
-rw-r--r--src/mongo/executor/network_interface.h4
-rw-r--r--src/mongo/executor/network_interface_asio.cpp35
-rw-r--r--src/mongo/executor/network_interface_asio.h5
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp2
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp4
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp30
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp30
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp102
-rw-r--r--src/mongo/executor/network_interface_mock.cpp2
-rw-r--r--src/mongo/executor/network_interface_mock.h2
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp50
-rw-r--r--src/mongo/executor/network_interface_perf_test.cpp6
-rw-r--r--src/mongo/executor/remote_command_request.cpp4
-rw-r--r--src/mongo/executor/remote_command_request.h19
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp11
-rw-r--r--src/mongo/rpc/metadata.h10
-rw-r--r--src/mongo/rpc/metadata/metadata_hook.h7
-rw-r--r--src/mongo/s/balancer/migration_manager.cpp2
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp4
-rw-r--r--src/mongo/s/client/dbclient_multi_command.cpp5
-rw-r--r--src/mongo/s/client/shard_remote.cpp1
-rw-r--r--src/mongo/s/client/sharding_connection_hook.cpp8
-rw-r--r--src/mongo/s/query/async_results_merger.cpp14
-rw-r--r--src/mongo/s/query/async_results_merger.h7
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h8
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h4
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp13
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h13
-rw-r--r--src/mongo/s/query/cluster_cursor_manager_test.cpp61
-rw-r--r--src/mongo/s/query/cluster_find.cpp3
-rw-r--r--src/mongo/s/query/router_exec_stage.h9
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp4
-rw-r--r--src/mongo/s/query/router_stage_limit.h2
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp4
-rw-r--r--src/mongo/s/query/router_stage_merge.h2
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp4
-rw-r--r--src/mongo/s/query/router_stage_mock.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp4
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp4
-rw-r--r--src/mongo/s/query/router_stage_skip.h2
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.cpp8
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.h5
-rw-r--r--src/mongo/s/sharding_initialization.cpp6
73 files changed, 433 insertions, 242 deletions
diff --git a/src/mongo/client/authenticate_test.cpp b/src/mongo/client/authenticate_test.cpp
index adca0ec582b..3044da9f6cd 100644
--- a/src/mongo/client/authenticate_test.cpp
+++ b/src/mongo/client/authenticate_test.cpp
@@ -107,7 +107,7 @@ public:
}
void pushRequest(StringData dbname, const BSONObj& cmd) {
- _requests.emplace(_mockHost, dbname.toString(), cmd);
+ _requests.emplace(_mockHost, dbname.toString(), cmd, nullptr);
}
BSONObj loadMongoCRConversation() {
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 8a76a95de26..73b3d77fab7 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -44,6 +44,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/config.h"
#include "mongo/db/auth/internal_user_auth.h"
+#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
@@ -182,7 +183,8 @@ rpc::UniqueReply DBClientWithCommands::runCommandWithMetadata(StringData databas
metadataBob.appendElements(metadata);
if (_metadataWriter) {
- uassertStatusOK(_metadataWriter(&metadataBob, host));
+ uassertStatusOK(_metadataWriter(
+ (haveClient() ? cc().getOperationContext() : nullptr), &metadataBob, host));
}
auto requestBuilder = rpc::makeRequestBuilder(getClientRPCProtocols(), getServerRPCProtocols());
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index b1c1faef87f..7cdbc35de7f 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -34,6 +34,7 @@
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/connpool.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/namespace_string.h"
#include "mongo/rpc/factory.h"
@@ -79,7 +80,10 @@ Message assembleCommandRequest(DBClientWithCommands* cli,
BSONObjBuilder metadataBob;
metadataBob.appendElements(upconvertedMetadata);
if (cli->getRequestMetadataWriter()) {
- uassertStatusOK(cli->getRequestMetadataWriter()(&metadataBob, cli->getServerAddress()));
+ uassertStatusOK(
+ cli->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr),
+ &metadataBob,
+ cli->getServerAddress()));
}
requestBuilder->setDatabase(database);
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index c1d783858cc..f50a877e557 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -179,7 +179,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
_timeout(timeout),
_firstRemoteCommandScheduler(
_executor,
- RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, _timeout),
+ RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _timeout),
stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kFirstBatchFieldName),
std::move(firstCommandRetryPolicy)) {
uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
@@ -280,7 +280,7 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) {
}
StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
_executor->scheduleRemoteCommand(
- RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, _timeout),
+ RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, nullptr, _timeout),
stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kNextBatchFieldName));
if (!scheduleResult.isOK()) {
@@ -389,7 +389,7 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) {
};
auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id));
auto scheduleResult = _executor->scheduleRemoteCommand(
- RemoteCommandRequest(_source, _dbname, cmdObj), logKillCursorsResult);
+ RemoteCommandRequest(_source, _dbname, cmdObj, nullptr), logKillCursorsResult);
if (!scheduleResult.isOK()) {
warning() << "failed to schedule killCursors command: " << scheduleResult.getStatus();
}
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index c5eb97a381a..6e2c70fe9dc 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -168,7 +168,10 @@ std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver::
return _responses;
}
-const executor::RemoteCommandRequest request(HostAndPort("h1:12345"), "db1", BSON("ping" << 1));
+const executor::RemoteCommandRequest request(HostAndPort("h1:12345"),
+ "db1",
+ BSON("ping" << 1),
+ nullptr);
TEST_F(RemoteCommandRetrySchedulerTest, MakeSingleShotRetryPolicy) {
auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
@@ -217,7 +220,7 @@ TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) {
ASSERT_THROWS_CODE_AND_WHAT(
RemoteCommandRetryScheduler(
&getExecutor(),
- executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj),
+ executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj, nullptr),
callback,
makeRetryPolicy()),
UserException,
@@ -225,20 +228,21 @@ TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) {
"source in remote command request cannot be empty");
// Empty source in remote command request.
- ASSERT_THROWS_CODE_AND_WHAT(RemoteCommandRetryScheduler(&getExecutor(),
- executor::RemoteCommandRequest(
- request.target, "", request.cmdObj),
- callback,
- makeRetryPolicy()),
- UserException,
- ErrorCodes::BadValue,
- "database name in remote command request cannot be empty");
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(
+ &getExecutor(),
+ executor::RemoteCommandRequest(request.target, "", request.cmdObj, nullptr),
+ callback,
+ makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "database name in remote command request cannot be empty");
// Empty command object in remote command request.
ASSERT_THROWS_CODE_AND_WHAT(
RemoteCommandRetryScheduler(
&getExecutor(),
- executor::RemoteCommandRequest(request.target, request.dbname, BSONObj()),
+ executor::RemoteCommandRequest(request.target, request.dbname, BSONObj(), nullptr),
callback,
makeRetryPolicy()),
UserException,
diff --git a/src/mongo/db/audit.cpp b/src/mongo/db/audit.cpp
index f781b237f96..715e9023c15 100644
--- a/src/mongo/db/audit.cpp
+++ b/src/mongo/db/audit.cpp
@@ -178,7 +178,8 @@ void logAuthentication(ClientBasic* client,
const BSONObj& keyPattern,
bool unique) MONGO_AUDIT_STUB
- void writeImpersonatedUsersToMetadata(BSONObjBuilder* metadata) MONGO_AUDIT_STUB
+ void writeImpersonatedUsersToMetadata(OperationContext* txn,
+ BSONObjBuilder* metadata) MONGO_AUDIT_STUB
void parseAndRemoveImpersonatedUsersField(BSONObj cmdObj,
std::vector<UserName>* parsedUserNames,
diff --git a/src/mongo/db/audit.h b/src/mongo/db/audit.h
index 4dc187da2b1..54c07aa1d77 100644
--- a/src/mongo/db/audit.h
+++ b/src/mongo/db/audit.h
@@ -44,6 +44,7 @@ class BSONObj;
class ClientBasic;
class Command;
class NamespaceString;
+class OperationContext;
class ReplSetConfig;
class StringData;
class UserName;
@@ -304,7 +305,7 @@ void logShardCollection(ClientBasic* client, StringData ns, const BSONObj& keyPa
* to the provided metadata builder. The users and roles are extracted from the current client.
* They are to be the impersonated users and roles for a Command run by an internal user.
*/
-void writeImpersonatedUsersToMetadata(BSONObjBuilder* metadataBob);
+void writeImpersonatedUsersToMetadata(OperationContext* txn, BSONObjBuilder* metadataBob);
/*
* Looks for an 'impersonatedUsers' field. This field is used by mongos to
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index 583b3df37d8..bd3740ed410 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -103,6 +103,7 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const {
"admin",
hbRequest,
BSON(rpc::kReplSetMetadataFieldName << 1),
+ nullptr,
_rsConfig->getHeartbeatTimeoutPeriodMillis()));
}
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index dd7e8e6996c..c0155d8fcb7 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -161,7 +161,8 @@ Status DatabasesCloner::startup() {
Request listDBsReq(_source,
"admin",
BSON("listDatabases" << true),
- rpc::ServerSelectionMetadata(true, boost::none).toBSON());
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ nullptr);
_listDBsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
_exec,
listDBsReq,
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index dc2bb2e240d..afe28960c1f 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -80,6 +80,7 @@ std::vector<RemoteCommandRequest> ElectCmdRunner::Algorithm::getRequests() const
*it,
"admin",
replSetElectCmd,
+ nullptr,
Milliseconds(30 * 1000))); // trying to match current Socket timeout
}
diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp
index c92f931cf34..593b1f10fc7 100644
--- a/src/mongo/db/repl/elect_cmd_runner_test.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp
@@ -287,6 +287,7 @@ protected:
return RemoteCommandRequest(HostAndPort(hostname),
"", // the non-hostname fields do not matter for Elect
BSONObj(),
+ nullptr,
Milliseconds(0));
}
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index f5bd3963e63..d3927c33102 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -98,6 +98,7 @@ std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() con
*it,
"admin",
replSetFreshCmd,
+ nullptr,
Milliseconds(30 * 1000))); // trying to match current Socket timeout
}
diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp
index 2d0917254ec..2aef6f626df 100644
--- a/src/mongo/db/repl/freshness_checker_test.cpp
+++ b/src/mongo/db/repl/freshness_checker_test.cpp
@@ -947,6 +947,7 @@ protected:
return RemoteCommandRequest(HostAndPort(hostname),
"", // the non-hostname fields do not matter in Freshness
BSONObj(),
+ nullptr,
Milliseconds(0));
}
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
index fcba850e021..87182309a39 100644
--- a/src/mongo/db/repl/freshness_scanner.cpp
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -64,7 +64,7 @@ std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() con
std::vector<RemoteCommandRequest> requests;
for (auto& target : _targets) {
- requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, _timeout));
+ requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, nullptr, _timeout));
}
return requests;
}
diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp
index 53314298b5b..d931ef55b70 100644
--- a/src/mongo/db/repl/freshness_scanner_test.cpp
+++ b/src/mongo/db/repl/freshness_scanner_test.cpp
@@ -99,6 +99,7 @@ protected:
return RemoteCommandRequest(HostAndPort(hostname),
"", // fields do not matter in FreshnessScanner
BSONObj(),
+ nullptr,
Milliseconds(0));
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index fb88d374e93..e56fa347fcd 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -95,7 +95,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::Callbac
}
const RemoteCommandRequest request(
- target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), timeout);
+ target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout);
const ReplicationExecutor::RemoteCommandCallbackFn callback =
stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse,
this,
@@ -304,7 +304,7 @@ void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs
} // namespace
void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort& target) {
- RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 20));
+ RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 20), nullptr);
log() << "Requesting " << target << " step down from primary";
CBHStatus cbh = _replExecutor.scheduleRemoteCommand(request, remoteStepdownCallback);
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index e5dfdab34da..2e19e804108 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -229,7 +229,7 @@ void Reporter::_sendCommand_inlock(BSONObj commandRequest) {
<< commandRequest;
auto scheduleResult = _executor->scheduleRemoteCommand(
- executor::RemoteCommandRequest(_target, "admin", commandRequest),
+ executor::RemoteCommandRequest(_target, "admin", commandRequest, nullptr),
stdx::bind(&Reporter::_processResponseCallback, this, stdx::placeholders::_1));
_status = scheduleResult.getStatus();
diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp
index cb86eb2a811..3e6d49026b2 100644
--- a/src/mongo/db/repl/rollback_checker.cpp
+++ b/src/mongo/db/repl/rollback_checker.cpp
@@ -140,7 +140,7 @@ bool RollbackChecker::_checkForRollback_inlock(int remoteRBID) {
RollbackChecker::CallbackHandle RollbackChecker::_scheduleGetRollbackId(
const RemoteCommandCallbackFn& nextAction, const CallbackFn& errorFn) {
executor::RemoteCommandRequest getRollbackIDReq(
- _syncSource, "admin", BSON("replSetGetRBID" << 1));
+ _syncSource, "admin", BSON("replSetGetRBID" << 1), nullptr);
auto cbh = _executor->scheduleRemoteCommand(getRollbackIDReq, nextAction);
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 05edf467fbd..e231c251d35 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -61,7 +61,7 @@ public:
std::vector<RemoteCommandRequest> requests;
for (int i = 0; i < kTotalRequests; i++) {
requests.push_back(RemoteCommandRequest(
- HostAndPort("hostname", i), "admin", BSONObj(), Milliseconds(30 * 1000)));
+ HostAndPort("hostname", i), "admin", BSONObj(), nullptr, Milliseconds(30 * 1000)));
}
return requests;
}
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index 836617e5493..b02b3bb7c0c 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -82,7 +82,7 @@ std::vector<RemoteCommandRequest> VoteRequester::Algorithm::getRequests() const
std::vector<RemoteCommandRequest> requests;
for (const auto& target : _targets) {
requests.push_back(RemoteCommandRequest(
- target, "admin", requestVotesCmd, _rsConfig.getElectionTimeoutPeriod()));
+ target, "admin", requestVotesCmd, nullptr, _rsConfig.getElectionTimeoutPeriod()));
}
return requests;
diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp
index ba4691019d8..799fff1817c 100644
--- a/src/mongo/db/repl/vote_requester_test.cpp
+++ b/src/mongo/db/repl/vote_requester_test.cpp
@@ -124,6 +124,7 @@ protected:
return RemoteCommandRequest(HostAndPort(hostname),
"", // fields do not matter in VoteRequester
BSONObj(),
+ nullptr,
Milliseconds(0));
}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 55b38ec4b47..500b11e57e7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -472,7 +472,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
auto executor = grid.getExecutorPool()->getArbitraryExecutor();
auto scheduleStatus = executor->scheduleRemoteCommand(
- executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj),
+ executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj, nullptr),
[&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
responseStatus = args.response;
});
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index ddfd61cf6f8..e368c2ba183 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -286,7 +286,7 @@ void AsyncMockStreamFactory::MockStream::simulateServer(
auto parsedRequest = rpc::makeRequest(&msg);
ASSERT(parsedRequest->getProtocol() == proto);
- RemoteCommandRequest rcr(target(), *parsedRequest);
+ RemoteCommandRequest rcr(target(), *parsedRequest, nullptr);
messageId = msg.header().getId();
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index c189d0f5347..99e4fadb135 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -155,8 +155,11 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC
return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
conn->_global->_impl,
TaskExecutor::CallbackHandle(),
- RemoteCommandRequest{
- conn->getHostAndPort(), std::string("admin"), BSON("isMaster" << 1), BSONObj()},
+ RemoteCommandRequest{conn->getHostAndPort(),
+ std::string("admin"),
+ BSON("isMaster" << 1),
+ BSONObj(),
+ nullptr},
[conn](const TaskExecutor::ResponseStatus& status) {
auto cb = std::move(conn->_setupCallback);
cb(conn, status.isOK() ? Status::OK() : status.getStatus());
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
index 3bc12a2da9c..04c39348ee6 100644
--- a/src/mongo/executor/connection_pool_asio_integration_test.cpp
+++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp
@@ -105,12 +105,12 @@ TEST(ConnectionPoolASIO, TestPing) {
auto status = Status::OK();
Deferred<StatusWith<RemoteCommandResponse>> deferred;
- net.startCommand(makeCallbackHandle(),
- RemoteCommandRequest{
- fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
- [&deferred](StatusWith<RemoteCommandResponse> resp) {
- deferred.emplace(std::move(resp));
- });
+ RemoteCommandRequest request{
+ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
+ net.startCommand(
+ makeCallbackHandle(), request, [&deferred](StatusWith<RemoteCommandResponse> resp) {
+ deferred.emplace(std::move(resp));
+ });
ASSERT_OK(deferred.get().getStatus());
});
@@ -141,10 +141,12 @@ TEST(ConnectionPoolASIO, TestHostTimeoutRace) {
for (int i = 0; i < 1000; i++) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
+ RemoteCommandRequest request{
+ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
net.startCommand(
- makeCallbackHandle(),
- RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
- [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); });
+ makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ deferred.emplace(std::move(resp));
+ });
ASSERT_OK(deferred.get().getStatus());
sleepmillis(1);
@@ -168,10 +170,11 @@ TEST(ConnectionPoolASIO, ConnSetupTimeout) {
auto guard = MakeGuard([&] { net.shutdown(); });
Deferred<StatusWith<RemoteCommandResponse>> deferred;
- net.startCommand(
- makeCallbackHandle(),
- RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
- [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); });
+ RemoteCommandRequest request{
+ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
+ net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ deferred.emplace(std::move(resp));
+ });
ASSERT_EQ(deferred.get().getStatus().code(), ErrorCodes::ExceededTimeLimit);
}
@@ -194,17 +197,20 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) {
std::array<Deferred<StatusWith<RemoteCommandResponse>>, 10> deferreds;
- for (auto& deferred : deferreds) {
- net.startCommand(
- makeCallbackHandle(),
- RemoteCommandRequest{fixture.getServers()[0],
+ RemoteCommandRequest request{fixture.getServers()[0],
"admin",
BSON("sleep" << 1 << "lock"
<< "none"
<< "secs"
<< 2),
- BSONObj()},
- [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); });
+ BSONObj(),
+ nullptr};
+
+ for (auto& deferred : deferreds) {
+ net.startCommand(
+ makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ deferred.emplace(std::move(resp));
+ });
}
for (auto& deferred : deferreds) {
@@ -237,10 +243,11 @@ TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
- net.startCommand(
- makeCallbackHandle(),
- RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
- [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); });
+ RemoteCommandRequest request{
+ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
+ net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {
+ deferred.emplace(std::move(resp));
+ });
deferred.get();
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index d230e2eb8a6..3b73761ef10 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -120,12 +120,14 @@ public:
/**
* Starts asynchronous execution of the command described by "request".
*
+ * The request mutated to append request metadata to be sent in OP_Command messages.
+ *
* Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started
* and Status::OK() otherwise. If it returns Status::OK(), then the onFinish argument will be
* executed by NetworkInterface eventually; otherwise, it will not.
*/
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
+ RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) = 0;
/**
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 1b4eae416cf..8617ce6e20c 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -211,8 +211,36 @@ Date_t NetworkInterfaceASIO::now() {
return Date_t::now();
}
+namespace {
+
+Status attachMetadataIfNeeded(RemoteCommandRequest& request,
+ rpc::EgressMetadataHook* metadataHook) {
+
+ // Append the metadata of the request with metadata from the metadata hook
+ // if a hook is installed
+ if (metadataHook) {
+ BSONObjBuilder augmentedBob;
+ augmentedBob.appendElements(request.metadata);
+
+ auto writeStatus = callNoexcept(*metadataHook,
+ &rpc::EgressMetadataHook::writeRequestMetadata,
+ request.txn,
+ request.target,
+ &augmentedBob);
+ if (!writeStatus.isOK()) {
+ return writeStatus;
+ }
+
+ request.metadata = augmentedBob.obj();
+ }
+
+ return Status::OK();
+}
+
+} // namespace
+
Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
+ RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) {
MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
{
@@ -230,6 +258,11 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb
auto getConnectionStartTime = now();
+ auto statusMetadata = attachMetadataIfNeeded(request, _metadataHook.get());
+ if (!statusMetadata.isOK()) {
+ return statusMetadata;
+ }
+
auto nextStep = [this, getConnectionStartTime, cbHandle, request, onFinish](
StatusWith<ConnectionPool::ConnectionHandle> swConn) {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 1d6bd4c5475..6e80c436abc 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -127,7 +127,7 @@ public:
void signalWorkAvailable() override;
Date_t now() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
+ RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override;
void cancelAllCommands() override;
@@ -292,8 +292,7 @@ private:
// AsyncOp may run multiple commands over its lifetime (for example, an ismaster
// command, the command provided to the NetworkInterface via startCommand(), etc.)
// Calling beginCommand() resets internal state to prepare to run newCommand.
- Status beginCommand(const RemoteCommandRequest& request,
- rpc::EgressMetadataHook* metadataHook = nullptr);
+ Status beginCommand(const RemoteCommandRequest& request);
// This form of beginCommand takes a raw message. It is needed if the caller
// has to form the command manually (e.g. to use a specific requestBuilder).
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index 59ed433820a..e09a952c282 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -172,7 +172,7 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
// SERVER-14170: Set the metadataHook to nullptr explicitly as we cannot write metadata
// here.
- auto beginStatus = op->beginCommand(request, nullptr);
+ auto beginStatus = op->beginCommand(request);
if (!beginStatus.isOK()) {
return handler(beginStatus);
}
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index f3da7abc0d1..99f702c4b99 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -246,7 +246,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
LOG(3) << "Initiating asynchronous command: " << redact(op->request().toString());
- auto beginStatus = op->beginCommand(op->request(), _metadataHook.get());
+ auto beginStatus = op->beginCommand(op->request());
if (!beginStatus.isOK()) {
return _completeOperation(op, beginStatus);
}
@@ -444,7 +444,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
return _beginCommunication(op);
}
- auto beginStatus = op->beginCommand(*optionalRequest, _metadataHook.get());
+ auto beginStatus = op->beginCommand(*optionalRequest);
if (!beginStatus.isOK()) {
return _completeOperation(op, beginStatus);
}
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index 6165c018b3f..8b39dd5c992 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -88,7 +88,7 @@ public:
}
Deferred<StatusWith<RemoteCommandResponse>> runCommand(
- const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
+ const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
net().startCommand(
cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable {
@@ -97,7 +97,7 @@ public:
return deferred;
}
- StatusWith<RemoteCommandResponse> runCommandSync(const RemoteCommandRequest& request) {
+ StatusWith<RemoteCommandResponse> runCommandSync(RemoteCommandRequest& request) {
auto deferred = runCommand(makeCallbackHandle(), request);
auto& res = deferred.get();
if (res.isOK()) {
@@ -111,8 +111,9 @@ public:
void assertCommandOK(StringData db,
const BSONObj& cmd,
Milliseconds timeoutMillis = Milliseconds(-1)) {
- auto res = unittest::assertGet(runCommandSync(
- {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis}));
+ RemoteCommandRequest request{
+ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
+ auto res = unittest::assertGet(runCommandSync(request));
ASSERT_OK(getStatusFromCommandResult(res.data));
}
@@ -120,8 +121,9 @@ public:
const BSONObj& cmd,
Milliseconds timeoutMillis,
ErrorCodes::Error reason) {
- auto clientStatus = runCommandSync(
- {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis});
+ RemoteCommandRequest request{
+ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
+ auto clientStatus = runCommandSync(request);
ASSERT_TRUE(clientStatus == reason);
}
@@ -129,8 +131,9 @@ public:
const BSONObj& cmd,
Milliseconds timeoutMillis,
ErrorCodes::Error reason) {
- auto res = unittest::assertGet(runCommandSync(
- {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis}));
+ RemoteCommandRequest request{
+ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
+ auto res = unittest::assertGet(runCommandSync(request));
auto serverStatus = getStatusFromCommandResult(res.data);
ASSERT_TRUE(serverStatus == reason);
}
@@ -176,12 +179,12 @@ public:
Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
auto cb = makeCallbackHandle();
auto self = *this;
- auto out = fixture
- ->runCommand(cb,
- {unittest::getFixtureConnectionString().getServers()[0],
+ RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0],
"admin",
_command,
- timeout})
+ nullptr,
+ timeout};
+ auto out = fixture->runCommand(cb, request)
.then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status {
auto status = resp.isOK()
? getStatusFromCommandResult(resp.getValue().data)
@@ -324,7 +327,8 @@ class HangingHook : public executor::NetworkConnectionHook {
<< "none"
<< "secs"
<< 100000000),
- BSONObj()))};
+ BSONObj(),
+ nullptr))};
}
Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final {
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index c9b25549907..a05dba95fc0 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -63,37 +63,16 @@ namespace {
// be used to run multiple distinct requests.
AtomicUInt64 kAsyncOpIdCounter(0);
-// Metadata listener can be nullptr.
StatusWith<Message> messageFromRequest(const RemoteCommandRequest& request,
- rpc::Protocol protocol,
- rpc::EgressMetadataHook* metadataHook) {
+ rpc::Protocol protocol) {
BSONObj query = request.cmdObj;
auto requestBuilder = rpc::makeRequestBuilder(protocol);
- BSONObj maybeAugmented;
- // Handle outgoing request metadata.
- if (metadataHook) {
- BSONObjBuilder augmentedBob;
- augmentedBob.appendElements(request.metadata);
-
- auto writeStatus = callNoexcept(*metadataHook,
- &rpc::EgressMetadataHook::writeRequestMetadata,
- request.target,
- &augmentedBob);
- if (!writeStatus.isOK()) {
- return writeStatus;
- }
-
- maybeAugmented = augmentedBob.obj();
- } else {
- maybeAugmented = request.metadata;
- }
-
auto toSend = rpc::makeRequestBuilder(protocol)
->setDatabase(request.dbname)
.setCommandName(request.cmdObj.firstElementFieldName())
.setCommandArgs(request.cmdObj)
- .setMetadata(maybeAugmented)
+ .setMetadata(request.metadata)
.done();
return std::move(toSend);
}
@@ -197,8 +176,7 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand,
return Status::OK();
}
-Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request,
- rpc::EgressMetadataHook* metadataHook) {
+Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) {
// Check if we need to downconvert find or getMore commands.
StringData commandName = request.cmdObj.firstElement().fieldNameStringData();
const auto isFindCmd = commandName == QueryRequest::kFindCommandName;
@@ -208,7 +186,7 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& r
// If we aren't sending a find or getMore, or the server supports OP_COMMAND we don't have
// to worry about downconversion.
if (!isFindOrGetMoreCmd || connection().serverProtocols() == rpc::supports::kAll) {
- auto newCommand = messageFromRequest(request, operationProtocol(), metadataHook);
+ auto newCommand = messageFromRequest(request, operationProtocol());
if (!newCommand.isOK()) {
return newCommand.getStatus();
}
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
index bf96e029865..1b52f951017 100644
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ b/src/mongo/executor/network_interface_asio_test.cpp
@@ -102,7 +102,7 @@ public:
}
Deferred<StatusWith<RemoteCommandResponse>> startCommand(
- const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
+ const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferredResponse;
ASSERT_OK(net().startCommand(
cbHandle,
@@ -114,7 +114,7 @@ public:
}
// Helper to run startCommand and wait for it
- StatusWith<RemoteCommandResponse> startCommandSync(const RemoteCommandRequest& request) {
+ StatusWith<RemoteCommandResponse> startCommandSync(RemoteCommandRequest& request) {
auto deferred = startCommand(makeCallbackHandle(), request);
// wait for the operation to complete
@@ -157,8 +157,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) {
auto cbh = makeCallbackHandle();
// Kick off our operation
- auto deferred =
- startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(cbh, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -187,8 +187,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) {
TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) {
auto cbh = makeCallbackHandle();
- auto deferred =
- startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(cbh, request);
// Cancel immediately
net().cancelCommand(cbh);
@@ -209,8 +209,8 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) {
TEST_F(NetworkInterfaceASIOTest, LateCancel) {
auto cbh = makeCallbackHandle();
- auto deferred =
- startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(cbh, request);
// Allow stream to connect so operation can return
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -238,8 +238,8 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) {
TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) {
auto cbh = makeCallbackHandle();
- auto deferred =
- startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(cbh, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -268,8 +268,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) {
TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) {
auto cbh = makeCallbackHandle();
- auto deferred =
- startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(cbh, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -296,9 +296,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) {
TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) {
auto cbh = makeCallbackHandle();
- auto deferred = startCommand(
- cbh,
- RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj(), Milliseconds(1000)));
+ RemoteCommandRequest request{
+ testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)};
+ auto deferred = startCommand(cbh, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -325,9 +325,9 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) {
TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) {
auto cbh = makeCallbackHandle();
- auto deferred = startCommand(
- cbh,
- RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj(), Milliseconds(1000)));
+ RemoteCommandRequest request{
+ testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)};
+ auto deferred = startCommand(cbh, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -357,7 +357,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) {
// Kick off operation
auto cb = makeCallbackHandle();
Milliseconds timeout(1000);
- auto deferred = startCommand(cb, {testHost, "testDB", BSON("a" << 1), BSONObj(), timeout});
+ RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, timeout};
+ auto deferred = startCommand(cb, request);
// Create and initialize a stream so operation can begin
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -390,9 +391,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) {
}
TEST_F(NetworkInterfaceASIOTest, StartCommand) {
- auto deferred =
- startCommand(makeCallbackHandle(),
- RemoteCommandRequest(testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1)));
+ RemoteCommandRequest request{testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1), nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -440,9 +440,9 @@ TEST_F(NetworkInterfaceASIOTest, InShutdown) {
TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
net().shutdown();
- ASSERT_NOT_OK(net().startCommand(makeCallbackHandle(),
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {}));
+ RemoteCommandRequest request;
+ ASSERT_NOT_OK(net().startCommand(
+ makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {}));
}
class MalformedMessageTest : public NetworkInterfaceASIOTest {
@@ -451,9 +451,8 @@ public:
void runMessageTest(ErrorCodes::Error code, bool loadBody, MessageHook hook) {
// Kick off our operation
- auto deferred =
- startCommand(makeCallbackHandle(),
- RemoteCommandRequest(testHost, "testDB", BSON("ping" << 1), BSONObj()));
+ RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
// Wait for it to block waiting for a write
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -591,11 +590,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) {
return Status::OK();
}));
- auto deferred = startCommand(makeCallbackHandle(),
- {testHost,
- "blah",
- BSON("foo"
- << "bar")});
+ RemoteCommandRequest request{testHost,
+ "blah",
+ BSON("foo"
+ << "bar"),
+ nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -645,11 +645,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) {
return Status::OK();
}));
- auto deferred = startCommand(makeCallbackHandle(),
- {testHost,
- "blah",
- BSON("foo"
- << "bar")});
+ RemoteCommandRequest request{testHost,
+ "blah",
+ BSON("foo"
+ << "bar"),
+ nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
@@ -701,11 +702,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) {
return Status::OK();
}));
- auto deferred = startCommand(makeCallbackHandle(),
- {testHost,
- "blah",
- BSON("foo"
- << "bar")});
+ RemoteCommandRequest request{testHost,
+ "blah",
+ BSON("foo"
+ << "bar"),
+ nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
@@ -753,7 +755,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
auto metadata = BSON("aaa"
<< "bbb");
- auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", commandRequest});
+ RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
@@ -809,7 +812,7 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {boost::make_optional<RemoteCommandRequest>(
- {testHost, "foo", hookCommandRequest, hookRequestMetadata})};
+ {testHost, "foo", hookCommandRequest, hookRequestMetadata, nullptr})};
},
[&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
@@ -821,7 +824,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
auto commandRequest = BSON("foo"
<< "bar");
- auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", commandRequest});
+ RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
@@ -904,7 +908,8 @@ public:
TestMetadataHook(bool* wroteRequestMetadata, bool* gotReplyMetadata)
: _wroteRequestMetadata(wroteRequestMetadata), _gotReplyMetadata(gotReplyMetadata) {}
- Status writeRequestMetadata(const HostAndPort& requestDestination,
+ Status writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& requestDestination,
BSONObjBuilder* metadataBob) override {
metadataBob->append("foo", "bar");
*_wroteRequestMetadata = true;
@@ -926,7 +931,8 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) {
bool gotReplyMetadata = false;
start(stdx::make_unique<TestMetadataHook>(&wroteRequestMetadata, &gotReplyMetadata));
- auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", BSON("ping" << 1)});
+ RemoteCommandRequest request{testHost, "blah", BSON("ping" << 1), nullptr};
+ auto deferred = startCommand(makeCallbackHandle(), request);
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index d17fc448bd7..2ebd1244e47 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -112,7 +112,7 @@ std::string NetworkInterfaceMock::getHostName() {
}
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
+ RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index df8f2b541d6..df972ca17f6 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -105,7 +105,7 @@ public:
virtual Date_t now();
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
+ RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish);
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle);
/**
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index d022b07d264..74d5701a55f 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -110,7 +110,8 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
"test",
BSON("1" << 2),
BSON("some"
- << "stuff")};
+ << "stuff"),
+ nullptr};
RemoteCommandResponse expectedResponse{BSON("foo"
<< "bar"
@@ -159,7 +160,7 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
bool gotCorrectCommandReply = false;
RemoteCommandRequest actualCommandExpected{
- testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata()};
+ testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata(), nullptr};
RemoteCommandResponse actualResponseExpected{BSON("1212121212"
<< "12121212121212"),
BSONObj(),
@@ -237,14 +238,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
bool commandFinished = false;
bool statusPropagated = false;
- ASSERT_OK(net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
+ RemoteCommandRequest request;
+ ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
- statusPropagated = resp.getStatus().code() ==
- ErrorCodes::ConflictingOperationInProgress;
- }));
+ statusPropagated = resp.getStatus().code() == ErrorCodes::ConflictingOperationInProgress;
+ }));
{
net().enterNetwork();
@@ -279,10 +278,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
bool commandFinished = false;
+ RemoteCommandRequest request;
ASSERT_OK(net().startCommand(
- cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; }));
+ cb, request, [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; }));
{
net().enterNetwork();
@@ -317,13 +315,11 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
bool commandFinished = false;
bool errorPropagated = false;
- ASSERT_OK(net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- errorPropagated =
- resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
- }));
+ RemoteCommandRequest request;
+ ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated = resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
+ }));
{
net().enterNetwork();
@@ -356,13 +352,11 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
bool commandFinished = false;
bool errorPropagated = false;
- ASSERT_OK(net().startCommand(cb,
- RemoteCommandRequest{},
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- errorPropagated =
- resp.getStatus().code() == ErrorCodes::CappedPositionLost;
- }));
+ RemoteCommandRequest request;
+ ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated = resp.getStatus().code() == ErrorCodes::CappedPositionLost;
+ }));
ASSERT(!handleReplyCalled);
@@ -392,8 +386,8 @@ TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
tearDown();
TaskExecutor::CallbackHandle cb{};
- ASSERT_NOT_OK(net().startCommand(
- cb, RemoteCommandRequest{}, [](StatusWith<RemoteCommandResponse> resp) {}));
+ RemoteCommandRequest request;
+ ASSERT_NOT_OK(net().startCommand(cb, request, [](StatusWith<RemoteCommandResponse> resp) {}));
}
TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp
index 4102723b793..8cbd529d91e 100644
--- a/src/mongo/executor/network_interface_perf_test.cpp
+++ b/src/mongo/executor/network_interface_perf_test.cpp
@@ -85,9 +85,9 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) {
};
func = [&]() {
- net->startCommand(makeCallbackHandle(),
- {server, "admin", bsonObjPing, bsonObjPing, Milliseconds(-1)},
- callback);
+ RemoteCommandRequest request{
+ server, "admin", bsonObjPing, bsonObjPing, nullptr, Milliseconds(-1)};
+ net->startCommand(makeCallbackHandle(), request, callback);
};
func();
diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp
index 1f6486afd70..613b5ac5824 100644
--- a/src/mongo/executor/remote_command_request.cpp
+++ b/src/mongo/executor/remote_command_request.cpp
@@ -55,24 +55,28 @@ RemoteCommandRequest::RemoteCommandRequest(RequestId requestId,
const std::string& theDbName,
const BSONObj& theCmdObj,
const BSONObj& metadataObj,
+ OperationContext* txn,
Milliseconds timeoutMillis)
: id(requestId),
target(theTarget),
dbname(theDbName),
metadata(metadataObj),
cmdObj(theCmdObj),
+ txn(txn),
timeout(timeoutMillis) {}
RemoteCommandRequest::RemoteCommandRequest(const HostAndPort& theTarget,
const std::string& theDbName,
const BSONObj& theCmdObj,
const BSONObj& metadataObj,
+ OperationContext* txn,
Milliseconds timeoutMillis)
: RemoteCommandRequest(requestIdCounter.addAndFetch(1),
theTarget,
theDbName,
theCmdObj,
metadataObj,
+ txn,
timeoutMillis) {}
std::string RemoteCommandRequest::toString() const {
diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h
index 975f7498a71..3f8fcc387f0 100644
--- a/src/mongo/executor/remote_command_request.h
+++ b/src/mongo/executor/remote_command_request.h
@@ -60,28 +60,33 @@ struct RemoteCommandRequest {
const std::string& theDbName,
const BSONObj& theCmdObj,
const BSONObj& metadataObj,
+ OperationContext* txn,
Milliseconds timeoutMillis);
RemoteCommandRequest(const HostAndPort& theTarget,
const std::string& theDbName,
const BSONObj& theCmdObj,
const BSONObj& metadataObj,
+ OperationContext* txn,
Milliseconds timeoutMillis = kNoTimeout);
RemoteCommandRequest(const HostAndPort& theTarget,
const std::string& theDbName,
const BSONObj& theCmdObj,
+ OperationContext* txn,
Milliseconds timeoutMillis = kNoTimeout)
: RemoteCommandRequest(
- theTarget, theDbName, theCmdObj, rpc::makeEmptyMetadata(), timeoutMillis) {}
+ theTarget, theDbName, theCmdObj, rpc::makeEmptyMetadata(), txn, timeoutMillis) {}
RemoteCommandRequest(const HostAndPort& theTarget,
const rpc::RequestInterface& request,
+ OperationContext* txn,
Milliseconds timeoutMillis = kNoTimeout)
: RemoteCommandRequest(theTarget,
request.getDatabase().toString(),
request.getCommandArgs(),
request.getMetadata(),
+ txn,
timeoutMillis) {}
std::string toString() const;
@@ -89,13 +94,23 @@ struct RemoteCommandRequest {
bool operator==(const RemoteCommandRequest& rhs) const;
bool operator!=(const RemoteCommandRequest& rhs) const;
- // Internal id of this request. Not interpereted and used for tracing purposes only.
+ // Internal id of this request. Not interpreted and used for tracing purposes only.
RequestId id;
HostAndPort target;
std::string dbname;
BSONObj metadata{rpc::makeEmptyMetadata()};
BSONObj cmdObj;
+
+ // OperationContext is added to each request to allow OP_Command metadata attachment access to
+ // the Client object. The OperationContext is only accessed on the thread that calls
+ // NetworkInterface::startCommand. It is not safe to access from a thread that does not own the
+ // OperationContext in the general case. OperationContext should be non-null on
+ // NetworkInterfaces that do user work (i.e. reads, and writes) so that audit and client
+ // metadata is propagated. It is allowed to be null if used on NetworkInterfaces without
+ // metadata attachment (i.e., replication).
+ OperationContext* txn{nullptr};
+
Milliseconds timeout = kNoTimeout;
// Deadline by when the request must be completed
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index ed956b93567..3212e15a376 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -378,7 +378,8 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) {
const RemoteCommandRequest request(HostAndPort("localhost", 27017),
"mydb",
BSON("whatsUp"
- << "doc"));
+ << "doc"),
+ nullptr);
TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand(
request,
stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1)));
@@ -402,7 +403,8 @@ COMMON_EXECUTOR_TEST(ScheduleAndCancelRemoteCommand) {
const RemoteCommandRequest request(HostAndPort("localhost", 27017),
"mydb",
BSON("whatsUp"
- << "doc"));
+ << "doc"),
+ nullptr);
TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand(
request,
stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1)));
@@ -424,7 +426,7 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) {
Status status(ErrorCodes::InternalError, "");
launchExecutorThread();
const RemoteCommandRequest request(
- HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), Milliseconds(1));
+ HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), nullptr, Milliseconds(1));
TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand(
request,
stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status)));
@@ -445,7 +447,8 @@ COMMON_EXECUTOR_TEST(CallbackHandleComparison) {
TaskExecutor& executor = getExecutor();
auto status1 = getDetectableErrorStatus();
auto status2 = getDetectableErrorStatus();
- const RemoteCommandRequest request(HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1));
+ const RemoteCommandRequest request(
+ HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1), nullptr);
TaskExecutor::CallbackHandle cbHandle1 = unittest::assertGet(executor.scheduleRemoteCommand(
request,
stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1)));
diff --git a/src/mongo/rpc/metadata.h b/src/mongo/rpc/metadata.h
index b44bdafc13e..9af7bfdfeb5 100644
--- a/src/mongo/rpc/metadata.h
+++ b/src/mongo/rpc/metadata.h
@@ -102,11 +102,13 @@ StatusWith<BSONObj> downconvertReplyMetadata(const BSONObj& commandReply,
const BSONObj& replyMetadata);
/**
- * A function type for writing request metadata. The function takes a pointer to a
- * BSONObjBuilder used to construct the metadata object and the server address of the
- * target of the request and returns a Status indicating if the metadata was written successfully.
+ * A function type for writing request metadata. The function takes a pointer to an optional
+ * OperationContext so metadata associated with a Client can be appended, a pointer to a
+ * BSONObjBuilder used to construct the metadata object and the server address of the target of the
+ * request and returns a Status indicating if the metadata was written successfully.
*/
-using RequestMetadataWriter = stdx::function<Status(BSONObjBuilder*, StringData)>;
+using RequestMetadataWriter =
+ stdx::function<Status(OperationContext*, BSONObjBuilder*, StringData)>;
/**
* A function type for reading reply metadata. The function takes a a reference to a
diff --git a/src/mongo/rpc/metadata/metadata_hook.h b/src/mongo/rpc/metadata/metadata_hook.h
index 888e324b297..2369fb8d47a 100644
--- a/src/mongo/rpc/metadata/metadata_hook.h
+++ b/src/mongo/rpc/metadata/metadata_hook.h
@@ -33,6 +33,7 @@ namespace mongo {
class BSONObj;
class BSONObjBuilder;
struct HostAndPort;
+class OperationContext;
class Status;
namespace rpc {
@@ -52,8 +53,12 @@ public:
/**
* Writes to an outgoing request metadata object. This method must not throw or block on
* database or network operations and can be called by multiple concurrent threads.
+ *
+ * txn may be null as writeRequestMetadata may be called on ASIO background threads, and may not
+ * have an OperationContext as a result.
*/
- virtual Status writeRequestMetadata(const HostAndPort& requestDestination,
+ virtual Status writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& requestDestination,
BSONObjBuilder* metadataBob) = 0;
/**
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index 3b24561ade6..209c90b7106 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -178,7 +178,7 @@ void MigrationManager::_executeMigrations(OperationContext* txn,
continue;
}
- RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj);
+ RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn);
StatusWith<RemoteCommandResponse> remoteCommandResponse(
Status{ErrorCodes::InternalError, "Uninitialized value"});
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index f8aec3aea0c..bd587160121 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -269,7 +269,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd
}
executor::RemoteCommandRequest request(
- host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
+ host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30));
StatusWith<executor::RemoteCommandResponse> swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
@@ -1167,7 +1167,7 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask(
}
executor::RemoteCommandRequest request(
- swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30));
+ swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30));
const RemoteCommandCallbackFn callback =
stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse,
diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp
index 2f3e6ae67f8..9b086c15aa3 100644
--- a/src/mongo/s/client/dbclient_multi_command.cpp
+++ b/src/mongo/s/client/dbclient_multi_command.cpp
@@ -31,6 +31,7 @@
#include "mongo/s/client/dbclient_multi_command.h"
#include "mongo/db/audit.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/wire_version.h"
#include "mongo/rpc/factory.h"
@@ -88,7 +89,9 @@ static void sayAsCmd(DBClientBase* conn, StringData dbName, const BSONObj& cmdOb
BSONObjBuilder metadataBob;
metadataBob.appendElements(upconvertedMetadata);
if (conn->getRequestMetadataWriter()) {
- conn->getRequestMetadataWriter()(&metadataBob, conn->getServerAddress());
+ conn->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr),
+ &metadataBob,
+ conn->getServerAddress());
}
requestBuilder->setDatabase(dbName);
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 2fff4b54520..79b7d493863 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -212,6 +212,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
dbName,
cmdWithMaxTimeMS,
_getMetadataForCommand(readPref),
+ txn,
isConfig() ? kConfigCommandTimeout
: executor::RemoteCommandRequest::kNoTimeout);
StatusWith<RemoteCommandResponse> swResponse =
diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp
index b838c50e10c..b24bccd3381 100644
--- a/src/mongo/s/client/sharding_connection_hook.cpp
+++ b/src/mongo/s/client/sharding_connection_hook.cpp
@@ -77,9 +77,11 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) {
return _egressHook->readReplyMetadata(target, metadataObj);
});
}
- conn->setRequestMetadataWriter([this](BSONObjBuilder* metadataBob, StringData hostStringData) {
- return _egressHook->writeRequestMetadata(_shardedConnections, hostStringData, metadataBob);
- });
+ conn->setRequestMetadataWriter(
+ [this](OperationContext* txn, BSONObjBuilder* metadataBob, StringData hostStringData) {
+ return _egressHook->writeRequestMetadata(
+ _shardedConnections, txn, hostStringData, metadataBob);
+ });
if (conn->type() == ConnectionString::MASTER) {
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 9dd3e10c1f3..2d1919769ad 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -114,6 +114,11 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
+void AsyncResultsMerger::setOperationContext(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _params.txn = txn;
+}
+
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return ready_inlock();
@@ -290,8 +295,11 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
cmdObj = *remote.initialCmdObj;
}
- executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj);
+ executor::RemoteCommandRequest request(remote.getTargetHost(),
+ _params.nsString.db().toString(),
+ cmdObj,
+ _metadataObj,
+ _params.txn);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
@@ -560,7 +568,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() {
BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.nsString.db().toString(), cmdObj);
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn);
_executor->scheduleRemoteCommand(
request,
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index b4d04a9c7ad..8956dca52c8 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -101,6 +101,13 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
/**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ void setOperationContext(OperationContext* txn);
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 55f2412f7c8..47f4e46f89a 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -35,6 +35,7 @@
namespace mongo {
+class OperationContext;
template <typename T>
class StatusWith;
@@ -105,6 +106,13 @@ public:
* the cursor is not tailable + awaitData).
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ virtual void setOperationContext(OperationContext* txn) = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c40dafa91ee..d37b78f2a58 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -113,6 +113,10 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou
return _root->setAwaitDataTimeout(awaitDataTimeout);
}
+void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) {
+ return _root->setOperationContext(txn);
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams&& params) {
const auto skip = params.skip;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index b16dae9f9d3..d8645bb7834 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -105,6 +105,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
/**
* Constructs a cluster client cursor.
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 4cf9418fc7f..2b10928449e 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -97,4 +97,9 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou
MONGO_UNREACHABLE;
}
+
+void ClusterClientCursorMock::setOperationContext(OperationContext* txn) {
+ // Do nothing
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 67efae2181a..3749a8abb19 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -55,6 +55,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 5e21b25ea8b..fce3bcf12cb 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -137,6 +137,10 @@ struct ClusterClientCursorParams {
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
+
+ // OperationContext of the calling thread. Used to append Client dependent metadata to remote
+ // requests.
+ OperationContext* txn;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 924a472d7a5..cca70a8dbdf 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -152,6 +152,11 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai
return _cursor->setAwaitDataTimeout(awaitDataTimeout);
}
+void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) {
+ return _cursor->setOperationContext(txn);
+}
+
+
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
@@ -245,7 +250,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor(
}
StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCursor(
- const NamespaceString& nss, CursorId cursorId) {
+ const NamespaceString& nss, CursorId cursorId, OperationContext* txn) {
// Read the clock out of the lock.
const auto now = _clockSource->now();
@@ -269,6 +274,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
}
entry->setLastActive(now);
+ cursor->setOperationContext(txn);
// Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object
// to the pin; the CursorEntry is left with a null ClusterClientCursor.
@@ -283,11 +289,15 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
invariant(cursor);
+ // Reset OperationContext so that non-user initiated operations do not try to use an invalid
+ // operation context
+ cursor->setOperationContext(nullptr);
const bool remotesExhausted = cursor->remotesExhausted();
CursorEntry* entry = getEntry_inlock(nss, cursorId);
invariant(entry);
+
entry->returnCursor(std::move(cursor));
if (cursorState == CursorState::NotExhausted || entry->getKillPending()) {
@@ -390,6 +400,7 @@ std::size_t ClusterCursorManager::reapZombieCursors() {
}
lk.unlock();
+ zombieCursor.getValue()->setOperationContext(nullptr);
zombieCursor.getValue()->kill();
zombieCursor.getValue().reset();
lk.lock();
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 7770cc741c8..a1d4b28ba68 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -42,6 +42,7 @@
namespace mongo {
class ClockSource;
+class OperationContext;
template <typename T>
class StatusWith;
@@ -201,6 +202,14 @@ public:
*/
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ void setOperationContext(OperationContext* txn);
+
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
@@ -278,7 +287,9 @@ public:
*
* Does not block.
*/
- StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, CursorId cursorId);
+ StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss,
+ CursorId cursorId,
+ OperationContext* txn);
/**
* Informs the manager that the given cursor should be killed. The cursor need not necessarily
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index 0edda882977..ce65558f865 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -113,7 +113,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
@@ -143,7 +143,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
auto nextResult = checkedOutCursor.getValue().next();
@@ -170,7 +170,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
ClusterCursorManager::CursorLifetime::Mortal));
}
for (int i = 0; i < numCursors; ++i) {
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i]);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
@@ -189,9 +189,10 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_EQ(ErrorCodes::CursorInUse, getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorInUse,
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
// Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound.
@@ -202,12 +203,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_OK(getManager()->killCursor(nss, cursorId));
- ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
// Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) {
- ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, 5).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()->checkOutCursor(nss, 5, nullptr).getStatus());
}
// Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound,
@@ -221,7 +224,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
- getManager()->checkOutCursor(incorrectNamespace, cursorId).getStatus());
+ getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus());
}
// Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound,
@@ -233,7 +236,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
- getManager()->checkOutCursor(nss, cursorId + 1).getStatus());
+ getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus());
}
// Test that checking out a cursor updates the 'last active' time associated with the cursor to the
@@ -246,7 +249,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
ClusterCursorManager::CursorLifetime::Mortal));
Date_t cursorRegistrationTime = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime);
@@ -262,7 +265,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId()));
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
@@ -434,7 +437,7 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(!isMockCursorKilled(0));
@@ -484,7 +487,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
}
@@ -542,7 +545,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -555,7 +558,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
@@ -570,7 +573,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
@@ -586,7 +589,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -602,7 +605,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -685,13 +688,13 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
}
@@ -703,7 +706,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -714,7 +717,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
// Cursor should have been destroyed without ever being killed. To be sure that the cursor has
// not been marked kill pending but not yet destroyed (i.e. that the cursor is not a zombie), we
// reapZombieCursors() and check that the cursor still has not been killed.
- ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(!isMockCursorKilled(0));
@@ -734,7 +737,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -743,7 +746,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
// Cursor should be kill pending, so it will be killed during reaping.
- ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(isMockCursorKilled(0));
@@ -757,7 +760,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
pinnedCursor = ClusterCursorManager::PinnedCursor();
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
@@ -772,7 +775,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
}
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
@@ -790,7 +793,7 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted());
}
@@ -802,7 +805,7 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT(!isMockCursorKilled(0));
@@ -850,7 +853,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
ASSERT(isMockCursorKilled(0));
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress,
- getManager()->checkOutCursor(nss, cursorId).getStatus());
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
} // namespace
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8f7b260e7a1..57d6ece2c87 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -180,6 +180,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.isTailable = query.getQueryRequest().isTailable();
params.isAwaitData = query.getQueryRequest().isAwaitData();
params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();
+ params.txn = txn;
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a
@@ -358,7 +359,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
const GetMoreRequest& request) {
auto cursorManager = grid.getCursorManager();
- auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid);
+ auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid, txn);
if (!pinnedCursor.isOK()) {
return pinnedCursor.getStatus();
}
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 1f2dbdf6c7c..0e10d9edff2 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -37,6 +37,8 @@
namespace mongo {
+class OperationContext;
+
/**
* This is the lightweight mongoS analogue of the PlanStage abstraction used to execute queries on
* mongoD (see mongo/db/plan_stage.h).
@@ -85,6 +87,13 @@ public:
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ virtual void setOperationContext(OperationContext* txn) = 0;
+
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 9a9f77fbf00..5f7db02ec7b 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -67,4 +67,8 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageLimit::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 0db06c30c3b..366a964f2a4 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -47,6 +47,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 527bc0f0063..0e9304d9952 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -68,4 +68,8 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageMerge::setOperationContext(OperationContext* txn) {
+ return _arm.setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index d74870f8a94..a75d5a46f8d 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -53,6 +53,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index 179635bbb08..daad6fe6d07 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -73,6 +73,10 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
+void RouterStageMock::setOperationContext(OperationContext* txn) {
+ // Do nothing
+}
+
StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() {
if (!_awaitDataTimeout) {
return Status(ErrorCodes::BadValue, "no awaitData timeout set");
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index b83e2879096..255ae75b595 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -51,6 +51,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
/**
* Queues a BSONObj to be returned.
*/
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 16e6f9407a4..949182e3a1a 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -69,4 +69,8 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index 6ef60012a4d..f7965312d47 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -48,6 +48,8 @@ public:
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
+ void setOperationContext(OperationContext* txn) final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 536c3d173a2..af746d5e430 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -68,4 +68,8 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageSkip::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 35994d31e3e..430d3748c91 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -47,6 +47,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
long long _skip;
diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp
index 0ed82987534..49ee7c03d60 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook.cpp
@@ -50,10 +50,11 @@ namespace rpc {
using std::shared_ptr;
Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
+ OperationContext* txn,
const StringData target,
BSONObjBuilder* metadataBob) {
try {
- audit::writeImpersonatedUsersToMetadata(metadataBob);
+ audit::writeImpersonatedUsersToMetadata(txn, metadataBob);
if (!shardedConnection) {
return Status::OK();
}
@@ -64,10 +65,11 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
}
}
-Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& target,
+Status ShardingEgressMetadataHook::writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& target,
BSONObjBuilder* metadataBob) {
try {
- audit::writeImpersonatedUsersToMetadata(metadataBob);
+ audit::writeImpersonatedUsersToMetadata(txn, metadataBob);
rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob);
return Status::OK();
} catch (...) {
diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h
index c510aefaab4..df105c813bf 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.h
+++ b/src/mongo/s/sharding_egress_metadata_hook.h
@@ -46,7 +46,9 @@ public:
virtual ~ShardingEgressMetadataHook() = default;
Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override;
- Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override;
+ Status writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& target,
+ BSONObjBuilder* metadataBob) override;
// These overloaded methods exist to allow ShardingConnectionHook, which is soon to be
// deprecated, to use the logic in ShardingEgressMetadataHook instead of duplicating the
@@ -55,6 +57,7 @@ public:
// contact.
Status readReplyMetadata(const StringData replySource, const BSONObj& metadataObj);
Status writeRequestMetadata(bool shardedConnection,
+ OperationContext* txn,
const StringData target,
BSONObjBuilder* metadataBob);
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 61f0ed8d66f..6dde6355a82 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -95,13 +95,13 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
std::unique_ptr<NetworkInterface> fixedNet,
- std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
+ rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder) {
std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) {
auto net = executor::makeNetworkInterface(
"NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
stdx::make_unique<ShardingNetworkConnectionHook>(),
- std::move(metadataHook));
+ metadataHookBuilder());
auto netPtr = net.get();
auto exec = stdx::make_unique<ThreadPoolTaskExecutor>(
stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
@@ -148,7 +148,7 @@ Status initializeGlobalShardingState(OperationContext* txn,
stdx::make_unique<ShardingNetworkConnectionHook>(),
hookBuilder());
auto networkPtr = network.get();
- auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder());
+ auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder);
executorPool->startup();
auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));