diff options
Diffstat (limited to 'src/mongo/db/transaction_coordinator_futures_util.cpp')
-rw-r--r-- | src/mongo/db/transaction_coordinator_futures_util.cpp | 51 |
1 files changed, 49 insertions, 2 deletions
diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp index f072d8b330b..ad72139ccc3 100644 --- a/src/mongo/db/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/transaction_coordinator_futures_util.cpp @@ -36,6 +36,9 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/transport/service_entry_point.h" #include "mongo/util/log.h" namespace mongo { @@ -55,6 +58,50 @@ AsyncWorkScheduler::~AsyncWorkScheduler() = default; Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand( const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) { + + bool isSelfShard = [this, shardId] { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return shardId == ShardRegistry::kConfigServerShardId; + } + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + return shardId == ShardingState::get(_serviceContext)->shardId(); + } + MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path. + }(); + + if (isSelfShard) { + // If sending a command to the same shard as this node is in, send it directly to this node + // rather than going through the host targeting below. This ensures that the state changes + // for the participant and coordinator occur sequentially on a single branch of replica set + // history. See SERVER-38142 for details. + return scheduleWork([ shardId, + commandObj = commandObj.getOwned() ](OperationContext * opCtx) { + // Note: This internal authorization is tied to the lifetime of 'opCtx', which is + // destroyed by 'scheduleWork' immediately after this lambda ends. + AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization(); + + LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId; + + auto start = Date_t::now(); + + auto requestOpMsg = + OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize(); + const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext() + ->getServiceEntryPoint() + ->handleRequest(opCtx, requestOpMsg) + .response); + + // Document sequences are not yet being used for responses. + invariant(replyOpMsg.sequences.empty()); + + // 'ResponseStatus' is the response format of a remote request sent over the network, so + // we simulate that format manually here, since we sent the request over the loopback. + return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start}; + }); + } + + // Manually simulate a futures interface to the TaskExecutor by creating this promise-future + // pair and setting the promise from inside the callback passed to the TaskExecutor. auto promiseAndFuture = makePromiseFuture<ResponseStatus>(); auto sharedPromise = std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise)); @@ -93,8 +140,8 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot }) .getAsync([](Status) {}); - // Do not wait for the callback to run. The callback will reschedule the remote request on - // the same executor if necessary. + // Do not wait for the callback to run. The continuation on the future corresponding to + // 'sharedPromise' will reschedule the remote request if necessary. return std::move(promiseAndFuture.future); } |