diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp | 93 |
1 files changed, 80 insertions, 13 deletions
diff --git a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp index cab695ddab4..e87fca927e2 100644 --- a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/explain_gen.h" #include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/pipeline/document_source_limit.h" @@ -39,6 +40,7 @@ #include "mongo/db/update/update_util.h" #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" @@ -190,9 +192,8 @@ BSONObj createAggregateCmdObj( return aggregate.toBSON({}); } -ParsedCommandInfo parseWriteCommand(OperationContext* opCtx, - StringData commandName, - const BSONObj& writeCmdObj) { +ParsedCommandInfo parseWriteCommand(OperationContext* opCtx, const BSONObj& writeCmdObj) { + auto commandName = writeCmdObj.firstElementFieldNameStringData(); ParsedCommandInfo parsedInfo; if (commandName == write_ops::UpdateCommandRequest::kCommandName) { auto updateRequest = write_ops::UpdateCommandRequest::parse( @@ -253,7 +254,8 @@ ParsedCommandInfo parseWriteCommand(OperationContext* opCtx, parsedInfo.collation = *parsedCollation; } } else { - uasserted(ErrorCodes::InvalidOptions, "Not a supported write command"); + uasserted(ErrorCodes::InvalidOptions, + str::stream() << commandName << " is not a supported batch write command"); } return parsedInfo; } @@ -276,22 +278,19 @@ public: "Running read phase for a write without a shard key.", "clientWriteRequest"_attr = request().getWriteCmd()); + const auto writeCmdObj = request().getWriteCmd(); + // Get all shard ids for shards that have chunks in the desired namespace. - const NamespaceString nss( - CommandHelpers::parseNsCollectionRequired(ns().dbName(), request().getWriteCmd())); + const NamespaceString nss = + CommandHelpers::parseNsCollectionRequired(ns().dbName(), writeCmdObj); hangBeforeMetadataRefreshClusterQuery.pauseWhileSet(opCtx); const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); // Parse into OpMsgRequest to append the $db field, which is required for command // parsing. - const auto opMsgRequest = - OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd()); - - auto parsedInfoFromRequest = - parseWriteCommand(opCtx, - request().getWriteCmd().firstElementFieldNameStringData(), - opMsgRequest.body); + const auto opMsgRequest = OpMsgRequest::fromDBAndBody(ns().db(), writeCmdObj); + auto parsedInfoFromRequest = parseWriteCommand(opCtx, opMsgRequest.body); auto allShardsContainingChunksForNs = getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest); @@ -409,6 +408,74 @@ public: } private: + void explain(OperationContext* opCtx, + ExplainOptions::Verbosity verbosity, + rpc::ReplyBuilderInterface* result) override { + const auto writeCmdObj = [&] { + const auto explainCmdObj = request().getWriteCmd(); + const auto opMsgRequestExplainCmd = + OpMsgRequest::fromDBAndBody(ns().db(), explainCmdObj); + auto explainRequest = ExplainCommandRequest::parse( + IDLParserContext("_clusterQueryWithoutShardKeyExplain"), + opMsgRequestExplainCmd.body); + return explainRequest.getCommandParameter().getOwned(); + }(); + + // Get all shard ids for shards that have chunks in the desired namespace. + const NamespaceString nss = + CommandHelpers::parseNsCollectionRequired(ns().dbName(), writeCmdObj); + const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + + // Parse into OpMsgRequest to append the $db field, which is required for command + // parsing. + const auto opMsgRequestWriteCmd = OpMsgRequest::fromDBAndBody(ns().db(), writeCmdObj); + auto parsedInfoFromRequest = parseWriteCommand(opCtx, opMsgRequestWriteCmd.body); + + auto allShardsContainingChunksForNs = + getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest); + auto cmdObj = createAggregateCmdObj(opCtx, parsedInfoFromRequest, nss, boost::none); + + const auto aggExplainCmdObj = ClusterExplain::wrapAsExplain(cmdObj, verbosity); + + std::vector<AsyncRequestsSender::Request> requests; + for (const auto& shardId : allShardsContainingChunksForNs) { + requests.emplace_back( + shardId, appendShardVersion(aggExplainCmdObj, cri.getShardVersion(shardId))); + } + + Timer timer; + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + request().getDbName(), + requests, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kNoRetry); + + ShardId shardId; + std::vector<AsyncRequestsSender::Response> responses; + + while (!ars.done()) { + auto response = ars.next(); + uassertStatusOK(response.swResponse); + responses.push_back(response); + shardId = response.shardId; + } + + const auto millisElapsed = timer.millis(); + + auto bodyBuilder = result->getBodyBuilder(); + uassertStatusOK(ClusterExplain::buildExplainResult( + opCtx, + responses, + parsedInfoFromRequest.sort ? ClusterExplain::kMergeSortFromShards + : ClusterExplain::kMergeFromShards, + millisElapsed, + writeCmdObj, + &bodyBuilder)); + bodyBuilder.append("targetShardId", shardId); + } + NamespaceString ns() const override { return NamespaceString(request().getDbName()); } |