Diffstat (limited to 'src/mongo/db/transaction_coordinator_futures_util.cpp')
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.cpp | 51
1 file changed, 49 insertions(+), 2 deletions(-)
diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp
index f072d8b330b..ad72139ccc3 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util.cpp
@@ -36,6 +36,9 @@
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/transport/service_entry_point.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -55,6 +58,50 @@ AsyncWorkScheduler::~AsyncWorkScheduler() = default;
Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
    const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+
+    bool isSelfShard = [this, shardId] {
+        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+            return shardId == ShardRegistry::kConfigServerShardId;
+        }
+        if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+            return shardId == ShardingState::get(_serviceContext)->shardId();
+        }
+        MONGO_UNREACHABLE;  // Only sharded systems should use the two-phase commit path.
+    }();
+
+    if (isSelfShard) {
+        // If the command is being sent to the shard that this node belongs to, send it directly
+        // to this node rather than going through the host targeting below. This ensures that the
+        // state changes for the participant and coordinator occur sequentially on a single branch
+        // of replica set history. See SERVER-38142 for details.
+        return scheduleWork([ shardId,
+                              commandObj = commandObj.getOwned() ](OperationContext * opCtx) {
+            // Note: This internal authorization is tied to the lifetime of 'opCtx', which is
+            // destroyed by 'scheduleWork' immediately after this lambda ends.
+            AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization();
+
+            LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId;
+
+            auto start = Date_t::now();
+
+            auto requestOpMsg =
+                OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize();
+            const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext()
+                                                          ->getServiceEntryPoint()
+                                                          ->handleRequest(opCtx, requestOpMsg)
+                                                          .response);
+
+            // Document sequences are not yet being used for responses.
+            invariant(replyOpMsg.sequences.empty());
+
+            // 'ResponseStatus' is the response format of a remote request sent over the network,
+            // so we simulate that format manually here, since we sent the request over the loopback.
+            return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start};
+        });
+    }
+
+    // Manually simulate a futures interface to the TaskExecutor by creating this promise-future
+    // pair and setting the promise from inside the callback passed to the TaskExecutor.
    auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
    auto sharedPromise =
        std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
@@ -93,8 +140,8 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
        })
        .getAsync([](Status) {});
-    // Do not wait for the callback to run. The callback will reschedule the remote request on
-    // the same executor if necessary.
+    // Do not wait for the callback to run. The continuation on the future corresponding to
+    // 'sharedPromise' will reschedule the remote request if necessary.
    return std::move(promiseAndFuture.future);
}
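
To illustrate the self-shard branch above, here is a standalone sketch of the loopback dispatch pattern: the command is serialized as if it were going over the wire, handed to the local service entry point, and the reply is repackaged into the same response shape a real remote call would produce. All names below (Command, Reply, LocalServiceEntryPoint, SimulatedResponse, dispatchLocally) are hypothetical stand-ins for illustration only, not the MongoDB API.

#include <chrono>
#include <iostream>
#include <string>
#include <utility>

// Stand-ins for BSONObj, OpMsg and ServiceEntryPoint (hypothetical, illustration only).
struct Command { std::string body; };
struct Reply { std::string body; };

struct LocalServiceEntryPoint {
    // In the real code, ServiceEntryPoint::handleRequest runs the full command
    // execution path in-process, without touching the network.
    Reply handleRequest(const Command& request) {
        return Reply{"ok: " + request.body};
    }
};

// Mirrors the role of executor::TaskExecutor::ResponseStatus: the reply plus the elapsed time.
struct SimulatedResponse {
    Reply reply;
    std::chrono::milliseconds elapsed;
};

SimulatedResponse dispatchLocally(LocalServiceEntryPoint& sep, Command command) {
    auto start = std::chrono::steady_clock::now();
    Reply reply = sep.handleRequest(command);
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
    // Package the local reply in the shape of a remote response, so callers cannot tell
    // whether the command went over the network or over the loopback.
    return SimulatedResponse{std::move(reply), elapsed};
}

int main() {
    LocalServiceEntryPoint sep;
    auto response = dispatchLocally(sep, Command{"prepareTransaction"});
    std::cout << response.reply.body << " (" << response.elapsed.count() << "ms)\n";
    return 0;
}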
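
The non-local branch bridges the TaskExecutor's callback-based API to a Future-returning API by moving a Promise into the scheduled callback, as the comment above the makePromiseFuture call describes. Below is a minimal standalone sketch of that bridging pattern, using std::promise/std::future and a toy thread-based executor in place of MongoDB's Promise/Future and executor::TaskExecutor; CallbackExecutor and scheduleWithFuture are hypothetical names introduced here.

#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for a callback-only executor such as executor::TaskExecutor.
class CallbackExecutor {
public:
    void schedule(std::function<void()> work) {
        _threads.emplace_back(std::move(work));
    }
    ~CallbackExecutor() {
        for (auto& t : _threads)
            t.join();
    }

private:
    std::vector<std::thread> _threads;
};

// Bridges the callback API to a future-returning API: the promise is moved into a
// shared_ptr so the scheduled callback can fulfill it after this function returns.
std::future<std::string> scheduleWithFuture(CallbackExecutor& executor, std::string request) {
    auto promise = std::make_shared<std::promise<std::string>>();
    auto future = promise->get_future();
    executor.schedule([promise, request = std::move(request)] {
        // The real code would send the remote command here and set the response status;
        // on failure it would fulfill the promise with an error instead.
        promise->set_value("response to: " + request);
    });
    // Do not wait for the callback; return the future immediately.
    return future;
}

int main() {
    CallbackExecutor executor;
    auto response = scheduleWithFuture(executor, "commitTransaction");
    std::cout << response.get() << std::endl;  // Blocks until the scheduled callback runs.
    return 0;
}

The shared_ptr around the promise is the key design point: the returned future is handed to the caller immediately, while ownership of the promise travels into the executor's callback, which may outlive the scheduling function.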