/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/operation_context.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
namespace mongo {
using std::string;
namespace {
// Total number of attempts (the initial try included) made by the bounded retry loops below
// (runCommandWithFixedRetryAttempts, runExhaustiveCursorCommand, runBatchWriteCommand and
// exhaustiveFindOnConfig). Each loop retries only while `retry < kOnErrorNumRetries`, so the
// result of the final attempt is always returned to the caller.
const int kOnErrorNumRetries = 3;
} // namespace
/**
 * Reduces the outcome of a remote command to a single Status.
 *
 * The checks are made in priority order:
 *   1. the transport-level status (whether the request reached the shard at all),
 *   2. the command's own status,
 *   3. the write concern status.
 * The first non-OK status found is returned; Status::OK() means all three succeeded.
 */
Status Shard::CommandResponse::getEffectiveStatus(
    const StatusWith& swResponse) {
    if (swResponse.isOK()) {
        const auto& cmdResponse = swResponse.getValue();
        if (!cmdResponse.commandStatus.isOK()) {
            return cmdResponse.commandStatus;
        }
        if (!cmdResponse.writeConcernStatus.isOK()) {
            return cmdResponse.writeConcernStatus;
        }
        return Status::OK();
    }
    // The request never produced a shard response.
    return swResponse.getStatus();
}
/**
 * Interprets a raw write-command response into `batchResponse`.
 *
 * If the effective status (transport, command and write concern) is OK, the response body is
 * parsed into `batchResponse`. On success the result comes from
 * BatchedCommandResponse::toStatus(). On failure the status is FailedToParse.
 *
 * Whenever the resulting status is not OK, `batchResponse` is cleared and its status is set to
 * that error, so callers always receive a consistent response object.
 *
 * Returns the resulting status.
 */
Status Shard::CommandResponse::processBatchWriteResponse(
    StatusWith swResponse, BatchedCommandResponse* batchResponse) {
    Status resultStatus = getEffectiveStatus(swResponse);

    if (resultStatus.isOK()) {
        std::string parseErrmsg;
        const bool parsed =
            batchResponse->parseBSON(swResponse.getValue().response, &parseErrmsg);
        if (parsed) {
            resultStatus = batchResponse->toStatus();
        } else {
            resultStatus = Status(ErrorCodes::FailedToParse,
                                  str::stream() << "Failed to parse write response: "
                                                << parseErrmsg);
        }
    }

    if (resultStatus.isOK()) {
        return resultStatus;
    }

    // Replace whatever was parsed with a response that only carries the error.
    batchResponse->clear();
    batchResponse->setStatus(resultStatus);
    return resultStatus;
}
// Default timeout for config commands: 30 seconds, stored as Milliseconds.
// NOTE(review): the consumers of this constant live outside this file; confirm the intended scope.
const Milliseconds Shard::kDefaultConfigCommandTimeout = Seconds{30};
/**
 * Decides whether an error code should be surfaced to the caller as-is.
 *
 * Returns false for NetworkInterfaceExceededTimeLimit and for every code listed in
 * RemoteCommandRetryScheduler::kAllRetriableErrors. Returns true for all other codes.
 */
bool Shard::shouldErrorBePropagated(ErrorCodes::Error code) {
    if (code == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
        return false;
    }
    const auto& retriableCodes = RemoteCommandRetryScheduler::kAllRetriableErrors;
    return std::find(retriableCodes.begin(), retriableCodes.end(), code) == retriableCodes.end();
}
// Constructs a Shard that records `id` as its identifier (returned by getId(), compared in
// isConfig()).
Shard::Shard(const ShardId& id) : _id(id) {}
/**
 * Returns true when this shard's identifier is the reserved name "config".
 */
bool Shard::isConfig() const {
    const bool idIsConfig = (_id == "config");
    return idIsConfig;
}
/**
 * Convenience overload of runCommand without an explicit maxTimeMS override.
 *
 * Forwards to the full overload with Milliseconds::max() as the override value.
 * NOTE(review): Milliseconds::max() presumably means "no override" to _runCommand; confirm.
 */
StatusWith Shard::runCommand(OperationContext* opCtx,
                             const ReadPreferenceSetting& readPref,
                             const std::string& dbName,
                             const BSONObj& cmdObj,
                             RetryPolicy retryPolicy) {
    const Milliseconds noMaxTimeOverride = Milliseconds::max();
    return runCommand(opCtx, readPref, dbName, cmdObj, noMaxTimeOverride, retryPolicy);
}
/**
 * Runs `cmdObj` against this shard, retrying for as long as the failure is retriable under
 * `retryPolicy`.
 *
 * There is no attempt limit. The loop ends when:
 *   - the operation is interrupted (the interrupt status is returned), or
 *   - an attempt ends in success or in a non-retriable error (that response is returned).
 * Each retried failure is logged at level 2.
 */
StatusWith Shard::runCommand(OperationContext* opCtx,
                             const ReadPreferenceSetting& readPref,
                             const std::string& dbName,
                             const BSONObj& cmdObj,
                             Milliseconds maxTimeMSOverride,
                             RetryPolicy retryPolicy) {
    for (;;) {
        // Check for interruption before every attempt; it is the only exit for endless retries.
        auto interruptStatus = opCtx->checkForInterruptNoAssert();
        if (!interruptStatus.isOK()) {
            return interruptStatus;
        }

        auto swResponse = _runCommand(opCtx, readPref, dbName, maxTimeMSOverride, cmdObj);
        const auto effectiveStatus = CommandResponse::getEffectiveStatus(swResponse);
        if (!isRetriableError(effectiveStatus.code(), retryPolicy)) {
            return swResponse;
        }

        LOG(2) << "Command " << redact(cmdObj)
               << " failed with retriable error and will be retried"
               << causedBy(redact(effectiveStatus));
    }
    MONGO_UNREACHABLE;
}
/**
 * Convenience overload of runCommandWithFixedRetryAttempts without an explicit maxTimeMS
 * override.
 *
 * Forwards to the full overload with Milliseconds::max() as the override value.
 * NOTE(review): Milliseconds::max() presumably means "no override" to _runCommand; confirm.
 */
StatusWith Shard::runCommandWithFixedRetryAttempts(
    OperationContext* opCtx,
    const ReadPreferenceSetting& readPref,
    const std::string& dbName,
    const BSONObj& cmdObj,
    RetryPolicy retryPolicy) {
    const Milliseconds noMaxTimeOverride = Milliseconds::max();
    return runCommandWithFixedRetryAttempts(
        opCtx, readPref, dbName, cmdObj, noMaxTimeOverride, retryPolicy);
}
/**
 * Runs `cmdObj` against this shard with at most kOnErrorNumRetries attempts.
 *
 * Before each attempt the operation is checked for interruption; an interrupt status is
 * returned immediately. A retriable failure (per `retryPolicy`) is logged at level 2 and
 * retried, except on the final attempt. The response of the last attempt is always returned,
 * whether it succeeded or failed.
 */
StatusWith Shard::runCommandWithFixedRetryAttempts(
    OperationContext* opCtx,
    const ReadPreferenceSetting& readPref,
    const std::string& dbName,
    const BSONObj& cmdObj,
    Milliseconds maxTimeMSOverride,
    RetryPolicy retryPolicy) {
    for (int attempt = 1; attempt <= kOnErrorNumRetries; ++attempt) {
        auto interruptStatus = opCtx->checkForInterruptNoAssert();
        if (!interruptStatus.isOK()) {
            return interruptStatus;
        }

        auto swResponse = _runCommand(opCtx, readPref, dbName, maxTimeMSOverride, cmdObj);
        const auto effectiveStatus = CommandResponse::getEffectiveStatus(swResponse);

        const bool isLastAttempt = (attempt == kOnErrorNumRetries);
        if (isLastAttempt || !isRetriableError(effectiveStatus.code(), retryPolicy)) {
            return swResponse;
        }

        LOG(2) << "Command " << redact(cmdObj)
               << " failed with retriable error and will be retried"
               << causedBy(redact(effectiveStatus));
    }
    MONGO_UNREACHABLE;
}
/**
 * Runs a cursor command through _runExhaustiveCursorCommand with at most kOnErrorNumRetries
 * attempts.
 *
 * Failures are classified with RetryPolicy::kIdempotent. A retriable failure is retried unless
 * it occurred on the final attempt. The result of the last attempt (success or failure) is
 * returned.
 *
 * Each retried failure is logged at level 2 with the same message the other retry loops in this
 * file use. Without this log, retries here would be invisible.
 *
 * NOTE(review): unlike runCommand / runCommandWithFixedRetryAttempts, this loop does not call
 * opCtx->checkForInterruptNoAssert() between attempts.
 */
StatusWith Shard::runExhaustiveCursorCommand(
    OperationContext* opCtx,
    const ReadPreferenceSetting& readPref,
    const std::string& dbName,
    const BSONObj& cmdObj,
    Milliseconds maxTimeMSOverride) {
    for (int retry = 1; retry <= kOnErrorNumRetries; retry++) {
        auto result =
            _runExhaustiveCursorCommand(opCtx, readPref, dbName, maxTimeMSOverride, cmdObj);
        if (retry < kOnErrorNumRetries &&
            isRetriableError(result.getStatus().code(), RetryPolicy::kIdempotent)) {
            LOG(2) << "Command " << redact(cmdObj)
                   << " failed with retriable error and will be retried"
                   << causedBy(redact(result.getStatus()));
            continue;
        }
        return result;
    }
    MONGO_UNREACHABLE;
}
/**
 * Sends a batched write command to this shard's primary with at most kOnErrorNumRetries
 * attempts.
 *
 * Each attempt's raw response is interpreted by CommandResponse::processBatchWriteResponse.
 * If the resulting status is retriable under `retryPolicy`, the failure is logged at level 2
 * and the write is retried, except on the final attempt. The BatchedCommandResponse of the
 * last attempt is returned.
 */
BatchedCommandResponse Shard::runBatchWriteCommand(OperationContext* opCtx,
                                                   const Milliseconds maxTimeMS,
                                                   const BatchedCommandRequest& batchRequest,
                                                   RetryPolicy retryPolicy) {
    const std::string dbname = batchRequest.getNS().db().toString();
    const BSONObj cmdObj = batchRequest.toBSON();

    // Write commands can only be issued against a primary. The setting is identical for every
    // attempt, so it is built once.
    const ReadPreferenceSetting primaryOnly{ReadPreference::PrimaryOnly};

    for (int attempt = 1; attempt <= kOnErrorNumRetries; ++attempt) {
        auto swResponse = _runCommand(opCtx, primaryOnly, dbname, maxTimeMS, cmdObj);

        BatchedCommandResponse batchResponse;
        const Status writeStatus =
            CommandResponse::processBatchWriteResponse(swResponse, &batchResponse);

        const bool retriesExhausted = (attempt == kOnErrorNumRetries);
        if (retriesExhausted || !isRetriableError(writeStatus.code(), retryPolicy)) {
            return batchResponse;
        }

        LOG(2) << "Batch write command to " << getId()
               << " failed with retriable error and will be retried"
               << causedBy(redact(writeStatus));
    }
    MONGO_UNREACHABLE;
}
/**
 * Runs a find against the config namespace `nss` through _exhaustiveFindOnConfig, with at most
 * kOnErrorNumRetries attempts.
 *
 * Only valid on the config shard; this is enforced by invariant(isConfig()). Failures are
 * classified with RetryPolicy::kIdempotent. A retriable failure is retried unless it occurred
 * on the final attempt. The result of the last attempt (success or failure) is returned.
 *
 * Each retried failure is logged at level 2, consistent with the other retry loops in this
 * file. Without this log, retries here would be invisible. The query is passed through
 * redact() before logging.
 *
 * NOTE(review): this loop does not check opCtx for interruption between attempts, unlike
 * runCommand / runCommandWithFixedRetryAttempts.
 */
StatusWith Shard::exhaustiveFindOnConfig(
    OperationContext* opCtx,
    const ReadPreferenceSetting& readPref,
    const repl::ReadConcernLevel& readConcernLevel,
    const NamespaceString& nss,
    const BSONObj& query,
    const BSONObj& sort,
    const boost::optional limit) {
    // Do not allow exhaustive finds to be run against regular shards.
    invariant(isConfig());
    for (int retry = 1; retry <= kOnErrorNumRetries; retry++) {
        auto result =
            _exhaustiveFindOnConfig(opCtx, readPref, readConcernLevel, nss, query, sort, limit);
        if (retry < kOnErrorNumRetries &&
            isRetriableError(result.getStatus().code(), RetryPolicy::kIdempotent)) {
            LOG(2) << "Exhaustive find on " << nss.ns() << " with query " << redact(query)
                   << " failed with retriable error and will be retried"
                   << causedBy(redact(result.getStatus()));
            continue;
        }
        return result;
    }
    MONGO_UNREACHABLE;
}
} // namespace mongo