summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp')
-rw-r--r--src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp93
1 files changed, 80 insertions, 13 deletions
diff --git a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
index cab695ddab4..e87fca927e2 100644
--- a/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
+++ b/src/mongo/s/commands/cluster_query_without_shard_key_cmd.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/explain_gen.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/pipeline/document_source_limit.h"
@@ -39,6 +40,7 @@
#include "mongo/db/update/update_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
@@ -190,9 +192,8 @@ BSONObj createAggregateCmdObj(
return aggregate.toBSON({});
}
-ParsedCommandInfo parseWriteCommand(OperationContext* opCtx,
- StringData commandName,
- const BSONObj& writeCmdObj) {
+ParsedCommandInfo parseWriteCommand(OperationContext* opCtx, const BSONObj& writeCmdObj) {
+ auto commandName = writeCmdObj.firstElementFieldNameStringData();
ParsedCommandInfo parsedInfo;
if (commandName == write_ops::UpdateCommandRequest::kCommandName) {
auto updateRequest = write_ops::UpdateCommandRequest::parse(
@@ -253,7 +254,8 @@ ParsedCommandInfo parseWriteCommand(OperationContext* opCtx,
parsedInfo.collation = *parsedCollation;
}
} else {
- uasserted(ErrorCodes::InvalidOptions, "Not a supported write command");
+ uasserted(ErrorCodes::InvalidOptions,
+ str::stream() << commandName << " is not a supported batch write command");
}
return parsedInfo;
}
@@ -276,22 +278,19 @@ public:
"Running read phase for a write without a shard key.",
"clientWriteRequest"_attr = request().getWriteCmd());
+ const auto writeCmdObj = request().getWriteCmd();
+
// Get all shard ids for shards that have chunks in the desired namespace.
- const NamespaceString nss(
- CommandHelpers::parseNsCollectionRequired(ns().dbName(), request().getWriteCmd()));
+ const NamespaceString nss =
+ CommandHelpers::parseNsCollectionRequired(ns().dbName(), writeCmdObj);
hangBeforeMetadataRefreshClusterQuery.pauseWhileSet(opCtx);
const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
// Parse into OpMsgRequest to append the $db field, which is required for command
// parsing.
- const auto opMsgRequest =
- OpMsgRequest::fromDBAndBody(nss.db(), request().getWriteCmd());
-
- auto parsedInfoFromRequest =
- parseWriteCommand(opCtx,
- request().getWriteCmd().firstElementFieldNameStringData(),
- opMsgRequest.body);
+ const auto opMsgRequest = OpMsgRequest::fromDBAndBody(ns().db(), writeCmdObj);
+ auto parsedInfoFromRequest = parseWriteCommand(opCtx, opMsgRequest.body);
auto allShardsContainingChunksForNs =
getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest);
@@ -409,6 +408,74 @@ public:
}
private:
+ void explain(OperationContext* opCtx,
+ ExplainOptions::Verbosity verbosity,
+ rpc::ReplyBuilderInterface* result) override {
+ const auto writeCmdObj = [&] {
+ const auto explainCmdObj = request().getWriteCmd();
+ const auto opMsgRequestExplainCmd =
+ OpMsgRequest::fromDBAndBody(ns().db(), explainCmdObj);
+ auto explainRequest = ExplainCommandRequest::parse(
+ IDLParserContext("_clusterQueryWithoutShardKeyExplain"),
+ opMsgRequestExplainCmd.body);
+ return explainRequest.getCommandParameter().getOwned();
+ }();
+
+ // Get all shard ids for shards that have chunks in the desired namespace.
+ const NamespaceString nss =
+ CommandHelpers::parseNsCollectionRequired(ns().dbName(), writeCmdObj);
+ const auto cri = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
+
+ // Parse into OpMsgRequest to append the $db field, which is required for command
+ // parsing.
+ const auto opMsgRequestWriteCmd = OpMsgRequest::fromDBAndBody(ns().db(), writeCmdObj);
+ auto parsedInfoFromRequest = parseWriteCommand(opCtx, opMsgRequestWriteCmd.body);
+
+ auto allShardsContainingChunksForNs =
+ getShardsToTarget(opCtx, cri.cm, nss, parsedInfoFromRequest);
+ auto cmdObj = createAggregateCmdObj(opCtx, parsedInfoFromRequest, nss, boost::none);
+
+ const auto aggExplainCmdObj = ClusterExplain::wrapAsExplain(cmdObj, verbosity);
+
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& shardId : allShardsContainingChunksForNs) {
+ requests.emplace_back(
+ shardId, appendShardVersion(aggExplainCmdObj, cri.getShardVersion(shardId)));
+ }
+
+ Timer timer;
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ request().getDbName(),
+ requests,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kNoRetry);
+
+ ShardId shardId;
+ std::vector<AsyncRequestsSender::Response> responses;
+
+ while (!ars.done()) {
+ auto response = ars.next();
+ uassertStatusOK(response.swResponse);
+ responses.push_back(response);
+ shardId = response.shardId;
+ }
+
+ const auto millisElapsed = timer.millis();
+
+ auto bodyBuilder = result->getBodyBuilder();
+ uassertStatusOK(ClusterExplain::buildExplainResult(
+ opCtx,
+ responses,
+ parsedInfoFromRequest.sort ? ClusterExplain::kMergeSortFromShards
+ : ClusterExplain::kMergeFromShards,
+ millisElapsed,
+ writeCmdObj,
+ &bodyBuilder));
+ bodyBuilder.append("targetShardId", shardId);
+ }
+
NamespaceString ns() const override {
return NamespaceString(request().getDbName());
}