diff options
Diffstat (limited to 'src/mongo/s/commands/commands_public.cpp')
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 208 |
1 files changed, 104 insertions, 104 deletions
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index c0c05398c0e..1d9ab4c6b32 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -274,13 +274,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "dropIndexes: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNotIdempotent)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNotIdempotent)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } dropIndexesCmd; @@ -319,13 +319,13 @@ public: uassertStatusOK(createShardDatabase(opCtx, dbName)); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } createIndexesCmd; @@ -362,13 +362,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "reIndex: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } reIndexCmd; @@ -404,13 +404,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "collMod: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } collectionModCmd; @@ -1077,57 +1077,8 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - // TODO (SERVER-30734) make distinct versioned against unsharded collections - if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result)) { - return true; - } - - BSONObj resultObj = result.asTempObj(); - if (ResolvedView::isResolvedViewErrorResponse(resultObj)) { - auto resolvedView = ResolvedView::fromBSON(resultObj); - result.resetToEmpty(); - - auto parsedDistinct = ParsedDistinct::parse( - opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), false); - if (!parsedDistinct.isOK()) { - return appendCommandStatus(result, parsedDistinct.getStatus()); - } - - auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); - if (!aggCmdOnView.isOK()) { - return appendCommandStatus(result, aggCmdOnView.getStatus()); - } - - auto aggRequestOnView = - AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue()); - if (!aggRequestOnView.isOK()) { - return appendCommandStatus(result, aggRequestOnView.getStatus()); - } - - auto resolvedAggRequest = - resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue()); - auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); - - BSONObj aggResult = Command::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd))); - - ViewResponseFormatter formatter(aggResult); - auto formatStatus = formatter.appendAsDistinctResponse(&result); - if (!formatStatus.isOK()) { - return appendCommandStatus(result, formatStatus); - } - return true; - } - - return false; - } - - const auto cm = routingInfo.cm(); - auto query = getQuery(cmdObj); + auto swCollation = getCollation(cmdObj); if (!swCollation.isOK()) { return appendEmptyResultSet(result, swCollation.getStatus(), nss.ns()); @@ -1145,20 +1096,71 @@ public: collator = std::move(swCollator.getValue()); } - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - query, - collation)); - - BSONObjComparator bsonCmp(BSONObj(), - BSONObjComparator::FieldNamesMode::kConsider, - !collation.isEmpty() ? collator.get() : cm->getDefaultCollator()); + // Save a copy of routingInfo before calling scatterGather(), to guarantee that we extract + // the collation from the same routingInfo that was used by scatterGather(). + // (scatterGather() will throw if the routingInfo needs to be refreshed). + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + + BSONObj viewDefinition; + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + query, + collation, + &viewDefinition); + + if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) { + uassert(ErrorCodes::InternalError, + str::stream() << "Missing resolved view definition, but remote returned " + << ErrorCodes::errorString(swShardResponses.getStatus().code()), + !viewDefinition.isEmpty()); + + auto resolvedView = ResolvedView::fromBSON(viewDefinition); + auto parsedDistinct = ParsedDistinct::parse( + opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), true); + if (!parsedDistinct.isOK()) { + return appendCommandStatus(result, parsedDistinct.getStatus()); + } + + auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return appendCommandStatus(result, aggCmdOnView.getStatus()); + } + + auto aggRequestOnView = AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue()); + if (!aggRequestOnView.isOK()) { + return appendCommandStatus(result, aggRequestOnView.getStatus()); + } + + auto resolvedAggRequest = + resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue()); + auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); + + BSONObj aggResult = Command::runCommandDirectly( + opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd))); + + ViewResponseFormatter formatter(aggResult); + auto formatStatus = formatter.appendAsDistinctResponse(&result); + if (!formatStatus.isOK()) { + return appendCommandStatus(result, formatStatus); + } + return true; + } + + uassertStatusOK(swShardResponses.getStatus()); + auto shardResponses = std::move(swShardResponses.getValue()); + + BSONObjComparator bsonCmp( + BSONObj(), + BSONObjComparator::FieldNamesMode::kConsider, + !collation.isEmpty() + ? collator.get() + : (routingInfo.cm() ? routingInfo.cm()->getDefaultCollator() : nullptr)); BSONObjSet all = bsonCmp.makeBSONObjSet(); for (const auto& response : shardResponses) { @@ -1219,18 +1221,16 @@ public: Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGather(opCtx, - dbname, - nss, - explainCmd, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - targetingQuery, - targetingCollation, - true, // do shard versioning - true, // retry on stale shard version - &viewDefinition); + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbname, + nss, + explainCmd, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + targetingQuery, + targetingCollation, + &viewDefinition); long long millisElapsed = timer.millis(); |