author     Esha Maharishi <esha.maharishi@mongodb.com>  2017-04-14 18:32:52 -0400
committer  Esha Maharishi <esha.maharishi@mongodb.com>  2017-04-14 18:32:52 -0400
commit     259185c16ad3e5b81a1ddad2e83e1723ceb7e25c (patch)
tree       bcdca64f6232a87980e0bb8f70cf85cbe48ed649
parent     e96a5090084d5d80f1349b3223d38f5f52c013f0 (diff)
download   mongo-259185c16ad3e5b81a1ddad2e83e1723ceb7e25c.tar.gz
SERVER-28681 make ClusterCountCmd::run use the ARS
-rw-r--r--  jstests/concurrency/fsm_workloads/sharded_moveChunk_drop_shard_key_index.js |   9
-rw-r--r--  jstests/sharding/secondary_query_routing.js                                 |  36
-rw-r--r--  src/mongo/s/commands/cluster_commands_common.cpp                            | 170
-rw-r--r--  src/mongo/s/commands/cluster_commands_common.h                              |  38
-rw-r--r--  src/mongo/s/commands/cluster_count_cmd.cpp                                  |  73
-rw-r--r--  src/mongo/s/commands/cluster_current_op.cpp                                 |   3
-rw-r--r--  src/mongo/s/commands/cluster_db_stats_cmd.cpp                               |   3
-rw-r--r--  src/mongo/s/commands/cluster_repair_database_cmd.cpp                        |   3
-rw-r--r--  src/mongo/s/commands/commands_public.cpp                                    |  30
9 files changed, 237 insertions, 128 deletions
diff --git a/jstests/concurrency/fsm_workloads/sharded_moveChunk_drop_shard_key_index.js b/jstests/concurrency/fsm_workloads/sharded_moveChunk_drop_shard_key_index.js
index d96fdd60cb0..e3c348fb099 100644
--- a/jstests/concurrency/fsm_workloads/sharded_moveChunk_drop_shard_key_index.js
+++ b/jstests/concurrency/fsm_workloads/sharded_moveChunk_drop_shard_key_index.js
@@ -39,7 +39,14 @@ var $config = (function() {
dropIndex: function dropIndex(db, collName) {
// We don't assert that the command succeeded when dropping an index because it's
// possible another thread has already dropped this index.
- db[collName].dropIndex(this.shardKey);
+ try {
+ db[collName].dropIndex(this.shardKey);
+ } catch (e) {
+ // Ignore stale shardVersion errors.
+ if (e.message.indexOf("stale config") < 0) {
+ throw e;
+ }
+ }
// Re-create the index that was dropped.
assertAlways.commandWorked(db[collName].createIndex(this.shardKey));
diff --git a/jstests/sharding/secondary_query_routing.js b/jstests/sharding/secondary_query_routing.js
deleted file mode 100644
index 36454da4935..00000000000
--- a/jstests/sharding/secondary_query_routing.js
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Test that queries can be routed to the right secondaries that are caught
- * up with the primary.
- */
-(function() {
-
- var rsOpts = {nodes: 2};
- var st = new ShardingTest({mongos: 2, shards: {rs0: rsOpts, rs1: rsOpts}});
-
- st.s0.adminCommand({enableSharding: 'test'});
-
- st.ensurePrimaryShard('test', st.shard0.shardName);
- st.s0.adminCommand({shardCollection: 'test.user', key: {x: 1}});
- st.s0.adminCommand({split: 'test.user', middle: {x: 0}});
-
- st.s1.setReadPref('secondary');
- var testDB = st.s1.getDB('test');
- // This establishes the shard version Mongos #1's view.
- testDB.user.insert({x: 1});
-
- // Mongos #0 bumps up the version without Mongos #1 knowledge.
- // Note: moveChunk has implicit { w: 2 } write concern.
- st.s0.adminCommand(
- {moveChunk: 'test.user', find: {x: 0}, to: st.shard1.shardName, _waitForDelete: true});
-
- // Clear all the connections to make sure that Mongos #1 will attempt to establish
- // the shard version.
- assert.commandWorked(testDB.adminCommand({connPoolSync: 1}));
-
- // Mongos #1 performs a query to the secondary.
- var res = testDB.runReadCommand({count: 'user', query: {x: 1}});
- assert(res.ok);
- assert.eq(1, res.n, tojson(res));
-
- st.stop();
-})();
diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp
index 6d12b8e70e3..c654e881d8c 100644
--- a/src/mongo/s/commands/cluster_commands_common.cpp
+++ b/src/mongo/s/commands/cluster_commands_common.cpp
@@ -47,6 +47,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -57,7 +58,7 @@ BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) {
version.appendForCommands(&cmdWithVersionBob);
return cmdWithVersionBob.obj();
}
-}
+} // namespace
std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx,
const BSONObj& cmdObj) {
@@ -70,7 +71,7 @@ std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationCon
return requests;
}
-std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards(
+std::vector<AsyncRequestsSender::Request> buildRequestsForShardsThatHaveCollection(
OperationContext* opCtx,
const CachedCollectionRoutingInfo& routingInfo,
const BSONObj& cmdObj) {
@@ -96,12 +97,42 @@ std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards(
return requests;
}
+std::vector<AsyncRequestsSender::Request> buildRequestsForShardsForQuery(
+ OperationContext* opCtx,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj& cmdObj,
+ const BSONObj& filter,
+ const BSONObj& collation) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ if (routingInfo.cm()) {
+ // The collection is sharded. Target all shards that own chunks that match the query.
+ std::set<ShardId> shardIds;
+ routingInfo.cm()->getShardIdsForQuery(opCtx, filter, collation, &shardIds);
+ for (const ShardId& shardId : shardIds) {
+ requests.emplace_back(
+ shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ if (routingInfo.primary()->isConfig()) {
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo.primaryId(), cmdObj);
+ } else {
+ requests.emplace_back(routingInfo.primaryId(),
+ appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()));
+ }
+ }
+ return requests;
+}
+
+
StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponsesFromShards(
OperationContext* opCtx,
const std::string& dbName,
const BSONObj& cmdObj,
const std::vector<AsyncRequestsSender::Request>& requests,
- BSONObjBuilder* output) {
+ BSONObjBuilder* output,
+ BSONObj* viewDefinition) {
// Extract the readPreference from the command.
const auto queryOptionsObj = cmdObj.getObjectField(QueryRequest::kUnwrappedReadPrefField);
const auto readPrefObj = queryOptionsObj.getObjectField(QueryRequest::kWrappedReadPrefField);
@@ -110,7 +141,8 @@ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponsesFromShards
: uassertStatusOK(ReadPreferenceSetting::fromBSON(readPrefObj));
// Send the requests.
-
+ LOG(0) << "Dispatching command " << redact(cmdObj) << " to " << requests.size()
+ << " targeted shards using readPreference " << readPref;
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
dbName,
@@ -120,19 +152,25 @@ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponsesFromShards
// Get the responses.
std::vector<AsyncRequestsSender::Response> responses; // Stores results by ShardId
- BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString
- BSONObjBuilder errors; // Stores errors by ConnectionString
- int commonErrCode = -1; // Stores the overall error code
- BSONElement wcErrorElem;
- ShardId wcErrorShardId;
- bool hasWCError = false;
+ // These are only populated if a non-null 'output' was passed, and are used to build 'output'.
+ BSONObjBuilder subobj; // Stores raw responses by ConnectionString
+ BSONObjBuilder errors; // Stores errors by ConnectionString
+ int commonErrCode = -1; // Stores the overall error code
+ BSONElement wcErrorElem; // Stores the first writeConcern error we encounter
+ ShardId wcErrorShardId; // Stores the shardId for the first writeConcern error we encounter
+ bool hasWCError = false; // Whether we have encountered a writeConcern error yet
+
+ ScopeGuard reset = MakeGuard([output]() {
+ if (output) {
+ output->resetToEmpty();
+ }
+ });
while (!ars.done()) {
auto response = ars.next();
const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId);
if (!swShard.isOK()) {
- output->resetToEmpty();
return swShard.getStatus();
}
const auto shard = std::move(swShard.getValue());
@@ -142,59 +180,107 @@ StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponsesFromShards
// We successfully received a response.
status = getStatusFromCommandResult(response.swResponse.getValue().data);
+ LOG(2) << "Received status " << status << " and responseObj "
+ << response.swResponse.getValue() << " from shard " << response.shardId
+ << " at host " << response.shardHostAndPort->toString();
+ // Check for special errors that mean we should retry the entire operation.
+
+ // Failing to establish a consistent shardVersion means the operation should be retried
+ // on all shards.
if (ErrorCodes::isStaleShardingError(status.code())) {
- // Do not report any raw results if we fail to establish a shardVersion.
- output->resetToEmpty();
return status;
}
- auto result = response.swResponse.getValue().data;
- if (!hasWCError) {
- if ((wcErrorElem = result["writeConcernError"])) {
- wcErrorShardId = response.shardId;
- hasWCError = true;
+ // In the case a read is performed against a view, the shard primary can return an error
+ // indicating that the underlying collection may be sharded. When this occurs the return
+ // message will include an expanded view definition and collection namespace. We pass
+ // the definition back to the caller by storing it in the 'viewDefinition' parameter.
+ // This allows the caller to rewrite the request as an aggregation and retry it.
+ if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == status) {
+ auto& responseObj = response.swResponse.getValue().data;
+ if (!responseObj.hasField("resolvedView")) {
+ status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field 'resolvedView' in document: "
+ << responseObj);
+ return status;
+ }
+
+ auto resolvedViewObj = responseObj.getObjectField("resolvedView");
+ if (resolvedViewObj.isEmpty()) {
+ status = Status(ErrorCodes::InternalError,
+ str::stream() << "Field 'resolvedView' must be an object: "
+ << responseObj);
+ return status;
+ }
+ if (viewDefinition) {
+ *viewDefinition = BSON("resolvedView" << resolvedViewObj.getOwned());
+ }
+ return status;
+ }
+
+ if (output) {
+ if (!hasWCError) {
+ if ((wcErrorElem = response.swResponse.getValue().data["writeConcernError"])) {
+ wcErrorShardId = response.shardId;
+ hasWCError = true;
+ }
}
}
if (status.isOK()) {
// The command status was OK.
- subobj.append(shard->getConnString().toString(), result);
+ if (output) {
+ subobj.append(shard->getConnString().toString(),
+ response.swResponse.getValue().data);
+ }
responses.push_back(std::move(response));
continue;
}
}
- // Either we failed to get a response, or the command had a non-OK status.
+ // Either we failed to get a response, or the command had a non-OK status that we can store
+ // as an individual shard response.
+
+ // Save the extracted command status into the response.
+ response.swResponse = status;
- // Convert the error status back into the format of a command result.
- BSONObjBuilder resultBob;
- Command::appendCommandStatus(resultBob, status);
- auto result = resultBob.obj();
+ if (output) {
+ // Convert the error status back into the format of a command result.
+ BSONObjBuilder statusObjBob;
+ Command::appendCommandStatus(statusObjBob, status);
+ auto statusObj = statusObjBob.obj();
- // Update the data structures that store the results.
- errors.append(shard->getConnString().toString(), status.reason());
- if (commonErrCode == -1) {
- commonErrCode = status.code();
- } else if (commonErrCode != status.code()) {
- commonErrCode = 0;
+ errors.append(shard->getConnString().toString(), status.reason());
+ if (commonErrCode == -1) {
+ commonErrCode = status.code();
+ } else if (commonErrCode != status.code()) {
+ commonErrCode = 0;
+ }
+
+ subobj.append(shard->getConnString().toString(), statusObj);
}
- subobj.append(shard->getConnString().toString(), result);
- responses.push_back(response);
+ LOG(2) << "Got error " << response.swResponse.getStatus() << " from shard "
+ << response.shardId;
+ responses.push_back(std::move(response));
}
- subobj.done();
+ reset.Dismiss();
- if (hasWCError) {
- appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output);
- }
+ if (output) {
+ output->append("raw", subobj.done());
- BSONObj errobj = errors.done();
- if (!errobj.isEmpty()) {
- // If code for all errors is the same, then report the common error code.
- if (commonErrCode > 0) {
- return {ErrorCodes::fromInt(commonErrCode), errobj.toString()};
+ if (hasWCError) {
+ appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output);
+ }
+
+ BSONObj errobj = errors.done();
+ if (!errobj.isEmpty()) {
+ // If code for all errors is the same, then report the common error code.
+ if (commonErrCode > 0) {
+ return {ErrorCodes::fromInt(commonErrCode), errobj.toString()};
+ }
+ return {ErrorCodes::OperationFailed, errobj.toString()};
}
- return {ErrorCodes::OperationFailed, errobj.toString()};
}
return responses;
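Aside: the guard added above (MakeGuard/Dismiss from mongo/util/scopeguard.h) ensures that the caller's 'output' builder is reset to empty whenever gatherResponsesFromShards returns an error before the gather completes, and is dismissed once all responses have been collected. A minimal self-contained sketch of that reset-on-early-return idiom, with a hand-rolled guard standing in for the real one, looks roughly like this:

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for mongo's ScopeGuard / MakeGuard / Dismiss.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> onExit) : _onExit(std::move(onExit)) {}
    ~ScopeGuard() {
        if (_active) _onExit();
    }
    void Dismiss() { _active = false; }

private:
    std::function<void()> _onExit;
    bool _active = true;
};

// Gathers "responses" into *output, but clears *output on any early error
// return, so a failed scatter/gather never leaves partial raw results behind.
bool gatherInto(std::vector<std::string>* output) {
    ScopeGuard reset([output] {
        if (output) output->clear();  // analogous to output->resetToEmpty()
    });

    for (int shard = 0; shard < 3; ++shard) {
        if (shard == 2) return false;  // simulated error: guard clears *output
        if (output) output->push_back("response from shard " + std::to_string(shard));
    }

    reset.Dismiss();  // success: keep what we accumulated
    return true;
}

int main() {
    std::vector<std::string> out;
    std::cout << "ok=" << gatherInto(&out) << " kept=" << out.size() << "\n";
}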
diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h
index 253f9fa7fc0..7e9fbf6cb75 100644
--- a/src/mongo/s/commands/cluster_commands_common.h
+++ b/src/mongo/s/commands/cluster_commands_common.h
@@ -35,6 +35,7 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/chunk_version.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/stdx/memory.h"
@@ -51,27 +52,52 @@ std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationCon
const BSONObj& cmdObj);
/**
- * Utility function to get the set of shards to target for a request on a specific namespace.
+ * Utility function to target all shards that own data for a collection.
*
* Selects shards to target based on 'routingInfo', and constructs a vector of requests, one per
* targeted shard, where the cmdObj to send to each shard has been modified to include the shard's
* shardVersion.
*/
-std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards(
+std::vector<AsyncRequestsSender::Request> buildRequestsForShardsThatHaveCollection(
OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, const BSONObj& cmdObj);
/**
- * Utility function to scatter 'requests' to shards and fold the responses into a single response.
+ * Utility function to target all shards that own chunks that match a query on a collection.
*
- * Places the raw responses from shards into a field 'raw' in 'output', and also returns the raw
- * responses as a vector so that additional aggregate logic can be applied to them.
+ * Selects shards to target based on the ChunkManager in 'routingInfo', and constructs a vector of
+ * requests, one per targeted shard, where the cmdObj to send to each shard has been modified to
+ * include the shard's shardVersion.
+ */
+std::vector<AsyncRequestsSender::Request> buildRequestsForShardsForQuery(
+ OperationContext* opCtx,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj& cmdObj,
+ const BSONObj& filter,
+ const BSONObj& collation);
+
+/**
+ * Utility function to scatter 'requests' to shards and gather the responses.
+ *
+ * Returns an error status if any shard returns a stale shardVersion error or if a shard is not
+ * found.
+ *
+ * @output: if non-null:
+ * -- places the raw responses from the shards into a field called 'raw' in 'output'
+ * -- appends the writeConcern element for the first writeConcern error encountered to 'output'
+ * -- appends an error code and message to 'output'. If all shards had the same error, the error
+ * code is the common error code, otherwise '0'
+ * -- *Warning* resets 'output' to empty if an error status is returned.
+ *
+ * @viewDefinition: if non-null and a shard returns an error saying that the command was on a view,
+ * the view definition is stored in 'viewDefinition'.
*/
StatusWith<std::vector<AsyncRequestsSender::Response>> gatherResponsesFromShards(
OperationContext* opCtx,
const std::string& dbName,
const BSONObj& cmdObj,
const std::vector<AsyncRequestsSender::Request>& requests,
- BSONObjBuilder* output);
+ BSONObjBuilder* output,
+ BSONObj* viewDefinition);
/**
* Utility function to compute a single error code from a vector of command results.
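The contract documented above distinguishes three classes of per-shard outcomes: a stale-shardVersion error aborts the whole gather (so the caller can refresh routing info and retry), a sharded-view error surfaces the resolvedView to the caller via 'viewDefinition', and any other failure is recorded as that shard's individual response. A simplified, self-contained model of that classification (the types below are illustrative stand-ins, not the real Status/ARS types) could read:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Illustrative stand-ins for the real Status / shard response types.
enum class Code { OK, StaleConfig, CommandOnShardedViewNotSupported, Other };

struct ShardResponse {
    std::string shardId;
    Code code;
    std::string resolvedView;  // only populated for the view error
};

struct GatherResult {
    bool retryWholeOperation = false;           // stale shardVersion seen
    std::optional<std::string> viewDefinition;  // surfaced to the caller
    std::vector<ShardResponse> responses;       // per-shard results (ok or error)
};

GatherResult classify(const std::vector<ShardResponse>& raw) {
    GatherResult result;
    for (const auto& response : raw) {
        if (response.code == Code::StaleConfig) {
            // Failing to establish a consistent shardVersion: retry on all shards.
            result.retryWholeOperation = true;
            return result;
        }
        if (response.code == Code::CommandOnShardedViewNotSupported) {
            // Hand the expanded view definition back so the caller can rewrite
            // the command as an aggregation and retry it.
            result.viewDefinition = response.resolvedView;
            return result;
        }
        // OK responses and ordinary per-shard errors are both kept.
        result.responses.push_back(response);
    }
    return result;
}

int main() {
    auto r = classify({{"shard0", Code::OK, ""}, {"shard1", Code::Other, ""}});
    std::cout << "kept " << r.responses.size() << " responses\n";
}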
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index b92011c56e0..fa60344efda 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -36,10 +36,12 @@
#include "mongo/db/query/view_response_formatter.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_aggregate.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/commands/strategy.h"
+#include "mongo/s/grid.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -136,12 +138,46 @@ public:
}
}
- std::vector<Strategy::CommandResult> countResult;
- Strategy::commandOp(
- opCtx, dbname, countCmdBuilder.done(), nss.ns(), filter, collation, &countResult);
+ auto countCmdObj = countCmdBuilder.done();
+
+ int numAttempts = 0;
+ StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
+ (std::vector<AsyncRequestsSender::Response>()));
+ BSONObj viewDefinition;
+ do {
+ auto routingInfoStatus =
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (ErrorCodes::NamespaceNotFound == routingInfoStatus) {
+ // If there's no collection with this name, the count aggregation behavior below
+ // will produce a total count of 0.
+ break;
+ }
+ auto routingInfo = uassertStatusOK(routingInfoStatus);
+
+ auto requests =
+ buildRequestsForShardsForQuery(opCtx, routingInfo, countCmdObj, filter, collation);
+
+ swResponses = gatherResponsesFromShards(
+ opCtx, dbname, countCmdObj, requests, nullptr, &viewDefinition);
+
+ if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
+ }
+
+ ++numAttempts;
+ } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK());
+
+ if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swResponses.getStatus()) {
+ if (viewDefinition.isEmpty()) {
+ return appendCommandStatus(
+ result,
+ {ErrorCodes::InternalError,
+ str::stream() << "Missing resolved view definition, but remote returned "
+ << ErrorCodes::errorString(swResponses.getStatus().code())});
+ }
+
+ // Rewrite the count command as an aggregation.
- if (countResult.size() == 1 &&
- ResolvedView::isResolvedViewErrorResponse(countResult[0].result)) {
auto countRequest = CountRequest::parseFromBSON(dbname, cmdObj, false);
if (!countRequest.isOK()) {
return appendCommandStatus(result, countRequest.getStatus());
@@ -157,7 +193,7 @@ public:
return appendCommandStatus(result, aggRequestOnView.getStatus());
}
- auto resolvedView = ResolvedView::fromBSON(countResult[0].result);
+ auto resolvedView = ResolvedView::fromBSON(viewDefinition);
auto resolvedAggRequest =
resolvedView.asExpandedViewAggregation(aggRequestOnView.getValue());
auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
@@ -176,30 +212,27 @@ public:
return true;
}
+ uassertStatusOK(swResponses.getStatus());
+ auto responses = std::move(swResponses.getValue());
+
long long total = 0;
BSONObjBuilder shardSubTotal(result.subobjStart("shards"));
- for (const auto& resultEntry : countResult) {
- const ShardId& shardName = resultEntry.shardTargetId;
- const auto resultBSON = resultEntry.result;
-
- if (resultBSON["ok"].trueValue()) {
- long long shardCount = resultBSON["n"].numberLong();
-
- shardSubTotal.appendNumber(shardName.toString(), shardCount);
- total += shardCount;
- } else {
+ for (const auto& response : responses) {
+ if (!response.swResponse.isOK()) {
shardSubTotal.doneFast();
// Add error context so that you can see on which shard failed as well as details
// about that error.
- auto shardError = getStatusFromCommandResult(resultBSON);
auto errorWithContext =
- Status(shardError.code(),
- str::stream() << "failed on: " << shardName.toString()
- << causedBy(shardError.reason()));
+ Status(response.swResponse.getStatus().code(),
+ str::stream() << "failed on: " << response.shardId
+ << causedBy(response.swResponse.getStatus().reason()));
return appendCommandStatus(result, errorWithContext);
}
+ long long shardCount = response.swResponse.getValue().data["n"].numberLong();
+ shardSubTotal.appendNumber(response.shardId.toString(), shardCount);
+ total += shardCount;
}
shardSubTotal.doneFast();
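The reworked ClusterCountCmd::run above follows a bounded retry loop: build per-shard requests from fresh routing info, gather responses, and on a stale-shardVersion error invalidate the routing table and try again, then sum the per-shard 'n' values. A stripped-down sketch of that control flow, with placeholder types in place of the catalog cache and the ARS, might be:

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Illustrative bound; mongos defines its own kMaxNumStaleVersionRetries.
constexpr int kMaxNumStaleVersionRetries = 10;

struct ShardCount {
    std::string shardId;
    bool ok;
    long long n;
};

// Placeholder for "dispatch the count to every targeted shard and gather".
// A real implementation would refresh routing info and rebuild shard-versioned
// requests on every attempt; here we just simulate one stale attempt.
std::vector<ShardCount> scatterGatherCount(int attempt, bool& staleError) {
    staleError = (attempt == 0);  // first attempt hits a stale shardVersion
    if (staleError) return {};
    return {{"shard0", true, 7}, {"shard1", true, 5}};
}

long long clusterCount() {
    std::vector<ShardCount> responses;
    int numAttempts = 0;
    bool stale = false;
    do {
        responses = scatterGatherCount(numAttempts, stale);
        // On a stale error the routing table would be invalidated here
        // (catalogCache()->onStaleConfigError) before retrying.
        ++numAttempts;
    } while (numAttempts < kMaxNumStaleVersionRetries && stale);

    long long total = 0;
    std::map<std::string, long long> perShard;  // the "shards" subobject
    for (const auto& response : responses) {
        if (!response.ok)
            throw std::runtime_error("failed on: " + response.shardId);
        perShard[response.shardId] = response.n;
        total += response.n;
    }
    return total;
}

int main() {
    std::cout << "total count = " << clusterCount() << "\n";
}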
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index 14f4dc5e46d..471a37134ed 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -86,7 +86,8 @@ public:
std::string& errmsg,
BSONObjBuilder& output) override {
auto requests = buildRequestsForAllShards(opCtx, cmdObj);
- auto swResponses = gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output);
+ auto swResponses =
+ gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output, nullptr);
if (!swResponses.isOK()) {
// We failed to obtain a response or error from all shards.
return appendCommandStatus(output, swResponses.getStatus());
diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
index cb7877d6cf8..1dc1e87392e 100644
--- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp
+++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp
@@ -69,7 +69,8 @@ public:
std::string& errmsg,
BSONObjBuilder& output) override {
auto requests = buildRequestsForAllShards(opCtx, cmdObj);
- auto swResponses = gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output);
+ auto swResponses =
+ gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output, nullptr);
if (!swResponses.isOK()) {
// We failed to obtain a response or error from all shards.
return appendCommandStatus(output, swResponses.getStatus());
diff --git a/src/mongo/s/commands/cluster_repair_database_cmd.cpp b/src/mongo/s/commands/cluster_repair_database_cmd.cpp
index 55fa0e79565..82facf374e5 100644
--- a/src/mongo/s/commands/cluster_repair_database_cmd.cpp
+++ b/src/mongo/s/commands/cluster_repair_database_cmd.cpp
@@ -65,7 +65,8 @@ public:
std::string& errmsg,
BSONObjBuilder& output) override {
auto requests = buildRequestsForAllShards(opCtx, cmdObj);
- auto swResponses = gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output);
+ auto swResponses =
+ gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output, nullptr);
return appendCommandStatus(output, swResponses.getStatus());
}
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 33836e9b997..ce1c164f8e0 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -203,13 +203,6 @@ protected:
return false;
}
- BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) {
- BSONObjBuilder cmdWithVersionBob;
- cmdWithVersionBob.appendElements(cmdObj);
- version.appendForCommands(&cmdWithVersionBob);
- return cmdWithVersionBob.obj();
- }
-
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
@@ -222,25 +215,22 @@ protected:
uassertStatusOK(createShardDatabase(opCtx, dbName));
}
- Status status = Status::OK();
int numAttempts = 0;
- while (numAttempts < kMaxNumStaleVersionRetries) {
- status = Status::OK();
- output.resetToEmpty();
-
+ Status status = Status::OK();
+ do {
auto routingInfo = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- auto requests = buildRequestsForTargetedShards(opCtx, routingInfo, cmdObj);
+ auto requests = buildRequestsForShardsThatHaveCollection(opCtx, routingInfo, cmdObj);
- auto swResponses = gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output);
- if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
+ status = gatherResponsesFromShards(opCtx, dbName, cmdObj, requests, &output, nullptr)
+ .getStatus();
+
+ if (ErrorCodes::isStaleShardingError(status.code())) {
Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
- ++numAttempts;
- continue;
}
- status = swResponses.getStatus();
- break;
- }
+
+ ++numAttempts;
+ } while (numAttempts < kMaxNumStaleVersionRetries && !status.isOK());
// We don't uassertStatusOK(), because that causes 'output' to be cleared, but we want to
// report the raw results even if there was an error.
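The loop above keeps the raw per-shard results in 'output' even when the overall status is an error; the shared helper then reports a single error code, using the shards' common code when they all agree and a generic OperationFailed otherwise. A small self-contained sketch of that folding rule (error code values are illustrative only):

#include <iostream>
#include <vector>

// Fold per-shard error codes into one overall code:
// -1 means "no errors seen", a positive code means "all shards agreed",
// and 0 means "shards disagreed" (reported as a generic OperationFailed).
int foldCommonErrorCode(const std::vector<int>& shardErrorCodes) {
    int commonErrCode = -1;
    for (int code : shardErrorCodes) {
        if (commonErrCode == -1) {
            commonErrCode = code;  // first error seen
        } else if (commonErrCode != code) {
            commonErrCode = 0;     // mixed errors: no single common code
        }
    }
    return commonErrCode;
}

int main() {
    std::cout << foldCommonErrorCode({8000, 8000}) << "\n";  // 8000 (common)
    std::cout << foldCommonErrorCode({8000, 9000}) << "\n";  // 0 (mixed)
    std::cout << foldCommonErrorCode({}) << "\n";            // -1 (no errors)
}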