diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-09-25 13:23:18 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-09-26 13:06:13 -0400 |
commit | dabde5a17356591ebcb465129bcd5ed5d9236b60 (patch) | |
tree | e0a58da30be2d87ff4670e63cff12b1404fa29d1 /src/mongo | |
parent | 5c29ce7718b423ae23ba0eccf71249cf69d36d37 (diff) | |
download | mongo-dabde5a17356591ebcb465129bcd5ed5d9236b60.tar.gz |
SERVER-30025 for sharded read with empty query, target only shards that own data for the collection instead of all shards
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 38 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_helpers.cpp | 210 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_helpers.h | 81 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_count_cmd.cpp | 44 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_db_stats_cmd.cpp | 15 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 208 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 22 |
7 files changed, 303 insertions, 315 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 1ececcc0470..f3298662b02 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -541,19 +541,31 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. if (mergeCtx->explain) { - swShardResults = scatterGather(opCtx, - namespaces.executionNss.db().toString(), - namespaces.executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - namespaces.executionNss.isCollectionlessAggregateNS() - ? ShardTargetingPolicy::BroadcastToAllShards - : ShardTargetingPolicy::UseRoutingTable, - shardQuery, - request.getCollation(), - true, - false); + if (namespaces.executionNss.isCollectionlessAggregateNS()) { + // Some commands, such as $currentOp, are implemented as aggregation stages on a + // "collectionless" namespace. Currently, all such commands should be broadcast to + // all shards, and should not participate in the shard version protocol. + swShardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + namespaces.executionNss.db().toString(), + namespaces.executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target shards, + // and should participate in the shard version protocol. + swShardResults = scatterGatherVersionedTargetByRoutingTable( + opCtx, + namespaces.executionNss.db().toString(), + namespaces.executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + request.getCollation(), + nullptr /* viewDefinition */); + } } else { swCursors = establishShardCursors(opCtx, namespaces.executionNss, diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index 1701f7223c7..69357d02785 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -69,8 +69,8 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, namespace { -std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx, - const BSONObj& cmdObj) { +std::vector<AsyncRequestsSender::Request> buildUnversionedRequestsForAllShards( + OperationContext* opCtx, const BSONObj& cmdObj) { std::vector<AsyncRequestsSender::Request> requests; std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); @@ -80,66 +80,44 @@ std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationCon return requests; } -std::vector<AsyncRequestsSender::Request> buildRequestsForShardsForNamespace( +std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, const BSONObj& cmdObj, - const boost::optional<BSONObj> query, - const boost::optional<BSONObj> collation, - bool appendVersion) { + const BSONObj& query, + const BSONObj& collation) { std::vector<AsyncRequestsSender::Request> requests; if (routingInfo.cm()) { - // The collection is sharded. - // Note(esha): The for-loop is duplicated because ChunkManager::getShardIdsForQuery() and - // ShardRegistry::getAllShardIds() return different types: std::set and std::vector, - // respectively. - if (query) { - // A query was specified. Target all shards that own chunks that match the query. - std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery( - opCtx, *query, (collation ? *collation : BSONObj()), &shardIds); - for (const ShardId& shardId : shardIds) { - requests.emplace_back( - shardId, - appendVersion - ? appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)) - : cmdObj); - } - } else { - // No query was specified. Target all shards. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (const ShardId& shardId : shardIds) { - requests.emplace_back( - shardId, - appendVersion - ? appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)) - : cmdObj); - } + // The collection is sharded. Target all shards that own chunks that match the query. + std::set<ShardId> shardIds; + routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + for (const ShardId& shardId : shardIds) { + requests.emplace_back( + shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))); } } else { // The collection is unsharded. Target only the primary shard for the database. // Don't append shard version info when contacting the config servers. requests.emplace_back(routingInfo.primaryId(), - appendVersion && !routingInfo.primary()->isConfig() + !routingInfo.primary()->isConfig() ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) : cmdObj); } return requests; } +/** + * Throws StaleConfigException if any remote returns a stale shardVersion error. + */ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponses( OperationContext* opCtx, const std::string& dbName, - const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, const std::vector<AsyncRequestsSender::Request>& requests, BSONObj* viewDefinition) { // Send the requests. - LOG(2) << "Dispatching command " << redact(cmdObj) << " to " << requests.size() - << " targeted shards using readPreference " << readPref; AsyncRequestsSender ars(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), dbName, @@ -159,58 +137,47 @@ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponses( // We successfully received a response. // Check for special errors that require throwing out any accumulated results. - - status = getStatusFromCommandResult(response.swResponse.getValue().data); - LOG(2) << "Received status " << status << " and responseObj " - << response.swResponse.getValue() << " from shard " << response.shardId - << " at host " << response.shardHostAndPort->toString(); + auto& responseObj = response.swResponse.getValue().data; + status = getStatusFromCommandResult(responseObj); // Failing to establish a consistent shardVersion means no results should be examined. if (ErrorCodes::isStaleShardingError(status.code())) { - return status; + throw StaleConfigException(str::stream() << "got stale shardVersion response " + << responseObj + << " from shard " + << response.shardId + << " at host " + << response.shardHostAndPort->toString(), + responseObj); } - // In the case a read is performed against a view, the shard primary can return an error + // In the case a read is performed against a view, the server can return an error // indicating that the underlying collection may be sharded. When this occurs the return // message will include an expanded view definition and collection namespace. We pass // the definition back to the caller by storing it in the 'viewDefinition' parameter. // This allows the caller to rewrite the request as an aggregation and retry it. if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == status) { - auto& responseObj = response.swResponse.getValue().data; if (!responseObj.hasField("resolvedView")) { - status = Status(ErrorCodes::InternalError, - str::stream() << "Missing field 'resolvedView' in document: " - << responseObj); - return status; + return {ErrorCodes::InternalError, + str::stream() << "Missing field 'resolvedView' in document: " + << responseObj}; } auto resolvedViewObj = responseObj.getObjectField("resolvedView"); if (resolvedViewObj.isEmpty()) { - status = Status(ErrorCodes::InternalError, - str::stream() << "Field 'resolvedView' must be an object: " - << responseObj); - return status; + return {ErrorCodes::InternalError, + str::stream() << "Field 'resolvedView' must be an object: " + << responseObj}; } + if (viewDefinition) { *viewDefinition = BSON("resolvedView" << resolvedViewObj.getOwned()); } return status; } - - if (status.isOK()) { - // The command status was OK. - responses.push_back(std::move(response)); - continue; - } } - - // Either we failed to get a response, or the command had a non-OK status that we can store - // as an individual shard response. - LOG(2) << "Received error " << response.swResponse.getStatus() << " from shard " - << response.shardId; responses.push_back(std::move(response)); } - return responses; } @@ -222,75 +189,74 @@ BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) { return cmdWithVersionBob.obj(); } -StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather( +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherUnversionedTargetAllShards( OperationContext* opCtx, const std::string& dbName, - const boost::optional<NamespaceString> nss, + boost::optional<NamespaceString> nss, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, - const Shard::RetryPolicy retryPolicy, - const ShardTargetingPolicy targetPolicy, - const boost::optional<BSONObj> query, - const boost::optional<BSONObj> collation, - const bool appendShardVersion, - const bool retryOnStaleShardVersion, - BSONObj* viewDefinition) { + Shard::RetryPolicy retryPolicy) { + // Some commands, such as $currentOp, operate on a collectionless namespace. If a full namespace + // is specified, its database must match the dbName. + invariant(!nss || (nss->db() == dbName)); - // If a NamespaceString is specified, it must match the dbName. - invariant(!nss || (nss.get().db() == dbName)); + auto requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj); - switch (targetPolicy) { - case ShardTargetingPolicy::BroadcastToAllShards: { - // Send unversioned commands to all shards. - auto requests = buildRequestsForAllShards(opCtx, cmdObj); - return gatherResponses( - opCtx, dbName, cmdObj, readPref, retryPolicy, requests, viewDefinition); - } + return gatherResponses( + opCtx, dbName, readPref, retryPolicy, requests, nullptr /* viewDefinition */); +} + +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherVersionedTargetByRoutingTable( + OperationContext* opCtx, + const std::string& dbName, + const NamespaceString& nss, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy, + const BSONObj& query, + const BSONObj& collation, + BSONObj* viewDefinition) { + // The database in the full namespace must match the dbName. + invariant(nss.db() == dbName); - case ShardTargetingPolicy::UseRoutingTable: { - // We must have a valid NamespaceString. - invariant(nss && nss.get().isValid()); + auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + if (!swRoutingInfo.isOK()) { + return swRoutingInfo.getStatus(); + } + auto routingInfo = swRoutingInfo.getValue(); - int numAttempts = 0; - StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses( - (std::vector<AsyncRequestsSender::Response>())); + auto requests = + buildVersionedRequestsForTargetedShards(opCtx, routingInfo, cmdObj, query, collation); - do { - // Get the routing table cache. - auto swRoutingInfo = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, *nss); - if (!swRoutingInfo.isOK()) { - return swRoutingInfo.getStatus(); - } - auto routingInfo = swRoutingInfo.getValue(); - - // Use the routing table cache to decide which shards to target, and build the - // requests to send to them. - auto requests = buildRequestsForShardsForNamespace( - opCtx, routingInfo, cmdObj, query, collation, appendShardVersion); - - // Retrieve the responses from the shards. - swResponses = gatherResponses( - opCtx, dbName, cmdObj, readPref, retryPolicy, requests, viewDefinition); - ++numAttempts; - - // If any shard returned a stale shardVersion error, invalidate the routing table - // cache. This will cause the cache to be refreshed the next time it is accessed. - if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) { - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo)); - LOG(1) << "got stale shardVersion error " << swResponses.getStatus() - << " while dispatching " << redact(cmdObj) << " after " << numAttempts - << " dispatch attempts"; - } - } while (retryOnStaleShardVersion && numAttempts < kMaxNumStaleVersionRetries && - !swResponses.getStatus().isOK()); + return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests, viewDefinition); +} - return swResponses; - } +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherOnlyVersionIfUnsharded( + OperationContext* opCtx, + const std::string& dbName, + const NamespaceString& nss, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy) { + // The database in the full namespace must match the dbName. + invariant(nss.db() == dbName); + + auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + if (!swRoutingInfo.isOK()) { + return swRoutingInfo.getStatus(); + } + auto routingInfo = swRoutingInfo.getValue(); - default: - MONGO_UNREACHABLE; + std::vector<AsyncRequestsSender::Request> requests; + if (routingInfo.cm()) { + requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj); + } else { + requests = buildVersionedRequestsForTargetedShards( + opCtx, routingInfo, cmdObj, BSONObj(), BSONObj()); } + + return gatherResponses( + opCtx, dbName, readPref, retryPolicy, requests, nullptr /* viewDefinition */); } bool appendRawResponses(OperationContext* opCtx, diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h index 907656f264b..e55e3677609 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.h +++ b/src/mongo/s/commands/cluster_commands_helpers.h @@ -47,12 +47,6 @@ class CachedDatabaseInfo; class OperationContext; class ShardId; -/* - * Allows callers of routing functions to specify a preferred targeting policy. See scatterGather - * for a usage example. - */ -enum class ShardTargetingPolicy { UseRoutingTable, BroadcastToAllShards }; - /** * This function appends the provided writeConcernError BSONElement to the sharded response. */ @@ -65,43 +59,66 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardID, BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version); /** - * Generic function for dispatching commands to the cluster. + * Utility for dispatching unversioned commands to all shards in a cluster. * - * If 'targetPolicy' is ShardTargetingPolicy::BroadcastToAllShards, the command will be sent - * unversioned to all shards and run on database 'dbName'. The 'query', 'collation' and - * 'appendShardVersion' arguments, if supplied, will be ignored. + * Returns a non-OK status if a failure occurs on *this* node during execution. Otherwise, returns + * success and a list of responses from shards (including errors from the shards or errors reaching + * the shards). * - * If 'targetPolicy' is ShardTargetingPolicy::UseRoutingTable, the routing table cache will be used - * to determine which shards the command should be dispatched to. If the namespace specified by - * 'nss' is an unsharded collection, the command will be sent to the Primary shard for the database. - * If 'query' is specified, only shards that own data needed by the query are targeted; otherwise, - * all shards are targeted. By default, shardVersions are attached to the outgoing requests, and the - * function will re-target and retry if it receives a stale shardVersion error from any shard. + * Note, if this mongos has not refreshed its shard list since + * 1) a shard has been *added* through a different mongos, a request will not be sent to the added + * shard + * 2) a shard has been *removed* through a different mongos, this function will return a + * ShardNotFound error status. + */ +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherUnversionedTargetAllShards( + OperationContext* opCtx, + const std::string& dbName, + boost::optional<NamespaceString> nss, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy); + +/** + * Utility for dispatching versioned commands on a namespace, deciding which shards to + * target by applying the passed-in query and collation to the local routing table cache. * - * Returns a non-OK status if a failure occurs on *this* node during execution or on seeing an error - * from a shard that means the operation as a whole should fail, such as a exceeding retries for - * stale shardVersion errors. + * Throws on seeing a StaleConfigException from any shard. * - * If a shard returns an error saying that the request was on a view, the shard will also return a - * view definition. This will be stored in the BSONObj* viewDefinition argument, if non-null, so - * that the caller can re-run the operation as an aggregation. + * Optionally populates a 'viewDefinition' out parameter if a shard's result contains a view + * definition. The viewDefinition can be used to re-run the command as an aggregation. * - * Otherwise, returns success and a list of responses from shards (including errors from the shards - * or errors reaching the shards). + * Return value is the same as scatterGatherUnversionedTargetAllShards(). */ -StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather( +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherVersionedTargetByRoutingTable( OperationContext* opCtx, const std::string& dbName, - const boost::optional<NamespaceString> nss, + const NamespaceString& nss, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, - const ShardTargetingPolicy targetPolicy = ShardTargetingPolicy::UseRoutingTable, - const boost::optional<BSONObj> query = boost::none, - const boost::optional<BSONObj> collation = boost::none, - const bool appendShardVersion = true, - const bool retryOnStaleShardVersion = true, - BSONObj* viewDefinition = nullptr); + const BSONObj& query, + const BSONObj& collation, + BSONObj* viewDefinition); + +/** + * Utility for dispatching commands on a namespace, but with special hybrid versioning: + * - If the namespace is unsharded, a version is attached (so this node can find out if its routing + * table was stale, and the namespace is actually sharded), and only the primary shard is targeted. + * - If the namespace is sharded, no version is attached, and the request is broadcast to all + * shards. + * + * Throws on seeing a StaleConfigException. + * + * Return value is the same as scatterGatherUnversionedTargetAllShards(). + */ +StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherOnlyVersionIfUnsharded( + OperationContext* opCtx, + const std::string& dbName, + const NamespaceString& nss, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy); /** * Attaches each shard's response or error status by the shard's connection string in a top-level diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index 755188311b0..e88eb8c00cc 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -141,18 +141,16 @@ public: auto countCmdObj = countCmdBuilder.done(); BSONObj viewDefinition; - auto swShardResponses = scatterGather(opCtx, - dbname, - nss, - countCmdObj, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - filter, - collation, - true, // do shard versioning - true, // retry on stale shard version - &viewDefinition); + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbname, + nss, + countCmdObj, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + filter, + collation, + &viewDefinition); if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) { if (viewDefinition.isEmpty()) { @@ -272,18 +270,16 @@ public: Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGather(opCtx, - dbname, - nss, - explainCmd, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - targetingQuery, - targetingCollation, - true, // do shard versioning - true, // retry on stale shard version - &viewDefinition); + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbname, + nss, + explainCmd, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + targetingQuery, + targetingCollation, + &viewDefinition); long long millisElapsed = timer.millis(); diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp index d6f88616a15..54edc5c9db4 100644 --- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp @@ -68,14 +68,13 @@ public: const BSONObj& cmdObj, std::string& errmsg, BSONObjBuilder& output) override { - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - boost::none, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::BroadcastToAllShards)); + auto shardResponses = uassertStatusOK( + scatterGatherUnversionedTargetAllShards(opCtx, + dbName, + boost::none, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent)); if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) { return false; } diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index c0c05398c0e..1d9ab4c6b32 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -274,13 +274,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "dropIndexes: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNotIdempotent)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNotIdempotent)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } dropIndexesCmd; @@ -319,13 +319,13 @@ public: uassertStatusOK(createShardDatabase(opCtx, dbName)); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } createIndexesCmd; @@ -362,13 +362,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "reIndex: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } reIndexCmd; @@ -404,13 +404,13 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); LOG(1) << "collMod: " << nss << " cmd:" << redact(cmdObj); - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kNoRetry)); + auto shardResponses = uassertStatusOK( + scatterGatherOnlyVersionIfUnsharded(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kNoRetry)); return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses)); } } collectionModCmd; @@ -1077,57 +1077,8 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (!routingInfo.cm()) { - // TODO (SERVER-30734) make distinct versioned against unsharded collections - if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result)) { - return true; - } - - BSONObj resultObj = result.asTempObj(); - if (ResolvedView::isResolvedViewErrorResponse(resultObj)) { - auto resolvedView = ResolvedView::fromBSON(resultObj); - result.resetToEmpty(); - - auto parsedDistinct = ParsedDistinct::parse( - opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), false); - if (!parsedDistinct.isOK()) { - return appendCommandStatus(result, parsedDistinct.getStatus()); - } - - auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); - if (!aggCmdOnView.isOK()) { - return appendCommandStatus(result, aggCmdOnView.getStatus()); - } - - auto aggRequestOnView = - AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue()); - if (!aggRequestOnView.isOK()) { - return appendCommandStatus(result, aggRequestOnView.getStatus()); - } - - auto resolvedAggRequest = - resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue()); - auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); - - BSONObj aggResult = Command::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd))); - - ViewResponseFormatter formatter(aggResult); - auto formatStatus = formatter.appendAsDistinctResponse(&result); - if (!formatStatus.isOK()) { - return appendCommandStatus(result, formatStatus); - } - return true; - } - - return false; - } - - const auto cm = routingInfo.cm(); - auto query = getQuery(cmdObj); + auto swCollation = getCollation(cmdObj); if (!swCollation.isOK()) { return appendEmptyResultSet(result, swCollation.getStatus(), nss.ns()); @@ -1145,20 +1096,71 @@ public: collator = std::move(swCollator.getValue()); } - auto shardResponses = - uassertStatusOK(scatterGather(opCtx, - dbName, - nss, - filterCommandRequestForPassthrough(cmdObj), - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - query, - collation)); - - BSONObjComparator bsonCmp(BSONObj(), - BSONObjComparator::FieldNamesMode::kConsider, - !collation.isEmpty() ? collator.get() : cm->getDefaultCollator()); + // Save a copy of routingInfo before calling scatterGather(), to guarantee that we extract + // the collation from the same routingInfo that was used by scatterGather(). + // (scatterGather() will throw if the routingInfo needs to be refreshed). + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + + BSONObj viewDefinition; + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbName, + nss, + filterCommandRequestForPassthrough(cmdObj), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + query, + collation, + &viewDefinition); + + if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) { + uassert(ErrorCodes::InternalError, + str::stream() << "Missing resolved view definition, but remote returned " + << ErrorCodes::errorString(swShardResponses.getStatus().code()), + !viewDefinition.isEmpty()); + + auto resolvedView = ResolvedView::fromBSON(viewDefinition); + auto parsedDistinct = ParsedDistinct::parse( + opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), true); + if (!parsedDistinct.isOK()) { + return appendCommandStatus(result, parsedDistinct.getStatus()); + } + + auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand(); + if (!aggCmdOnView.isOK()) { + return appendCommandStatus(result, aggCmdOnView.getStatus()); + } + + auto aggRequestOnView = AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue()); + if (!aggRequestOnView.isOK()) { + return appendCommandStatus(result, aggRequestOnView.getStatus()); + } + + auto resolvedAggRequest = + resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue()); + auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); + + BSONObj aggResult = Command::runCommandDirectly( + opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd))); + + ViewResponseFormatter formatter(aggResult); + auto formatStatus = formatter.appendAsDistinctResponse(&result); + if (!formatStatus.isOK()) { + return appendCommandStatus(result, formatStatus); + } + return true; + } + + uassertStatusOK(swShardResponses.getStatus()); + auto shardResponses = std::move(swShardResponses.getValue()); + + BSONObjComparator bsonCmp( + BSONObj(), + BSONObjComparator::FieldNamesMode::kConsider, + !collation.isEmpty() + ? collator.get() + : (routingInfo.cm() ? routingInfo.cm()->getDefaultCollator() : nullptr)); BSONObjSet all = bsonCmp.makeBSONObjSet(); for (const auto& response : shardResponses) { @@ -1219,18 +1221,16 @@ public: Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGather(opCtx, - dbname, - nss, - explainCmd, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - targetingQuery, - targetingCollation, - true, // do shard versioning - true, // retry on stale shard version - &viewDefinition); + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + dbname, + nss, + explainCmd, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + targetingQuery, + targetingCollation, + &viewDefinition); long long millisElapsed = timer.millis(); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index f0727526ac7..cd70651b62f 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -606,18 +606,16 @@ Status Strategy::explainFind(OperationContext* opCtx, Timer timer; BSONObj viewDefinition; - auto swShardResponses = scatterGather(opCtx, - qr.nss().db().toString(), - qr.nss(), - explainCmd, - readPref, - Shard::RetryPolicy::kIdempotent, - ShardTargetingPolicy::UseRoutingTable, - qr.getFilter(), - qr.getCollation(), - true, // do shard versioning - true, // retry on stale shard version - &viewDefinition); + auto swShardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + qr.nss().db().toString(), + qr.nss(), + explainCmd, + readPref, + Shard::RetryPolicy::kIdempotent, + qr.getFilter(), + qr.getCollation(), + &viewDefinition); long long millisElapsed = timer.millis(); |