diff options
Diffstat (limited to 'src/mongo/s/cluster_commands_helpers.cpp')
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 153 |
1 files changed, 71 insertions, 82 deletions
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 5c498273192..981f33669ef 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -342,111 +342,100 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( bool appendRawResponses(OperationContext* opCtx, std::string* errmsg, BSONObjBuilder* output, - std::vector<AsyncRequestsSender::Response> shardResponses, - std::set<ErrorCodes::Error> ignoredErrors) { - // Always include ShardNotFound as an ignored error, since this node may not have realized a + const std::vector<AsyncRequestsSender::Response>& shardResponses, + std::set<ErrorCodes::Error> ignorableErrors) { + // Always include ShardNotFound as an ignorable error, since this node may not have realized a // shard has been removed. - ignoredErrors.insert(ErrorCodes::ShardNotFound); + ignorableErrors.insert(ErrorCodes::ShardNotFound); - BSONObjBuilder subobj; // Stores raw responses by ConnectionString + std::vector<std::pair<ShardId, BSONObj>> successResponsesReceived; + std::vector<std::pair<ShardId, Status>> ignorableErrorsReceived; + std::vector<std::pair<ShardId, Status>> nonIgnorableErrorsReceived; - // Stores all errors; we will remove ignoredErrors later if some shard returned success. - std::vector<std::pair<std::string, Status>> errors; // Stores errors by ConnectionString + boost::optional<std::pair<ShardId, BSONElement>> firstWriteConcernErrorReceived; - BSONElement wcErrorElem; // Stores the first writeConcern error we encounter - ShardId wcErrorShardId; // Stores the shardId for the first writeConcern error we encounter - bool hasWCError = false; // Whether we have encountered a writeConcern error yet + const auto processError = [&](const ShardId& shardId, const Status& status) { + invariant(!status.isOK()); + if (ignorableErrors.find(status.code()) != ignorableErrors.end()) { + ignorableErrorsReceived.emplace_back(std::move(shardId), std::move(status)); + return; + } + nonIgnorableErrorsReceived.emplace_back(shardId, status); + }; + // Do a pass through all the received responses and group them into success, ignorable, and + // non-ignorable. for (const auto& shardResponse : shardResponses) { - // Get the Shard object in order to get the shard's ConnectionString. - const auto swShard = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardResponse.shardId); - if (ErrorCodes::ShardNotFound == swShard.getStatus().code()) { - // Store the error by ShardId, since we cannot know the shard connection string, and it - // is only used for reporting. - errors.push_back(std::make_pair(shardResponse.shardId.toString(), swShard.getStatus())); - continue; - } - const auto shard = uassertStatusOK(swShard); - const auto shardConnStr = shard->getConnString().toString(); + const auto& shardId = shardResponse.shardId; - Status sendStatus = shardResponse.swResponse.getStatus(); + const auto sendStatus = shardResponse.swResponse.getStatus(); if (!sendStatus.isOK()) { - // Convert the error status back into the form of a command result and append it as the - // raw response. - BSONObjBuilder statusObjBob; - CommandHelpers::appendCommandStatusNoThrow(statusObjBob, sendStatus); - subobj.append(shardConnStr, statusObjBob.obj()); - - errors.push_back(std::make_pair(shardConnStr, sendStatus)); + processError(shardId, sendStatus); continue; } - // Got a response from the shard. - - auto& resObj = shardResponse.swResponse.getValue().data; - - // Append the shard's raw response. - subobj.append(shardConnStr, CommandHelpers::filterCommandReplyForPassthrough(resObj)); - - auto commandStatus = getStatusFromCommandResult(resObj); + const auto& resObj = shardResponse.swResponse.getValue().data; + const auto commandStatus = getStatusFromCommandResult(resObj); if (!commandStatus.isOK()) { - errors.push_back(std::make_pair(shardConnStr, std::move(commandStatus))); + processError(shardId, commandStatus); + continue; } - // Report the first writeConcern error we see. - if (!hasWCError && (wcErrorElem = resObj["writeConcernError"])) { - wcErrorShardId = shardResponse.shardId; - hasWCError = true; + if (!firstWriteConcernErrorReceived && resObj["writeConcernError"]) { + firstWriteConcernErrorReceived.emplace(shardId, resObj["writeConcernError"]); } - } - output->append("raw", subobj.done()); - - if (hasWCError) { - appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output); + successResponsesReceived.emplace_back(shardId, resObj); } - // If any shard returned success, filter out ignored errors - bool someShardReturnedOK = (errors.size() != shardResponses.size()); + // If all shards reported ignorable errors, promote them all to non-ignorable errors. + if (ignorableErrorsReceived.size() == shardResponses.size()) { + invariant(nonIgnorableErrorsReceived.empty()); + nonIgnorableErrorsReceived = std::move(ignorableErrorsReceived); + } - BSONObjBuilder errorBob; - int commonErrCode = -1; - auto it = errors.begin(); - while (it != errors.end()) { - if (someShardReturnedOK && ignoredErrors.find(it->second.code()) != ignoredErrors.end()) { - // Ignore the error. - it = errors.erase(it); - } else { - errorBob.append(it->first, it->second.reason()); - if (commonErrCode == -1) { - commonErrCode = it->second.code(); - } else if (commonErrCode != it->second.code()) { - commonErrCode = 0; - } - ++it; + // Append a 'raw' field containing the success responses and non-ignorable error responses. + BSONObjBuilder rawShardResponses; + const auto appendRawResponse = [&](const ShardId& shardId, const BSONObj& response) { + // Try to report the response by the shard's full connection string. + try { + const auto shardConnString = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)) + ->getConnString(); + rawShardResponses.append(shardConnString.toString(), + CommandHelpers::filterCommandReplyForPassthrough(response)); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) { + // If we could not get the shard's connection string, fall back to reporting the + // response by shard id. + rawShardResponses.append(shardId, response); } + }; + for (const auto& success : successResponsesReceived) { + appendRawResponse(success.first, success.second); } - BSONObj errobj = errorBob.obj(); - - if (!errobj.isEmpty()) { - *errmsg = errobj.toString(); - - // If every error has a code, and the code for all errors is the same, then add - // a top-level field "code" with this value to the output object. - if (commonErrCode > 0) { - output->append("code", commonErrCode); - output->append("codeName", ErrorCodes::errorString(ErrorCodes::Error(commonErrCode))); - if (errors.size() == 1) { - // Only propagate extra info if there was a single error object. - if (auto extraInfo = errors.begin()->second.extraInfo()) { - extraInfo->serialize(output); - } - } + for (const auto& error : nonIgnorableErrorsReceived) { + BSONObjBuilder statusObjBob; + CommandHelpers::appendCommandStatusNoThrow(statusObjBob, error.second); + appendRawResponse(error.first, statusObjBob.obj()); + } + output->append("raw", rawShardResponses.done()); + + // If there were no non-ignorable errors, report success (possibly with a writeConcern error). + if (nonIgnorableErrorsReceived.empty()) { + if (firstWriteConcernErrorReceived) { + appendWriteConcernErrorToCmdResponse(firstWriteConcernErrorReceived->first, + firstWriteConcernErrorReceived->second, + *output); } - return false; + return true; } - return true; + + // There was a non-ignorable error. Choose the first non-ignorable error as the top-level error. + const auto& firstNonIgnorableError = nonIgnorableErrorsReceived.front().second; + output->append("code", firstNonIgnorableError.code()); + output->append("codeName", ErrorCodes::errorString(firstNonIgnorableError.code())); + *errmsg = firstNonIgnorableError.reason(); + return false; } BSONObj extractQuery(const BSONObj& cmdObj) { |