summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Esha Maharishi <esha.maharishi@mongodb.com> 2017-09-25 13:23:18 -0400
committer: Esha Maharishi <esha.maharishi@mongodb.com> 2017-09-26 13:06:13 -0400
commit: dabde5a17356591ebcb465129bcd5ed5d9236b60 (patch)
tree: e0a58da30be2d87ff4670e63cff12b1404fa29d1 /src/mongo
parent: 5c29ce7718b423ae23ba0eccf71249cf69d36d37 (diff)
download: mongo-dabde5a17356591ebcb465129bcd5ed5d9236b60.tar.gz
SERVER-30025 for sharded read with empty query, target only shards that own data for the collection instead of all shards
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/s/commands/cluster_aggregate.cpp 38
-rw-r--r-- src/mongo/s/commands/cluster_commands_helpers.cpp 210
-rw-r--r-- src/mongo/s/commands/cluster_commands_helpers.h 81
-rw-r--r-- src/mongo/s/commands/cluster_count_cmd.cpp 44
-rw-r--r-- src/mongo/s/commands/cluster_db_stats_cmd.cpp 15
-rw-r--r-- src/mongo/s/commands/commands_public.cpp 208
-rw-r--r-- src/mongo/s/commands/strategy.cpp 22
7 files changed, 303 insertions, 315 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 1ececcc0470..f3298662b02 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -541,19 +541,31 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
if (mergeCtx->explain) {
- swShardResults = scatterGather(opCtx,
- namespaces.executionNss.db().toString(),
- namespaces.executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- namespaces.executionNss.isCollectionlessAggregateNS()
- ? ShardTargetingPolicy::BroadcastToAllShards
- : ShardTargetingPolicy::UseRoutingTable,
- shardQuery,
- request.getCollation(),
- true,
- false);
+ if (namespaces.executionNss.isCollectionlessAggregateNS()) {
+ // Some commands, such as $currentOp, are implemented as aggregation stages on a
+ // "collectionless" namespace. Currently, all such commands should be broadcast to
+ // all shards, and should not participate in the shard version protocol.
+ swShardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ namespaces.executionNss.db().toString(),
+ namespaces.executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target shards,
+ // and should participate in the shard version protocol.
+ swShardResults = scatterGatherVersionedTargetByRoutingTable(
+ opCtx,
+ namespaces.executionNss.db().toString(),
+ namespaces.executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ request.getCollation(),
+ nullptr /* viewDefinition */);
+ }
} else {
swCursors = establishShardCursors(opCtx,
namespaces.executionNss,
diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp
index 1701f7223c7..69357d02785 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.cpp
+++ b/src/mongo/s/commands/cluster_commands_helpers.cpp
@@ -69,8 +69,8 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
namespace {
-std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx,
- const BSONObj& cmdObj) {
+std::vector<AsyncRequestsSender::Request> buildUnversionedRequestsForAllShards(
+ OperationContext* opCtx, const BSONObj& cmdObj) {
std::vector<AsyncRequestsSender::Request> requests;
std::vector<ShardId> shardIds;
Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
@@ -80,66 +80,44 @@ std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationCon
return requests;
}
-std::vector<AsyncRequestsSender::Request> buildRequestsForShardsForNamespace(
+std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards(
OperationContext* opCtx,
const CachedCollectionRoutingInfo& routingInfo,
const BSONObj& cmdObj,
- const boost::optional<BSONObj> query,
- const boost::optional<BSONObj> collation,
- bool appendVersion) {
+ const BSONObj& query,
+ const BSONObj& collation) {
std::vector<AsyncRequestsSender::Request> requests;
if (routingInfo.cm()) {
- // The collection is sharded.
- // Note(esha): The for-loop is duplicated because ChunkManager::getShardIdsForQuery() and
- // ShardRegistry::getAllShardIds() return different types: std::set and std::vector,
- // respectively.
- if (query) {
- // A query was specified. Target all shards that own chunks that match the query.
- std::set<ShardId> shardIds;
- routingInfo.cm()->getShardIdsForQuery(
- opCtx, *query, (collation ? *collation : BSONObj()), &shardIds);
- for (const ShardId& shardId : shardIds) {
- requests.emplace_back(
- shardId,
- appendVersion
- ? appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))
- : cmdObj);
- }
- } else {
- // No query was specified. Target all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
- for (const ShardId& shardId : shardIds) {
- requests.emplace_back(
- shardId,
- appendVersion
- ? appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))
- : cmdObj);
- }
+ // The collection is sharded. Target all shards that own chunks that match the query.
+ std::set<ShardId> shardIds;
+ routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ for (const ShardId& shardId : shardIds) {
+ requests.emplace_back(
+ shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)));
}
} else {
// The collection is unsharded. Target only the primary shard for the database.
// Don't append shard version info when contacting the config servers.
requests.emplace_back(routingInfo.primaryId(),
- appendVersion && !routingInfo.primary()->isConfig()
+ !routingInfo.primary()->isConfig()
? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
: cmdObj);
}
return requests;
}
+/**
+ * Throws StaleConfigException if any remote returns a stale shardVersion error.
+ */
StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponses(
OperationContext* opCtx,
const std::string& dbName,
- const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
Shard::RetryPolicy retryPolicy,
const std::vector<AsyncRequestsSender::Request>& requests,
BSONObj* viewDefinition) {
// Send the requests.
- LOG(2) << "Dispatching command " << redact(cmdObj) << " to " << requests.size()
- << " targeted shards using readPreference " << readPref;
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
dbName,
@@ -159,58 +137,47 @@ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponses(
// We successfully received a response.
// Check for special errors that require throwing out any accumulated results.
-
- status = getStatusFromCommandResult(response.swResponse.getValue().data);
- LOG(2) << "Received status " << status << " and responseObj "
- << response.swResponse.getValue() << " from shard " << response.shardId
- << " at host " << response.shardHostAndPort->toString();
+ auto& responseObj = response.swResponse.getValue().data;
+ status = getStatusFromCommandResult(responseObj);
// Failing to establish a consistent shardVersion means no results should be examined.
if (ErrorCodes::isStaleShardingError(status.code())) {
- return status;
+ throw StaleConfigException(str::stream() << "got stale shardVersion response "
+ << responseObj
+ << " from shard "
+ << response.shardId
+ << " at host "
+ << response.shardHostAndPort->toString(),
+ responseObj);
}
- // In the case a read is performed against a view, the shard primary can return an error
+ // In the case a read is performed against a view, the server can return an error
// indicating that the underlying collection may be sharded. When this occurs the return
// message will include an expanded view definition and collection namespace. We pass
// the definition back to the caller by storing it in the 'viewDefinition' parameter.
// This allows the caller to rewrite the request as an aggregation and retry it.
if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == status) {
- auto& responseObj = response.swResponse.getValue().data;
if (!responseObj.hasField("resolvedView")) {
- status = Status(ErrorCodes::InternalError,
- str::stream() << "Missing field 'resolvedView' in document: "
- << responseObj);
- return status;
+ return {ErrorCodes::InternalError,
+ str::stream() << "Missing field 'resolvedView' in document: "
+ << responseObj};
}
auto resolvedViewObj = responseObj.getObjectField("resolvedView");
if (resolvedViewObj.isEmpty()) {
- status = Status(ErrorCodes::InternalError,
- str::stream() << "Field 'resolvedView' must be an object: "
- << responseObj);
- return status;
+ return {ErrorCodes::InternalError,
+ str::stream() << "Field 'resolvedView' must be an object: "
+ << responseObj};
}
+
if (viewDefinition) {
*viewDefinition = BSON("resolvedView" << resolvedViewObj.getOwned());
}
return status;
}
-
- if (status.isOK()) {
- // The command status was OK.
- responses.push_back(std::move(response));
- continue;
- }
}
-
- // Either we failed to get a response, or the command had a non-OK status that we can store
- // as an individual shard response.
- LOG(2) << "Received error " << response.swResponse.getStatus() << " from shard "
- << response.shardId;
responses.push_back(std::move(response));
}
-
return responses;
}
@@ -222,75 +189,74 @@ BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) {
return cmdWithVersionBob.obj();
}
-StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather(
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherUnversionedTargetAllShards(
OperationContext* opCtx,
const std::string& dbName,
- const boost::optional<NamespaceString> nss,
+ boost::optional<NamespaceString> nss,
const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
- const Shard::RetryPolicy retryPolicy,
- const ShardTargetingPolicy targetPolicy,
- const boost::optional<BSONObj> query,
- const boost::optional<BSONObj> collation,
- const bool appendShardVersion,
- const bool retryOnStaleShardVersion,
- BSONObj* viewDefinition) {
+ Shard::RetryPolicy retryPolicy) {
+ // Some commands, such as $currentOp, operate on a collectionless namespace. If a full namespace
+ // is specified, its database must match the dbName.
+ invariant(!nss || (nss->db() == dbName));
- // If a NamespaceString is specified, it must match the dbName.
- invariant(!nss || (nss.get().db() == dbName));
+ auto requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj);
- switch (targetPolicy) {
- case ShardTargetingPolicy::BroadcastToAllShards: {
- // Send unversioned commands to all shards.
- auto requests = buildRequestsForAllShards(opCtx, cmdObj);
- return gatherResponses(
- opCtx, dbName, cmdObj, readPref, retryPolicy, requests, viewDefinition);
- }
+ return gatherResponses(
+ opCtx, dbName, readPref, retryPolicy, requests, nullptr /* viewDefinition */);
+}
+
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherVersionedTargetByRoutingTable(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const NamespaceString& nss,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy,
+ const BSONObj& query,
+ const BSONObj& collation,
+ BSONObj* viewDefinition) {
+ // The database in the full namespace must match the dbName.
+ invariant(nss.db() == dbName);
- case ShardTargetingPolicy::UseRoutingTable: {
- // We must have a valid NamespaceString.
- invariant(nss && nss.get().isValid());
+ auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (!swRoutingInfo.isOK()) {
+ return swRoutingInfo.getStatus();
+ }
+ auto routingInfo = swRoutingInfo.getValue();
- int numAttempts = 0;
- StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
- (std::vector<AsyncRequestsSender::Response>()));
+ auto requests =
+ buildVersionedRequestsForTargetedShards(opCtx, routingInfo, cmdObj, query, collation);
- do {
- // Get the routing table cache.
- auto swRoutingInfo =
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, *nss);
- if (!swRoutingInfo.isOK()) {
- return swRoutingInfo.getStatus();
- }
- auto routingInfo = swRoutingInfo.getValue();
-
- // Use the routing table cache to decide which shards to target, and build the
- // requests to send to them.
- auto requests = buildRequestsForShardsForNamespace(
- opCtx, routingInfo, cmdObj, query, collation, appendShardVersion);
-
- // Retrieve the responses from the shards.
- swResponses = gatherResponses(
- opCtx, dbName, cmdObj, readPref, retryPolicy, requests, viewDefinition);
- ++numAttempts;
-
- // If any shard returned a stale shardVersion error, invalidate the routing table
- // cache. This will cause the cache to be refreshed the next time it is accessed.
- if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
- Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
- LOG(1) << "got stale shardVersion error " << swResponses.getStatus()
- << " while dispatching " << redact(cmdObj) << " after " << numAttempts
- << " dispatch attempts";
- }
- } while (retryOnStaleShardVersion && numAttempts < kMaxNumStaleVersionRetries &&
- !swResponses.getStatus().isOK());
+ return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests, viewDefinition);
+}
- return swResponses;
- }
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherOnlyVersionIfUnsharded(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const NamespaceString& nss,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy) {
+ // The database in the full namespace must match the dbName.
+ invariant(nss.db() == dbName);
+
+ auto swRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (!swRoutingInfo.isOK()) {
+ return swRoutingInfo.getStatus();
+ }
+ auto routingInfo = swRoutingInfo.getValue();
- default:
- MONGO_UNREACHABLE;
+ std::vector<AsyncRequestsSender::Request> requests;
+ if (routingInfo.cm()) {
+ requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj);
+ } else {
+ requests = buildVersionedRequestsForTargetedShards(
+ opCtx, routingInfo, cmdObj, BSONObj(), BSONObj());
}
+
+ return gatherResponses(
+ opCtx, dbName, readPref, retryPolicy, requests, nullptr /* viewDefinition */);
}
bool appendRawResponses(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h
index 907656f264b..e55e3677609 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.h
+++ b/src/mongo/s/commands/cluster_commands_helpers.h
@@ -47,12 +47,6 @@ class CachedDatabaseInfo;
class OperationContext;
class ShardId;
-/*
- * Allows callers of routing functions to specify a preferred targeting policy. See scatterGather
- * for a usage example.
- */
-enum class ShardTargetingPolicy { UseRoutingTable, BroadcastToAllShards };
-
/**
* This function appends the provided writeConcernError BSONElement to the sharded response.
*/
@@ -65,43 +59,66 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardID,
BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version);
/**
- * Generic function for dispatching commands to the cluster.
+ * Utility for dispatching unversioned commands to all shards in a cluster.
*
- * If 'targetPolicy' is ShardTargetingPolicy::BroadcastToAllShards, the command will be sent
- * unversioned to all shards and run on database 'dbName'. The 'query', 'collation' and
- * 'appendShardVersion' arguments, if supplied, will be ignored.
+ * Returns a non-OK status if a failure occurs on *this* node during execution. Otherwise, returns
+ * success and a list of responses from shards (including errors from the shards or errors reaching
+ * the shards).
*
- * If 'targetPolicy' is ShardTargetingPolicy::UseRoutingTable, the routing table cache will be used
- * to determine which shards the command should be dispatched to. If the namespace specified by
- * 'nss' is an unsharded collection, the command will be sent to the Primary shard for the database.
- * If 'query' is specified, only shards that own data needed by the query are targeted; otherwise,
- * all shards are targeted. By default, shardVersions are attached to the outgoing requests, and the
- * function will re-target and retry if it receives a stale shardVersion error from any shard.
+ * Note, if this mongos has not refreshed its shard list since
+ * 1) a shard has been *added* through a different mongos, a request will not be sent to the added
+ * shard
+ * 2) a shard has been *removed* through a different mongos, this function will return a
+ * ShardNotFound error status.
+ */
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherUnversionedTargetAllShards(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ boost::optional<NamespaceString> nss,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy);
+
+/**
+ * Utility for dispatching versioned commands on a namespace, deciding which shards to
+ * target by applying the passed-in query and collation to the local routing table cache.
*
- * Returns a non-OK status if a failure occurs on *this* node during execution or on seeing an error
- * from a shard that means the operation as a whole should fail, such as a exceeding retries for
- * stale shardVersion errors.
+ * Throws on seeing a StaleConfigException from any shard.
*
- * If a shard returns an error saying that the request was on a view, the shard will also return a
- * view definition. This will be stored in the BSONObj* viewDefinition argument, if non-null, so
- * that the caller can re-run the operation as an aggregation.
+ * Optionally populates a 'viewDefinition' out parameter if a shard's result contains a view
+ * definition. The viewDefinition can be used to re-run the command as an aggregation.
*
- * Otherwise, returns success and a list of responses from shards (including errors from the shards
- * or errors reaching the shards).
+ * Return value is the same as scatterGatherUnversionedTargetAllShards().
*/
-StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGather(
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherVersionedTargetByRoutingTable(
OperationContext* opCtx,
const std::string& dbName,
- const boost::optional<NamespaceString> nss,
+ const NamespaceString& nss,
const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
Shard::RetryPolicy retryPolicy,
- const ShardTargetingPolicy targetPolicy = ShardTargetingPolicy::UseRoutingTable,
- const boost::optional<BSONObj> query = boost::none,
- const boost::optional<BSONObj> collation = boost::none,
- const bool appendShardVersion = true,
- const bool retryOnStaleShardVersion = true,
- BSONObj* viewDefinition = nullptr);
+ const BSONObj& query,
+ const BSONObj& collation,
+ BSONObj* viewDefinition);
+
+/**
+ * Utility for dispatching commands on a namespace, but with special hybrid versioning:
+ * - If the namespace is unsharded, a version is attached (so this node can find out if its routing
+ * table was stale, and the namespace is actually sharded), and only the primary shard is targeted.
+ * - If the namespace is sharded, no version is attached, and the request is broadcast to all
+ * shards.
+ *
+ * Throws on seeing a StaleConfigException.
+ *
+ * Return value is the same as scatterGatherUnversionedTargetAllShards().
+ */
+StatusWith<std::vector<AsyncRequestsSender::Response>> scatterGatherOnlyVersionIfUnsharded(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const NamespaceString& nss,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy);
/**
* Attaches each shard's response or error status by the shard's connection string in a top-level
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index 755188311b0..e88eb8c00cc 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -141,18 +141,16 @@ public:
auto countCmdObj = countCmdBuilder.done();
BSONObj viewDefinition;
- auto swShardResponses = scatterGather(opCtx,
- dbname,
- nss,
- countCmdObj,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::UseRoutingTable,
- filter,
- collation,
- true, // do shard versioning
- true, // retry on stale shard version
- &viewDefinition);
+ auto swShardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ dbname,
+ nss,
+ countCmdObj,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ filter,
+ collation,
+ &viewDefinition);
if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) {
if (viewDefinition.isEmpty()) {
@@ -272,18 +270,16 @@ public:
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGather(opCtx,
- dbname,
- nss,
- explainCmd,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::UseRoutingTable,
- targetingQuery,
- targetingCollation,
- true, // do shard versioning
- true, // retry on stale shard version
- &viewDefinition);
+ auto swShardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ dbname,
+ nss,
+ explainCmd,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ targetingQuery,
+ targetingCollation,
+ &viewDefinition);
long long millisElapsed = timer.millis();
diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
index d6f88616a15..54edc5c9db4 100644
--- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp
+++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
@@ -68,14 +68,13 @@ public:
const BSONObj& cmdObj,
std::string& errmsg,
BSONObjBuilder& output) override {
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- boost::none,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::BroadcastToAllShards));
+ auto shardResponses = uassertStatusOK(
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ dbName,
+ boost::none,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent));
if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) {
return false;
}
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index c0c05398c0e..1d9ab4c6b32 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -274,13 +274,13 @@ public:
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
LOG(1) << "dropIndexes: " << nss << " cmd:" << redact(cmdObj);
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kNotIdempotent));
+ auto shardResponses = uassertStatusOK(
+ scatterGatherOnlyVersionIfUnsharded(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kNotIdempotent));
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
}
} dropIndexesCmd;
@@ -319,13 +319,13 @@ public:
uassertStatusOK(createShardDatabase(opCtx, dbName));
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kNoRetry));
+ auto shardResponses = uassertStatusOK(
+ scatterGatherOnlyVersionIfUnsharded(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kNoRetry));
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
}
} createIndexesCmd;
@@ -362,13 +362,13 @@ public:
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
LOG(1) << "reIndex: " << nss << " cmd:" << redact(cmdObj);
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kNoRetry));
+ auto shardResponses = uassertStatusOK(
+ scatterGatherOnlyVersionIfUnsharded(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kNoRetry));
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
}
} reIndexCmd;
@@ -404,13 +404,13 @@ public:
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
LOG(1) << "collMod: " << nss << " cmd:" << redact(cmdObj);
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kNoRetry));
+ auto shardResponses = uassertStatusOK(
+ scatterGatherOnlyVersionIfUnsharded(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kNoRetry));
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
}
} collectionModCmd;
@@ -1077,57 +1077,8 @@ public:
BSONObjBuilder& result) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto routingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- if (!routingInfo.cm()) {
- // TODO (SERVER-30734) make distinct versioned against unsharded collections
- if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result)) {
- return true;
- }
-
- BSONObj resultObj = result.asTempObj();
- if (ResolvedView::isResolvedViewErrorResponse(resultObj)) {
- auto resolvedView = ResolvedView::fromBSON(resultObj);
- result.resetToEmpty();
-
- auto parsedDistinct = ParsedDistinct::parse(
- opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), false);
- if (!parsedDistinct.isOK()) {
- return appendCommandStatus(result, parsedDistinct.getStatus());
- }
-
- auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand();
- if (!aggCmdOnView.isOK()) {
- return appendCommandStatus(result, aggCmdOnView.getStatus());
- }
-
- auto aggRequestOnView =
- AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue());
- if (!aggRequestOnView.isOK()) {
- return appendCommandStatus(result, aggRequestOnView.getStatus());
- }
-
- auto resolvedAggRequest =
- resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue());
- auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
-
- BSONObj aggResult = Command::runCommandDirectly(
- opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd)));
-
- ViewResponseFormatter formatter(aggResult);
- auto formatStatus = formatter.appendAsDistinctResponse(&result);
- if (!formatStatus.isOK()) {
- return appendCommandStatus(result, formatStatus);
- }
- return true;
- }
-
- return false;
- }
-
- const auto cm = routingInfo.cm();
-
auto query = getQuery(cmdObj);
+
auto swCollation = getCollation(cmdObj);
if (!swCollation.isOK()) {
return appendEmptyResultSet(result, swCollation.getStatus(), nss.ns());
@@ -1145,20 +1096,71 @@ public:
collator = std::move(swCollator.getValue());
}
- auto shardResponses =
- uassertStatusOK(scatterGather(opCtx,
- dbName,
- nss,
- filterCommandRequestForPassthrough(cmdObj),
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::UseRoutingTable,
- query,
- collation));
-
- BSONObjComparator bsonCmp(BSONObj(),
- BSONObjComparator::FieldNamesMode::kConsider,
- !collation.isEmpty() ? collator.get() : cm->getDefaultCollator());
+ // Save a copy of routingInfo before calling scatterGather(), to guarantee that we extract
+ // the collation from the same routingInfo that was used by scatterGather().
+ // (scatterGather() will throw if the routingInfo needs to be refreshed).
+ const auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+
+ BSONObj viewDefinition;
+ auto swShardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ dbName,
+ nss,
+ filterCommandRequestForPassthrough(cmdObj),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ query,
+ collation,
+ &viewDefinition);
+
+ if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swShardResponses.getStatus()) {
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "Missing resolved view definition, but remote returned "
+ << ErrorCodes::errorString(swShardResponses.getStatus().code()),
+ !viewDefinition.isEmpty());
+
+ auto resolvedView = ResolvedView::fromBSON(viewDefinition);
+ auto parsedDistinct = ParsedDistinct::parse(
+ opCtx, resolvedView.getNamespace(), cmdObj, ExtensionsCallbackNoop(), true);
+ if (!parsedDistinct.isOK()) {
+ return appendCommandStatus(result, parsedDistinct.getStatus());
+ }
+
+ auto aggCmdOnView = parsedDistinct.getValue().asAggregationCommand();
+ if (!aggCmdOnView.isOK()) {
+ return appendCommandStatus(result, aggCmdOnView.getStatus());
+ }
+
+ auto aggRequestOnView = AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue());
+ if (!aggRequestOnView.isOK()) {
+ return appendCommandStatus(result, aggRequestOnView.getStatus());
+ }
+
+ auto resolvedAggRequest =
+ resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue());
+ auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
+
+ BSONObj aggResult = Command::runCommandDirectly(
+ opCtx, OpMsgRequest::fromDBAndBody(dbName, std::move(resolvedAggCmd)));
+
+ ViewResponseFormatter formatter(aggResult);
+ auto formatStatus = formatter.appendAsDistinctResponse(&result);
+ if (!formatStatus.isOK()) {
+ return appendCommandStatus(result, formatStatus);
+ }
+ return true;
+ }
+
+ uassertStatusOK(swShardResponses.getStatus());
+ auto shardResponses = std::move(swShardResponses.getValue());
+
+ BSONObjComparator bsonCmp(
+ BSONObj(),
+ BSONObjComparator::FieldNamesMode::kConsider,
+ !collation.isEmpty()
+ ? collator.get()
+ : (routingInfo.cm() ? routingInfo.cm()->getDefaultCollator() : nullptr));
BSONObjSet all = bsonCmp.makeBSONObjSet();
for (const auto& response : shardResponses) {
@@ -1219,18 +1221,16 @@ public:
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGather(opCtx,
- dbname,
- nss,
- explainCmd,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::UseRoutingTable,
- targetingQuery,
- targetingCollation,
- true, // do shard versioning
- true, // retry on stale shard version
- &viewDefinition);
+ auto swShardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ dbname,
+ nss,
+ explainCmd,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ targetingQuery,
+ targetingCollation,
+ &viewDefinition);
long long millisElapsed = timer.millis();
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index f0727526ac7..cd70651b62f 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -606,18 +606,16 @@ Status Strategy::explainFind(OperationContext* opCtx,
Timer timer;
BSONObj viewDefinition;
- auto swShardResponses = scatterGather(opCtx,
- qr.nss().db().toString(),
- qr.nss(),
- explainCmd,
- readPref,
- Shard::RetryPolicy::kIdempotent,
- ShardTargetingPolicy::UseRoutingTable,
- qr.getFilter(),
- qr.getCollation(),
- true, // do shard versioning
- true, // retry on stale shard version
- &viewDefinition);
+ auto swShardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ qr.nss().db().toString(),
+ qr.nss(),
+ explainCmd,
+ readPref,
+ Shard::RetryPolicy::kIdempotent,
+ qr.getFilter(),
+ qr.getCollation(),
+ &viewDefinition);
long long millisElapsed = timer.millis();