diff options
73 files changed, 433 insertions, 242 deletions
diff --git a/src/mongo/client/authenticate_test.cpp b/src/mongo/client/authenticate_test.cpp index adca0ec582b..3044da9f6cd 100644 --- a/src/mongo/client/authenticate_test.cpp +++ b/src/mongo/client/authenticate_test.cpp @@ -107,7 +107,7 @@ public: } void pushRequest(StringData dbname, const BSONObj& cmd) { - _requests.emplace(_mockHost, dbname.toString(), cmd); + _requests.emplace(_mockHost, dbname.toString(), cmd, nullptr); } BSONObj loadMongoCRConversation() { diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 8a76a95de26..73b3d77fab7 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -44,6 +44,7 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" #include "mongo/db/auth/internal_user_auth.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" @@ -182,7 +183,8 @@ rpc::UniqueReply DBClientWithCommands::runCommandWithMetadata(StringData databas metadataBob.appendElements(metadata); if (_metadataWriter) { - uassertStatusOK(_metadataWriter(&metadataBob, host)); + uassertStatusOK(_metadataWriter( + (haveClient() ? cc().getOperationContext() : nullptr), &metadataBob, host)); } auto requestBuilder = rpc::makeRequestBuilder(getClientRPCProtocols(), getServerRPCProtocols()); diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index b1c1faef87f..7cdbc35de7f 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -34,6 +34,7 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/client/connpool.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/namespace_string.h" #include "mongo/rpc/factory.h" @@ -79,7 +80,10 @@ Message assembleCommandRequest(DBClientWithCommands* cli, BSONObjBuilder metadataBob; metadataBob.appendElements(upconvertedMetadata); if (cli->getRequestMetadataWriter()) { - uassertStatusOK(cli->getRequestMetadataWriter()(&metadataBob, cli->getServerAddress())); + uassertStatusOK( + cli->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr), + &metadataBob, + cli->getServerAddress())); } requestBuilder->setDatabase(database); diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index c1d783858cc..f50a877e557 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -179,7 +179,7 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, _timeout(timeout), _firstRemoteCommandScheduler( _executor, - RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, _timeout), + RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, nullptr, _timeout), stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kFirstBatchFieldName), std::move(firstCommandRetryPolicy)) { uassert(ErrorCodes::BadValue, "callback function cannot be null", work); @@ -280,7 +280,7 @@ Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) { } StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, _timeout), + RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, nullptr, _timeout), stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kNextBatchFieldName)); if (!scheduleResult.isOK()) { @@ -389,7 +389,7 @@ void Fetcher::_sendKillCursors(const CursorId id, const NamespaceString& nss) { }; auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id)); auto scheduleResult = _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj), logKillCursorsResult); + RemoteCommandRequest(_source, _dbname, cmdObj, nullptr), logKillCursorsResult); if (!scheduleResult.isOK()) { warning() << "failed to schedule killCursors command: " << scheduleResult.getStatus(); } diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index c5eb97a381a..6e2c70fe9dc 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -168,7 +168,10 @@ std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver:: return _responses; } -const executor::RemoteCommandRequest request(HostAndPort("h1:12345"), "db1", BSON("ping" << 1)); +const executor::RemoteCommandRequest request(HostAndPort("h1:12345"), + "db1", + BSON("ping" << 1), + nullptr); TEST_F(RemoteCommandRetrySchedulerTest, MakeSingleShotRetryPolicy) { auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy(); @@ -217,7 +220,7 @@ TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) { ASSERT_THROWS_CODE_AND_WHAT( RemoteCommandRetryScheduler( &getExecutor(), - executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj), + executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj, nullptr), callback, makeRetryPolicy()), UserException, @@ -225,20 +228,21 @@ TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) { "source in remote command request cannot be empty"); // Empty source in remote command request. - ASSERT_THROWS_CODE_AND_WHAT(RemoteCommandRetryScheduler(&getExecutor(), - executor::RemoteCommandRequest( - request.target, "", request.cmdObj), - callback, - makeRetryPolicy()), - UserException, - ErrorCodes::BadValue, - "database name in remote command request cannot be empty"); + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler( + &getExecutor(), + executor::RemoteCommandRequest(request.target, "", request.cmdObj, nullptr), + callback, + makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "database name in remote command request cannot be empty"); // Empty command object in remote command request. ASSERT_THROWS_CODE_AND_WHAT( RemoteCommandRetryScheduler( &getExecutor(), - executor::RemoteCommandRequest(request.target, request.dbname, BSONObj()), + executor::RemoteCommandRequest(request.target, request.dbname, BSONObj(), nullptr), callback, makeRetryPolicy()), UserException, diff --git a/src/mongo/db/audit.cpp b/src/mongo/db/audit.cpp index f781b237f96..715e9023c15 100644 --- a/src/mongo/db/audit.cpp +++ b/src/mongo/db/audit.cpp @@ -178,7 +178,8 @@ void logAuthentication(ClientBasic* client, const BSONObj& keyPattern, bool unique) MONGO_AUDIT_STUB - void writeImpersonatedUsersToMetadata(BSONObjBuilder* metadata) MONGO_AUDIT_STUB + void writeImpersonatedUsersToMetadata(OperationContext* txn, + BSONObjBuilder* metadata) MONGO_AUDIT_STUB void parseAndRemoveImpersonatedUsersField(BSONObj cmdObj, std::vector<UserName>* parsedUserNames, diff --git a/src/mongo/db/audit.h b/src/mongo/db/audit.h index 4dc187da2b1..54c07aa1d77 100644 --- a/src/mongo/db/audit.h +++ b/src/mongo/db/audit.h @@ -44,6 +44,7 @@ class BSONObj; class ClientBasic; class Command; class NamespaceString; +class OperationContext; class ReplSetConfig; class StringData; class UserName; @@ -304,7 +305,7 @@ void logShardCollection(ClientBasic* client, StringData ns, const BSONObj& keyPa * to the provided metadata builder. The users and roles are extracted from the current client. * They are to be the impersonated users and roles for a Command run by an internal user. */ -void writeImpersonatedUsersToMetadata(BSONObjBuilder* metadataBob); +void writeImpersonatedUsersToMetadata(OperationContext* txn, BSONObjBuilder* metadataBob); /* * Looks for an 'impersonatedUsers' field. This field is used by mongos to diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 583b3df37d8..bd3740ed410 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -103,6 +103,7 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const { "admin", hbRequest, BSON(rpc::kReplSetMetadataFieldName << 1), + nullptr, _rsConfig->getHeartbeatTimeoutPeriodMillis())); } diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp index dd7e8e6996c..c0155d8fcb7 100644 --- a/src/mongo/db/repl/databases_cloner.cpp +++ b/src/mongo/db/repl/databases_cloner.cpp @@ -161,7 +161,8 @@ Status DatabasesCloner::startup() { Request listDBsReq(_source, "admin", BSON("listDatabases" << true), - rpc::ServerSelectionMetadata(true, boost::none).toBSON()); + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + nullptr); _listDBsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>( _exec, listDBsReq, diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index dc2bb2e240d..afe28960c1f 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -80,6 +80,7 @@ std::vector<RemoteCommandRequest> ElectCmdRunner::Algorithm::getRequests() const *it, "admin", replSetElectCmd, + nullptr, Milliseconds(30 * 1000))); // trying to match current Socket timeout } diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index c92f931cf34..593b1f10fc7 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -287,6 +287,7 @@ protected: return RemoteCommandRequest(HostAndPort(hostname), "", // the non-hostname fields do not matter for Elect BSONObj(), + nullptr, Milliseconds(0)); } diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index f5bd3963e63..d3927c33102 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -98,6 +98,7 @@ std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() con *it, "admin", replSetFreshCmd, + nullptr, Milliseconds(30 * 1000))); // trying to match current Socket timeout } diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 2d0917254ec..2aef6f626df 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -947,6 +947,7 @@ protected: return RemoteCommandRequest(HostAndPort(hostname), "", // the non-hostname fields do not matter in Freshness BSONObj(), + nullptr, Milliseconds(0)); } diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp index fcba850e021..87182309a39 100644 --- a/src/mongo/db/repl/freshness_scanner.cpp +++ b/src/mongo/db/repl/freshness_scanner.cpp @@ -64,7 +64,7 @@ std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() con std::vector<RemoteCommandRequest> requests; for (auto& target : _targets) { - requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, _timeout)); + requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, nullptr, _timeout)); } return requests; } diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp index 53314298b5b..d931ef55b70 100644 --- a/src/mongo/db/repl/freshness_scanner_test.cpp +++ b/src/mongo/db/repl/freshness_scanner_test.cpp @@ -99,6 +99,7 @@ protected: return RemoteCommandRequest(HostAndPort(hostname), "", // fields do not matter in FreshnessScanner BSONObj(), + nullptr, Milliseconds(0)); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index fb88d374e93..e56fa347fcd 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -95,7 +95,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::Callbac } const RemoteCommandRequest request( - target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), timeout); + target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); const ReplicationExecutor::RemoteCommandCallbackFn callback = stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, this, @@ -304,7 +304,7 @@ void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs } // namespace void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort& target) { - RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 20)); + RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 20), nullptr); log() << "Requesting " << target << " step down from primary"; CBHStatus cbh = _replExecutor.scheduleRemoteCommand(request, remoteStepdownCallback); diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index e5dfdab34da..2e19e804108 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -229,7 +229,7 @@ void Reporter::_sendCommand_inlock(BSONObj commandRequest) { << commandRequest; auto scheduleResult = _executor->scheduleRemoteCommand( - executor::RemoteCommandRequest(_target, "admin", commandRequest), + executor::RemoteCommandRequest(_target, "admin", commandRequest, nullptr), stdx::bind(&Reporter::_processResponseCallback, this, stdx::placeholders::_1)); _status = scheduleResult.getStatus(); diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp index cb86eb2a811..3e6d49026b2 100644 --- a/src/mongo/db/repl/rollback_checker.cpp +++ b/src/mongo/db/repl/rollback_checker.cpp @@ -140,7 +140,7 @@ bool RollbackChecker::_checkForRollback_inlock(int remoteRBID) { RollbackChecker::CallbackHandle RollbackChecker::_scheduleGetRollbackId( const RemoteCommandCallbackFn& nextAction, const CallbackFn& errorFn) { executor::RemoteCommandRequest getRollbackIDReq( - _syncSource, "admin", BSON("replSetGetRBID" << 1)); + _syncSource, "admin", BSON("replSetGetRBID" << 1), nullptr); auto cbh = _executor->scheduleRemoteCommand(getRollbackIDReq, nextAction); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 05edf467fbd..e231c251d35 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -61,7 +61,7 @@ public: std::vector<RemoteCommandRequest> requests; for (int i = 0; i < kTotalRequests; i++) { requests.push_back(RemoteCommandRequest( - HostAndPort("hostname", i), "admin", BSONObj(), Milliseconds(30 * 1000))); + HostAndPort("hostname", i), "admin", BSONObj(), nullptr, Milliseconds(30 * 1000))); } return requests; } diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index 836617e5493..b02b3bb7c0c 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -82,7 +82,7 @@ std::vector<RemoteCommandRequest> VoteRequester::Algorithm::getRequests() const std::vector<RemoteCommandRequest> requests; for (const auto& target : _targets) { requests.push_back(RemoteCommandRequest( - target, "admin", requestVotesCmd, _rsConfig.getElectionTimeoutPeriod())); + target, "admin", requestVotesCmd, nullptr, _rsConfig.getElectionTimeoutPeriod())); } return requests; diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index ba4691019d8..799fff1817c 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -124,6 +124,7 @@ protected: return RemoteCommandRequest(HostAndPort(hostname), "", // fields do not matter in VoteRequester BSONObj(), + nullptr, Milliseconds(0)); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 55b38ec4b47..500b11e57e7 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -472,7 +472,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO auto executor = grid.getExecutorPool()->getArbitraryExecutor(); auto scheduleStatus = executor->scheduleRemoteCommand( - executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj), + executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj, nullptr), [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { responseStatus = args.response; }); diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp index ddfd61cf6f8..e368c2ba183 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -286,7 +286,7 @@ void AsyncMockStreamFactory::MockStream::simulateServer( auto parsedRequest = rpc::makeRequest(&msg); ASSERT(parsedRequest->getProtocol() == proto); - RemoteCommandRequest rcr(target(), *parsedRequest); + RemoteCommandRequest rcr(target(), *parsedRequest, nullptr); messageId = msg.header().getId(); diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index c189d0f5347..99e4fadb135 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -155,8 +155,11 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>( conn->_global->_impl, TaskExecutor::CallbackHandle(), - RemoteCommandRequest{ - conn->getHostAndPort(), std::string("admin"), BSON("isMaster" << 1), BSONObj()}, + RemoteCommandRequest{conn->getHostAndPort(), + std::string("admin"), + BSON("isMaster" << 1), + BSONObj(), + nullptr}, [conn](const TaskExecutor::ResponseStatus& status) { auto cb = std::move(conn->_setupCallback); cb(conn, status.isOK() ? Status::OK() : status.getStatus()); diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp index 3bc12a2da9c..04c39348ee6 100644 --- a/src/mongo/executor/connection_pool_asio_integration_test.cpp +++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp @@ -105,12 +105,12 @@ TEST(ConnectionPoolASIO, TestPing) { auto status = Status::OK(); Deferred<StatusWith<RemoteCommandResponse>> deferred; - net.startCommand(makeCallbackHandle(), - RemoteCommandRequest{ - fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()}, - [&deferred](StatusWith<RemoteCommandResponse> resp) { - deferred.emplace(std::move(resp)); - }); + RemoteCommandRequest request{ + fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; + net.startCommand( + makeCallbackHandle(), request, [&deferred](StatusWith<RemoteCommandResponse> resp) { + deferred.emplace(std::move(resp)); + }); ASSERT_OK(deferred.get().getStatus()); }); @@ -141,10 +141,12 @@ TEST(ConnectionPoolASIO, TestHostTimeoutRace) { for (int i = 0; i < 1000; i++) { Deferred<StatusWith<RemoteCommandResponse>> deferred; + RemoteCommandRequest request{ + fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; net.startCommand( - makeCallbackHandle(), - RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()}, - [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); }); + makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + deferred.emplace(std::move(resp)); + }); ASSERT_OK(deferred.get().getStatus()); sleepmillis(1); @@ -168,10 +170,11 @@ TEST(ConnectionPoolASIO, ConnSetupTimeout) { auto guard = MakeGuard([&] { net.shutdown(); }); Deferred<StatusWith<RemoteCommandResponse>> deferred; - net.startCommand( - makeCallbackHandle(), - RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()}, - [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); }); + RemoteCommandRequest request{ + fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; + net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + deferred.emplace(std::move(resp)); + }); ASSERT_EQ(deferred.get().getStatus().code(), ErrorCodes::ExceededTimeLimit); } @@ -194,17 +197,20 @@ TEST(ConnectionPoolASIO, ConnRefreshHappens) { std::array<Deferred<StatusWith<RemoteCommandResponse>>, 10> deferreds; - for (auto& deferred : deferreds) { - net.startCommand( - makeCallbackHandle(), - RemoteCommandRequest{fixture.getServers()[0], + RemoteCommandRequest request{fixture.getServers()[0], "admin", BSON("sleep" << 1 << "lock" << "none" << "secs" << 2), - BSONObj()}, - [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); }); + BSONObj(), + nullptr}; + + for (auto& deferred : deferreds) { + net.startCommand( + makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + deferred.emplace(std::move(resp)); + }); } for (auto& deferred : deferreds) { @@ -237,10 +243,11 @@ TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) { Deferred<StatusWith<RemoteCommandResponse>> deferred; - net.startCommand( - makeCallbackHandle(), - RemoteCommandRequest{fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()}, - [&](StatusWith<RemoteCommandResponse> resp) { deferred.emplace(std::move(resp)); }); + RemoteCommandRequest request{ + fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; + net.startCommand(makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) { + deferred.emplace(std::move(resp)); + }); deferred.get(); diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index d230e2eb8a6..3b73761ef10 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -120,12 +120,14 @@ public: /** * Starts asynchronous execution of the command described by "request". * + * The request mutated to append request metadata to be sent in OP_Command messages. + * * Returns ErrorCodes::ShutdownInProgress if NetworkInterface::shutdown has already started * and Status::OK() otherwise. If it returns Status::OK(), then the onFinish argument will be * executed by NetworkInterface eventually; otherwise, it will not. */ virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, + RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) = 0; /** diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 1b4eae416cf..8617ce6e20c 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -211,8 +211,36 @@ Date_t NetworkInterfaceASIO::now() { return Date_t::now(); } +namespace { + +Status attachMetadataIfNeeded(RemoteCommandRequest& request, + rpc::EgressMetadataHook* metadataHook) { + + // Append the metadata of the request with metadata from the metadata hook + // if a hook is installed + if (metadataHook) { + BSONObjBuilder augmentedBob; + augmentedBob.appendElements(request.metadata); + + auto writeStatus = callNoexcept(*metadataHook, + &rpc::EgressMetadataHook::writeRequestMetadata, + request.txn, + request.target, + &augmentedBob); + if (!writeStatus.isOK()) { + return writeStatus; + } + + request.metadata = augmentedBob.obj(); + } + + return Status::OK(); +} + +} // namespace + Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, + RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function"); { @@ -230,6 +258,11 @@ Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cb auto getConnectionStartTime = now(); + auto statusMetadata = attachMetadataIfNeeded(request, _metadataHook.get()); + if (!statusMetadata.isOK()) { + return statusMetadata; + } + auto nextStep = [this, getConnectionStartTime, cbHandle, request, onFinish]( StatusWith<ConnectionPool::ConnectionHandle> swConn) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 1d6bd4c5475..6e80c436abc 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -127,7 +127,7 @@ public: void signalWorkAvailable() override; Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, + RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; void cancelAllCommands() override; @@ -292,8 +292,7 @@ private: // AsyncOp may run multiple commands over its lifetime (for example, an ismaster // command, the command provided to the NetworkInterface via startCommand(), etc.) // Calling beginCommand() resets internal state to prepare to run newCommand. - Status beginCommand(const RemoteCommandRequest& request, - rpc::EgressMetadataHook* metadataHook = nullptr); + Status beginCommand(const RemoteCommandRequest& request); // This form of beginCommand takes a raw message. It is needed if the caller // has to form the command manually (e.g. to use a specific requestBuilder). diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 59ed433820a..e09a952c282 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -172,7 +172,7 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { // SERVER-14170: Set the metadataHook to nullptr explicitly as we cannot write metadata // here. - auto beginStatus = op->beginCommand(request, nullptr); + auto beginStatus = op->beginCommand(request); if (!beginStatus.isOK()) { return handler(beginStatus); } diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index f3da7abc0d1..99f702c4b99 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -246,7 +246,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { LOG(3) << "Initiating asynchronous command: " << redact(op->request().toString()); - auto beginStatus = op->beginCommand(op->request(), _metadataHook.get()); + auto beginStatus = op->beginCommand(op->request()); if (!beginStatus.isOK()) { return _completeOperation(op, beginStatus); } @@ -444,7 +444,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { return _beginCommunication(op); } - auto beginStatus = op->beginCommand(*optionalRequest, _metadataHook.get()); + auto beginStatus = op->beginCommand(*optionalRequest); if (!beginStatus.isOK()) { return _completeOperation(op, beginStatus); } diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index 6165c018b3f..8b39dd5c992 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -88,7 +88,7 @@ public: } Deferred<StatusWith<RemoteCommandResponse>> runCommand( - const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { + const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { Deferred<StatusWith<RemoteCommandResponse>> deferred; net().startCommand( cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable { @@ -97,7 +97,7 @@ public: return deferred; } - StatusWith<RemoteCommandResponse> runCommandSync(const RemoteCommandRequest& request) { + StatusWith<RemoteCommandResponse> runCommandSync(RemoteCommandRequest& request) { auto deferred = runCommand(makeCallbackHandle(), request); auto& res = deferred.get(); if (res.isOK()) { @@ -111,8 +111,9 @@ public: void assertCommandOK(StringData db, const BSONObj& cmd, Milliseconds timeoutMillis = Milliseconds(-1)) { - auto res = unittest::assertGet(runCommandSync( - {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis})); + RemoteCommandRequest request{ + fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; + auto res = unittest::assertGet(runCommandSync(request)); ASSERT_OK(getStatusFromCommandResult(res.data)); } @@ -120,8 +121,9 @@ public: const BSONObj& cmd, Milliseconds timeoutMillis, ErrorCodes::Error reason) { - auto clientStatus = runCommandSync( - {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis}); + RemoteCommandRequest request{ + fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; + auto clientStatus = runCommandSync(request); ASSERT_TRUE(clientStatus == reason); } @@ -129,8 +131,9 @@ public: const BSONObj& cmd, Milliseconds timeoutMillis, ErrorCodes::Error reason) { - auto res = unittest::assertGet(runCommandSync( - {fixture().getServers()[0], db.toString(), cmd, BSONObj(), timeoutMillis})); + RemoteCommandRequest request{ + fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; + auto res = unittest::assertGet(runCommandSync(request)); auto serverStatus = getStatusFromCommandResult(res.data); ASSERT_TRUE(serverStatus == reason); } @@ -176,12 +179,12 @@ public: Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { auto cb = makeCallbackHandle(); auto self = *this; - auto out = fixture - ->runCommand(cb, - {unittest::getFixtureConnectionString().getServers()[0], + RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0], "admin", _command, - timeout}) + nullptr, + timeout}; + auto out = fixture->runCommand(cb, request) .then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status { auto status = resp.isOK() ? getStatusFromCommandResult(resp.getValue().data) @@ -324,7 +327,8 @@ class HangingHook : public executor::NetworkConnectionHook { << "none" << "secs" << 100000000), - BSONObj()))}; + BSONObj(), + nullptr))}; } Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final { diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index c9b25549907..a05dba95fc0 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -63,37 +63,16 @@ namespace { // be used to run multiple distinct requests. AtomicUInt64 kAsyncOpIdCounter(0); -// Metadata listener can be nullptr. StatusWith<Message> messageFromRequest(const RemoteCommandRequest& request, - rpc::Protocol protocol, - rpc::EgressMetadataHook* metadataHook) { + rpc::Protocol protocol) { BSONObj query = request.cmdObj; auto requestBuilder = rpc::makeRequestBuilder(protocol); - BSONObj maybeAugmented; - // Handle outgoing request metadata. - if (metadataHook) { - BSONObjBuilder augmentedBob; - augmentedBob.appendElements(request.metadata); - - auto writeStatus = callNoexcept(*metadataHook, - &rpc::EgressMetadataHook::writeRequestMetadata, - request.target, - &augmentedBob); - if (!writeStatus.isOK()) { - return writeStatus; - } - - maybeAugmented = augmentedBob.obj(); - } else { - maybeAugmented = request.metadata; - } - auto toSend = rpc::makeRequestBuilder(protocol) ->setDatabase(request.dbname) .setCommandName(request.cmdObj.firstElementFieldName()) .setCommandArgs(request.cmdObj) - .setMetadata(maybeAugmented) + .setMetadata(request.metadata) .done(); return std::move(toSend); } @@ -197,8 +176,7 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand, return Status::OK(); } -Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request, - rpc::EgressMetadataHook* metadataHook) { +Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) { // Check if we need to downconvert find or getMore commands. StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); const auto isFindCmd = commandName == QueryRequest::kFindCommandName; @@ -208,7 +186,7 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& r // If we aren't sending a find or getMore, or the server supports OP_COMMAND we don't have // to worry about downconversion. if (!isFindOrGetMoreCmd || connection().serverProtocols() == rpc::supports::kAll) { - auto newCommand = messageFromRequest(request, operationProtocol(), metadataHook); + auto newCommand = messageFromRequest(request, operationProtocol()); if (!newCommand.isOK()) { return newCommand.getStatus(); } diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp index bf96e029865..1b52f951017 100644 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ b/src/mongo/executor/network_interface_asio_test.cpp @@ -102,7 +102,7 @@ public: } Deferred<StatusWith<RemoteCommandResponse>> startCommand( - const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { + const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { Deferred<StatusWith<RemoteCommandResponse>> deferredResponse; ASSERT_OK(net().startCommand( cbHandle, @@ -114,7 +114,7 @@ public: } // Helper to run startCommand and wait for it - StatusWith<RemoteCommandResponse> startCommandSync(const RemoteCommandRequest& request) { + StatusWith<RemoteCommandResponse> startCommandSync(RemoteCommandRequest& request) { auto deferred = startCommand(makeCallbackHandle(), request); // wait for the operation to complete @@ -157,8 +157,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) { auto cbh = makeCallbackHandle(); // Kick off our operation - auto deferred = - startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -187,8 +187,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) { TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { auto cbh = makeCallbackHandle(); - auto deferred = - startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(cbh, request); // Cancel immediately net().cancelCommand(cbh); @@ -209,8 +209,8 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { TEST_F(NetworkInterfaceASIOTest, LateCancel) { auto cbh = makeCallbackHandle(); - auto deferred = - startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(cbh, request); // Allow stream to connect so operation can return auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -238,8 +238,8 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) { TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { auto cbh = makeCallbackHandle(); - auto deferred = - startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -268,8 +268,8 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { auto cbh = makeCallbackHandle(); - auto deferred = - startCommand(cbh, RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -296,9 +296,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { auto cbh = makeCallbackHandle(); - auto deferred = startCommand( - cbh, - RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj(), Milliseconds(1000))); + RemoteCommandRequest request{ + testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; + auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -325,9 +325,9 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) { auto cbh = makeCallbackHandle(); - auto deferred = startCommand( - cbh, - RemoteCommandRequest(testHost, "testDB", BSON("a" << 1), BSONObj(), Milliseconds(1000))); + RemoteCommandRequest request{ + testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; + auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -357,7 +357,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { // Kick off operation auto cb = makeCallbackHandle(); Milliseconds timeout(1000); - auto deferred = startCommand(cb, {testHost, "testDB", BSON("a" << 1), BSONObj(), timeout}); + RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, timeout}; + auto deferred = startCommand(cb, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -390,9 +391,8 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { } TEST_F(NetworkInterfaceASIOTest, StartCommand) { - auto deferred = - startCommand(makeCallbackHandle(), - RemoteCommandRequest(testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1))); + RemoteCommandRequest request{testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1), nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -440,9 +440,9 @@ TEST_F(NetworkInterfaceASIOTest, InShutdown) { TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) { net().shutdown(); - ASSERT_NOT_OK(net().startCommand(makeCallbackHandle(), - RemoteCommandRequest{}, - [&](StatusWith<RemoteCommandResponse> resp) {})); + RemoteCommandRequest request; + ASSERT_NOT_OK(net().startCommand( + makeCallbackHandle(), request, [&](StatusWith<RemoteCommandResponse> resp) {})); } class MalformedMessageTest : public NetworkInterfaceASIOTest { @@ -451,9 +451,8 @@ public: void runMessageTest(ErrorCodes::Error code, bool loadBody, MessageHook hook) { // Kick off our operation - auto deferred = - startCommand(makeCallbackHandle(), - RemoteCommandRequest(testHost, "testDB", BSON("ping" << 1), BSONObj())); + RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); // Wait for it to block waiting for a write auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -591,11 +590,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) { return Status::OK(); })); - auto deferred = startCommand(makeCallbackHandle(), - {testHost, - "blah", - BSON("foo" - << "bar")}); + RemoteCommandRequest request{testHost, + "blah", + BSON("foo" + << "bar"), + nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -645,11 +645,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) { return Status::OK(); })); - auto deferred = startCommand(makeCallbackHandle(), - {testHost, - "blah", - BSON("foo" - << "bar")}); + RemoteCommandRequest request{testHost, + "blah", + BSON("foo" + << "bar"), + nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); @@ -701,11 +702,12 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { return Status::OK(); })); - auto deferred = startCommand(makeCallbackHandle(), - {testHost, - "blah", - BSON("foo" - << "bar")}); + RemoteCommandRequest request{testHost, + "blah", + BSON("foo" + << "bar"), + nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); @@ -753,7 +755,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { auto metadata = BSON("aaa" << "bbb"); - auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", commandRequest}); + RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); @@ -809,7 +812,7 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {boost::make_optional<RemoteCommandRequest>( - {testHost, "foo", hookCommandRequest, hookRequestMetadata})}; + {testHost, "foo", hookCommandRequest, hookRequestMetadata, nullptr})}; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { @@ -821,7 +824,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { auto commandRequest = BSON("foo" << "bar"); - auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", commandRequest}); + RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); @@ -904,7 +908,8 @@ public: TestMetadataHook(bool* wroteRequestMetadata, bool* gotReplyMetadata) : _wroteRequestMetadata(wroteRequestMetadata), _gotReplyMetadata(gotReplyMetadata) {} - Status writeRequestMetadata(const HostAndPort& requestDestination, + Status writeRequestMetadata(OperationContext* txn, + const HostAndPort& requestDestination, BSONObjBuilder* metadataBob) override { metadataBob->append("foo", "bar"); *_wroteRequestMetadata = true; @@ -926,7 +931,8 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) { bool gotReplyMetadata = false; start(stdx::make_unique<TestMetadataHook>(&wroteRequestMetadata, &gotReplyMetadata)); - auto deferred = startCommand(makeCallbackHandle(), {testHost, "blah", BSON("ping" << 1)}); + RemoteCommandRequest request{testHost, "blah", BSON("ping" << 1), nullptr}; + auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index d17fc448bd7..2ebd1244e47 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -112,7 +112,7 @@ std::string NetworkInterfaceMock::getHostName() { } Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, - const RemoteCommandRequest& request, + RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { if (inShutdown()) { return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"}; diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index df8f2b541d6..df972ca17f6 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -105,7 +105,7 @@ public: virtual Date_t now(); virtual std::string getHostName(); virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, + RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle); /** diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index d022b07d264..74d5701a55f 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -110,7 +110,8 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { "test", BSON("1" << 2), BSON("some" - << "stuff")}; + << "stuff"), + nullptr}; RemoteCommandResponse expectedResponse{BSON("foo" << "bar" @@ -159,7 +160,7 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { bool gotCorrectCommandReply = false; RemoteCommandRequest actualCommandExpected{ - testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata()}; + testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata(), nullptr}; RemoteCommandResponse actualResponseExpected{BSON("1212121212" << "12121212121212"), BSONObj(), @@ -237,14 +238,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { bool commandFinished = false; bool statusPropagated = false; - ASSERT_OK(net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith<RemoteCommandResponse> resp) { - commandFinished = true; + RemoteCommandRequest request; + ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; - statusPropagated = resp.getStatus().code() == - ErrorCodes::ConflictingOperationInProgress; - })); + statusPropagated = resp.getStatus().code() == ErrorCodes::ConflictingOperationInProgress; + })); { net().enterNetwork(); @@ -279,10 +278,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { bool commandFinished = false; + RemoteCommandRequest request; ASSERT_OK(net().startCommand( - cb, - RemoteCommandRequest{}, - [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; })); + cb, request, [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; })); { net().enterNetwork(); @@ -317,13 +315,11 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { bool commandFinished = false; bool errorPropagated = false; - ASSERT_OK(net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith<RemoteCommandResponse> resp) { - commandFinished = true; - errorPropagated = - resp.getStatus().code() == ErrorCodes::InvalidSyncSource; - })); + RemoteCommandRequest request; + ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + errorPropagated = resp.getStatus().code() == ErrorCodes::InvalidSyncSource; + })); { net().enterNetwork(); @@ -356,13 +352,11 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { bool commandFinished = false; bool errorPropagated = false; - ASSERT_OK(net().startCommand(cb, - RemoteCommandRequest{}, - [&](StatusWith<RemoteCommandResponse> resp) { - commandFinished = true; - errorPropagated = - resp.getStatus().code() == ErrorCodes::CappedPositionLost; - })); + RemoteCommandRequest request; + ASSERT_OK(net().startCommand(cb, request, [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + errorPropagated = resp.getStatus().code() == ErrorCodes::CappedPositionLost; + })); ASSERT(!handleReplyCalled); @@ -392,8 +386,8 @@ TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) { tearDown(); TaskExecutor::CallbackHandle cb{}; - ASSERT_NOT_OK(net().startCommand( - cb, RemoteCommandRequest{}, [](StatusWith<RemoteCommandResponse> resp) {})); + RemoteCommandRequest request; + ASSERT_NOT_OK(net().startCommand(cb, request, [](StatusWith<RemoteCommandResponse> resp) {})); } TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { diff --git a/src/mongo/executor/network_interface_perf_test.cpp b/src/mongo/executor/network_interface_perf_test.cpp index 4102723b793..8cbd529d91e 100644 --- a/src/mongo/executor/network_interface_perf_test.cpp +++ b/src/mongo/executor/network_interface_perf_test.cpp @@ -85,9 +85,9 @@ int timeNetworkTestMillis(std::size_t operations, NetworkInterface* net) { }; func = [&]() { - net->startCommand(makeCallbackHandle(), - {server, "admin", bsonObjPing, bsonObjPing, Milliseconds(-1)}, - callback); + RemoteCommandRequest request{ + server, "admin", bsonObjPing, bsonObjPing, nullptr, Milliseconds(-1)}; + net->startCommand(makeCallbackHandle(), request, callback); }; func(); diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp index 1f6486afd70..613b5ac5824 100644 --- a/src/mongo/executor/remote_command_request.cpp +++ b/src/mongo/executor/remote_command_request.cpp @@ -55,24 +55,28 @@ RemoteCommandRequest::RemoteCommandRequest(RequestId requestId, const std::string& theDbName, const BSONObj& theCmdObj, const BSONObj& metadataObj, + OperationContext* txn, Milliseconds timeoutMillis) : id(requestId), target(theTarget), dbname(theDbName), metadata(metadataObj), cmdObj(theCmdObj), + txn(txn), timeout(timeoutMillis) {} RemoteCommandRequest::RemoteCommandRequest(const HostAndPort& theTarget, const std::string& theDbName, const BSONObj& theCmdObj, const BSONObj& metadataObj, + OperationContext* txn, Milliseconds timeoutMillis) : RemoteCommandRequest(requestIdCounter.addAndFetch(1), theTarget, theDbName, theCmdObj, metadataObj, + txn, timeoutMillis) {} std::string RemoteCommandRequest::toString() const { diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h index 975f7498a71..3f8fcc387f0 100644 --- a/src/mongo/executor/remote_command_request.h +++ b/src/mongo/executor/remote_command_request.h @@ -60,28 +60,33 @@ struct RemoteCommandRequest { const std::string& theDbName, const BSONObj& theCmdObj, const BSONObj& metadataObj, + OperationContext* txn, Milliseconds timeoutMillis); RemoteCommandRequest(const HostAndPort& theTarget, const std::string& theDbName, const BSONObj& theCmdObj, const BSONObj& metadataObj, + OperationContext* txn, Milliseconds timeoutMillis = kNoTimeout); RemoteCommandRequest(const HostAndPort& theTarget, const std::string& theDbName, const BSONObj& theCmdObj, + OperationContext* txn, Milliseconds timeoutMillis = kNoTimeout) : RemoteCommandRequest( - theTarget, theDbName, theCmdObj, rpc::makeEmptyMetadata(), timeoutMillis) {} + theTarget, theDbName, theCmdObj, rpc::makeEmptyMetadata(), txn, timeoutMillis) {} RemoteCommandRequest(const HostAndPort& theTarget, const rpc::RequestInterface& request, + OperationContext* txn, Milliseconds timeoutMillis = kNoTimeout) : RemoteCommandRequest(theTarget, request.getDatabase().toString(), request.getCommandArgs(), request.getMetadata(), + txn, timeoutMillis) {} std::string toString() const; @@ -89,13 +94,23 @@ struct RemoteCommandRequest { bool operator==(const RemoteCommandRequest& rhs) const; bool operator!=(const RemoteCommandRequest& rhs) const; - // Internal id of this request. Not interpereted and used for tracing purposes only. + // Internal id of this request. Not interpreted and used for tracing purposes only. RequestId id; HostAndPort target; std::string dbname; BSONObj metadata{rpc::makeEmptyMetadata()}; BSONObj cmdObj; + + // OperationContext is added to each request to allow OP_Command metadata attachment access to + // the Client object. The OperationContext is only accessed on the thread that calls + // NetworkInterface::startCommand. It is not safe to access from a thread that does not own the + // OperationContext in the general case. OperationContext should be non-null on + // NetworkInterfaces that do user work (i.e. reads, and writes) so that audit and client + // metadata is propagated. It is allowed to be null if used on NetworkInterfaces without + // metadata attachment (i.e., replication). + OperationContext* txn{nullptr}; + Milliseconds timeout = kNoTimeout; // Deadline by when the request must be completed diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index ed956b93567..3212e15a376 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -378,7 +378,8 @@ COMMON_EXECUTOR_TEST(ScheduleRemoteCommand) { const RemoteCommandRequest request(HostAndPort("localhost", 27017), "mydb", BSON("whatsUp" - << "doc")); + << "doc"), + nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); @@ -402,7 +403,8 @@ COMMON_EXECUTOR_TEST(ScheduleAndCancelRemoteCommand) { const RemoteCommandRequest request(HostAndPort("localhost", 27017), "mydb", BSON("whatsUp" - << "doc")); + << "doc"), + nullptr); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); @@ -424,7 +426,7 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) { Status status(ErrorCodes::InternalError, ""); launchExecutorThread(); const RemoteCommandRequest request( - HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), Milliseconds(1)); + HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), nullptr, Milliseconds(1)); TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status))); @@ -445,7 +447,8 @@ COMMON_EXECUTOR_TEST(CallbackHandleComparison) { TaskExecutor& executor = getExecutor(); auto status1 = getDetectableErrorStatus(); auto status2 = getDetectableErrorStatus(); - const RemoteCommandRequest request(HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1)); + const RemoteCommandRequest request( + HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1), nullptr); TaskExecutor::CallbackHandle cbHandle1 = unittest::assertGet(executor.scheduleRemoteCommand( request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, request, &status1))); diff --git a/src/mongo/rpc/metadata.h b/src/mongo/rpc/metadata.h index b44bdafc13e..9af7bfdfeb5 100644 --- a/src/mongo/rpc/metadata.h +++ b/src/mongo/rpc/metadata.h @@ -102,11 +102,13 @@ StatusWith<BSONObj> downconvertReplyMetadata(const BSONObj& commandReply, const BSONObj& replyMetadata); /** - * A function type for writing request metadata. The function takes a pointer to a - * BSONObjBuilder used to construct the metadata object and the server address of the - * target of the request and returns a Status indicating if the metadata was written successfully. + * A function type for writing request metadata. The function takes a pointer to an optional + * OperationContext so metadata associated with a Client can be appended, a pointer to a + * BSONObjBuilder used to construct the metadata object and the server address of the target of the + * request and returns a Status indicating if the metadata was written successfully. */ -using RequestMetadataWriter = stdx::function<Status(BSONObjBuilder*, StringData)>; +using RequestMetadataWriter = + stdx::function<Status(OperationContext*, BSONObjBuilder*, StringData)>; /** * A function type for reading reply metadata. The function takes a a reference to a diff --git a/src/mongo/rpc/metadata/metadata_hook.h b/src/mongo/rpc/metadata/metadata_hook.h index 888e324b297..2369fb8d47a 100644 --- a/src/mongo/rpc/metadata/metadata_hook.h +++ b/src/mongo/rpc/metadata/metadata_hook.h @@ -33,6 +33,7 @@ namespace mongo { class BSONObj; class BSONObjBuilder; struct HostAndPort; +class OperationContext; class Status; namespace rpc { @@ -52,8 +53,12 @@ public: /** * Writes to an outgoing request metadata object. This method must not throw or block on * database or network operations and can be called by multiple concurrent threads. + * + * txn may be null as writeRequestMetadata may be called on ASIO background threads, and may not + * have an OperationContext as a result. */ - virtual Status writeRequestMetadata(const HostAndPort& requestDestination, + virtual Status writeRequestMetadata(OperationContext* txn, + const HostAndPort& requestDestination, BSONObjBuilder* metadataBob) = 0; /** diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp index 3b24561ade6..209c90b7106 100644 --- a/src/mongo/s/balancer/migration_manager.cpp +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -178,7 +178,7 @@ void MigrationManager::_executeMigrations(OperationContext* txn, continue; } - RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj); + RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn); StatusWith<RemoteCommandResponse> remoteCommandResponse( Status{ErrorCodes::InternalError, "Uninitialized value"}); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index f8aec3aea0c..bd587160121 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -269,7 +269,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd } executor::RemoteCommandRequest request( - host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30)); + host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); StatusWith<executor::RemoteCommandResponse> swResponse = Status(ErrorCodes::InternalError, "Internal error running command"); @@ -1167,7 +1167,7 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask( } executor::RemoteCommandRequest request( - swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30)); + swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); const RemoteCommandCallbackFn callback = stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp index 2f3e6ae67f8..9b086c15aa3 100644 --- a/src/mongo/s/client/dbclient_multi_command.cpp +++ b/src/mongo/s/client/dbclient_multi_command.cpp @@ -31,6 +31,7 @@ #include "mongo/s/client/dbclient_multi_command.h" #include "mongo/db/audit.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/wire_version.h" #include "mongo/rpc/factory.h" @@ -88,7 +89,9 @@ static void sayAsCmd(DBClientBase* conn, StringData dbName, const BSONObj& cmdOb BSONObjBuilder metadataBob; metadataBob.appendElements(upconvertedMetadata); if (conn->getRequestMetadataWriter()) { - conn->getRequestMetadataWriter()(&metadataBob, conn->getServerAddress()); + conn->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr), + &metadataBob, + conn->getServerAddress()); } requestBuilder->setDatabase(dbName); diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 2fff4b54520..79b7d493863 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -212,6 +212,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, dbName, cmdWithMaxTimeMS, _getMetadataForCommand(readPref), + txn, isConfig() ? kConfigCommandTimeout : executor::RemoteCommandRequest::kNoTimeout); StatusWith<RemoteCommandResponse> swResponse = diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index b838c50e10c..b24bccd3381 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -77,9 +77,11 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { return _egressHook->readReplyMetadata(target, metadataObj); }); } - conn->setRequestMetadataWriter([this](BSONObjBuilder* metadataBob, StringData hostStringData) { - return _egressHook->writeRequestMetadata(_shardedConnections, hostStringData, metadataBob); - }); + conn->setRequestMetadataWriter( + [this](OperationContext* txn, BSONObjBuilder* metadataBob, StringData hostStringData) { + return _egressHook->writeRequestMetadata( + _shardedConnections, txn, hostStringData, metadataBob); + }); if (conn->type() == ConnectionString::MASTER) { diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 9dd3e10c1f3..2d1919769ad 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -114,6 +114,11 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } +void AsyncResultsMerger::setOperationContext(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _params.txn = txn; +} + bool AsyncResultsMerger::ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); return ready_inlock(); @@ -290,8 +295,11 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { cmdObj = *remote.initialCmdObj; } - executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj); + executor::RemoteCommandRequest request(remote.getTargetHost(), + _params.nsString.db().toString(), + cmdObj, + _metadataObj, + _params.txn); auto callbackStatus = _executor->scheduleRemoteCommand( request, @@ -560,7 +568,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() { BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.nsString.db().toString(), cmdObj); + remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn); _executor->scheduleRemoteCommand( request, diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index b4d04a9c7ad..8956dca52c8 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -101,6 +101,13 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + void setOperationContext(OperationContext* txn); + + /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either * --there is a buffered result which we can return, diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 55f2412f7c8..47f4e46f89a 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -35,6 +35,7 @@ namespace mongo { +class OperationContext; template <typename T> class StatusWith; @@ -105,6 +106,13 @@ public: * the cursor is not tailable + awaitData). */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + + /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + virtual void setOperationContext(OperationContext* txn) = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index c40dafa91ee..d37b78f2a58 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -113,6 +113,10 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou return _root->setAwaitDataTimeout(awaitDataTimeout); } +void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) { + return _root->setOperationContext(txn); +} + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { const auto skip = params.skip; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index b16dae9f9d3..d8645bb7834 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -105,6 +105,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: /** * Constructs a cluster client cursor. diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 4cf9418fc7f..2b10928449e 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -97,4 +97,9 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou MONGO_UNREACHABLE; } + +void ClusterClientCursorMock::setOperationContext(OperationContext* txn) { + // Do nothing +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 67efae2181a..3749a8abb19 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -55,6 +55,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + /** * Returns true unless marked as having non-exhausted remote cursors via * markRemotesNotExhausted(). diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 5e21b25ea8b..fce3bcf12cb 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -137,6 +137,10 @@ struct ClusterClientCursorParams { // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; + + // OperationContext of the calling thread. Used to append Client dependent metadata to remote + // requests. + OperationContext* txn; }; } // mongo diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 924a472d7a5..cca70a8dbdf 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -152,6 +152,11 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai return _cursor->setAwaitDataTimeout(awaitDataTimeout); } +void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) { + return _cursor->setOperationContext(txn); +} + + void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); @@ -245,7 +250,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor( } StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCursor( - const NamespaceString& nss, CursorId cursorId) { + const NamespaceString& nss, CursorId cursorId, OperationContext* txn) { // Read the clock out of the lock. const auto now = _clockSource->now(); @@ -269,6 +274,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur } entry->setLastActive(now); + cursor->setOperationContext(txn); // Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object // to the pin; the CursorEntry is left with a null ClusterClientCursor. @@ -283,11 +289,15 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu invariant(cursor); + // Reset OperationContext so that non-user initiated operations do not try to use an invalid + // operation context + cursor->setOperationContext(nullptr); const bool remotesExhausted = cursor->remotesExhausted(); CursorEntry* entry = getEntry_inlock(nss, cursorId); invariant(entry); + entry->returnCursor(std::move(cursor)); if (cursorState == CursorState::NotExhausted || entry->getKillPending()) { @@ -390,6 +400,7 @@ std::size_t ClusterCursorManager::reapZombieCursors() { } lk.unlock(); + zombieCursor.getValue()->setOperationContext(nullptr); zombieCursor.getValue()->kill(); zombieCursor.getValue().reset(); lk.lock(); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 7770cc741c8..a1d4b28ba68 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -42,6 +42,7 @@ namespace mongo { class ClockSource; +class OperationContext; template <typename T> class StatusWith; @@ -201,6 +202,14 @@ public: */ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); + + /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + void setOperationContext(OperationContext* txn); + private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. @@ -278,7 +287,9 @@ public: * * Does not block. */ - StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, CursorId cursorId); + StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, + CursorId cursorId, + OperationContext* txn); /** * Informs the manager that the given cursor should be killed. The cursor need not necessarily diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index 0edda882977..ce65558f865 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -113,7 +113,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); @@ -143,7 +143,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); auto nextResult = checkedOutCursor.getValue().next(); @@ -170,7 +170,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { ClusterCursorManager::CursorLifetime::Mortal)); } for (int i = 0; i < numCursors; ++i) { - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i]); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); @@ -189,9 +189,10 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_EQ(ErrorCodes::CursorInUse, getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_EQ(ErrorCodes::CursorInUse, + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } // Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound. @@ -202,12 +203,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_OK(getManager()->killCursor(nss, cursorId)); - ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } // Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) { - ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, 5).getStatus()); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager()->checkOutCursor(nss, 5, nullptr).getStatus()); } // Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound, @@ -221,7 +224,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, - getManager()->checkOutCursor(incorrectNamespace, cursorId).getStatus()); + getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus()); } // Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound, @@ -233,7 +236,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, - getManager()->checkOutCursor(nss, cursorId + 1).getStatus()); + getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus()); } // Test that checking out a cursor updates the 'last active' time associated with the cursor to the @@ -246,7 +249,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { ClusterCursorManager::CursorLifetime::Mortal)); Date_t cursorRegistrationTime = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime); @@ -262,7 +265,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId())); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); @@ -434,7 +437,7 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(!isMockCursorKilled(0)); @@ -484,7 +487,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); } @@ -542,7 +545,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -555,7 +558,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsSharded); @@ -570,7 +573,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); @@ -586,7 +589,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -602,7 +605,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -685,13 +688,13 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); } @@ -703,7 +706,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -714,7 +717,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { // Cursor should have been destroyed without ever being killed. To be sure that the cursor has // not been marked kill pending but not yet destroyed (i.e. that the cursor is not a zombie), we // reapZombieCursors() and check that the cursor still has not been killed. - ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(!isMockCursorKilled(0)); @@ -734,7 +737,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -743,7 +746,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); // Cursor should be kill pending, so it will be killed during reaping. - ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(isMockCursorKilled(0)); @@ -757,7 +760,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); pinnedCursor = ClusterCursorManager::PinnedCursor(); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); @@ -772,7 +775,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); } ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); @@ -790,7 +793,7 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted()); } @@ -802,7 +805,7 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT(!isMockCursorKilled(0)); @@ -850,7 +853,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { ASSERT(isMockCursorKilled(0)); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, - getManager()->checkOutCursor(nss, cursorId).getStatus()); + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } } // namespace diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8f7b260e7a1..57d6ece2c87 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -180,6 +180,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, params.isTailable = query.getQueryRequest().isTailable(); params.isAwaitData = query.getQueryRequest().isAwaitData(); params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults(); + params.txn = txn; // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a @@ -358,7 +359,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn, const GetMoreRequest& request) { auto cursorManager = grid.getCursorManager(); - auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid); + auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid, txn); if (!pinnedCursor.isOK()) { return pinnedCursor.getStatus(); } diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 1f2dbdf6c7c..0e10d9edff2 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -37,6 +37,8 @@ namespace mongo { +class OperationContext; + /** * This is the lightweight mongoS analogue of the PlanStage abstraction used to execute queries on * mongoD (see mongo/db/plan_stage.h). @@ -85,6 +87,13 @@ public: */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + virtual void setOperationContext(OperationContext* txn) = 0; + protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 9a9f77fbf00..5f7db02ec7b 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -67,4 +67,8 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageLimit::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 0db06c30c3b..366a964f2a4 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -47,6 +47,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: long long _limit; diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 527bc0f0063..0e9304d9952 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -68,4 +68,8 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageMerge::setOperationContext(OperationContext* txn) { + return _arm.setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index d74870f8a94..a75d5a46f8d 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -53,6 +53,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index 179635bbb08..daad6fe6d07 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -73,6 +73,10 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } +void RouterStageMock::setOperationContext(OperationContext* txn) { + // Do nothing +} + StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() { if (!_awaitDataTimeout) { return Status(ErrorCodes::BadValue, "no awaitData timeout set"); diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index b83e2879096..255ae75b595 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -51,6 +51,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + /** * Queues a BSONObj to be returned. */ diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 16e6f9407a4..949182e3a1a 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -69,4 +69,8 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index 6ef60012a4d..f7965312d47 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -48,6 +48,8 @@ public: bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + + void setOperationContext(OperationContext* txn) final; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 536c3d173a2..af746d5e430 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -68,4 +68,8 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageSkip::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 35994d31e3e..430d3748c91 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -47,6 +47,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: long long _skip; diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp index 0ed82987534..49ee7c03d60 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.cpp +++ b/src/mongo/s/sharding_egress_metadata_hook.cpp @@ -50,10 +50,11 @@ namespace rpc { using std::shared_ptr; Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection, + OperationContext* txn, const StringData target, BSONObjBuilder* metadataBob) { try { - audit::writeImpersonatedUsersToMetadata(metadataBob); + audit::writeImpersonatedUsersToMetadata(txn, metadataBob); if (!shardedConnection) { return Status::OK(); } @@ -64,10 +65,11 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection, } } -Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& target, +Status ShardingEgressMetadataHook::writeRequestMetadata(OperationContext* txn, + const HostAndPort& target, BSONObjBuilder* metadataBob) { try { - audit::writeImpersonatedUsersToMetadata(metadataBob); + audit::writeImpersonatedUsersToMetadata(txn, metadataBob); rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob); return Status::OK(); } catch (...) { diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h index c510aefaab4..df105c813bf 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.h +++ b/src/mongo/s/sharding_egress_metadata_hook.h @@ -46,7 +46,9 @@ public: virtual ~ShardingEgressMetadataHook() = default; Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override; - Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override; + Status writeRequestMetadata(OperationContext* txn, + const HostAndPort& target, + BSONObjBuilder* metadataBob) override; // These overloaded methods exist to allow ShardingConnectionHook, which is soon to be // deprecated, to use the logic in ShardingEgressMetadataHook instead of duplicating the @@ -55,6 +57,7 @@ public: // contact. Status readReplyMetadata(const StringData replySource, const BSONObj& metadataObj); Status writeRequestMetadata(bool shardedConnection, + OperationContext* txn, const StringData target, BSONObjBuilder* metadataBob); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 61f0ed8d66f..6dde6355a82 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -95,13 +95,13 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( std::unique_ptr<NetworkInterface> fixedNet, - std::unique_ptr<rpc::EgressMetadataHook> metadataHook) { + rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder) { std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { auto net = executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), - std::move(metadataHook)); + metadataHookBuilder()); auto netPtr = net.get(); auto exec = stdx::make_unique<ThreadPoolTaskExecutor>( stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); @@ -148,7 +148,7 @@ Status initializeGlobalShardingState(OperationContext* txn, stdx::make_unique<ShardingNetworkConnectionHook>(), hookBuilder()); auto networkPtr = network.get(); - auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder()); + auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder); executorPool->startup(); auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); |