/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/base/error_codes.h"
#include "mongo/base/owned_pointer_vector.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/write_commands/write_commands_common.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/stats/counters.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/chunk_manager_targeter.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
namespace mongo {
namespace {
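
/**
* Maps a batch write response onto the LastError state consumed by getLastError,
* matching the error-reporting semantics of legacy (pre-write-command) writes.
*/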
void batchErrorToLastError(const BatchedCommandRequest& request,
const BatchedCommandResponse& response,
LastError* error) {
error->reset();
std::unique_ptr<WriteErrorDetail> commandError;
WriteErrorDetail* lastBatchError = NULL;
if (!response.getOk()) {
// Command-level error, all writes failed
commandError.reset(new WriteErrorDetail);
commandError->setErrCode(response.getErrCode());
commandError->setErrMessage(response.getErrMessage());
lastBatchError = commandError.get();
} else if (response.isErrDetailsSet()) {
// The last error in the batch is always reported - this matches expected COE semantics for
// insert batches. For updates and deletes, error is only reported if the error was on the
// last item.
const bool lastOpErrored = response.getErrDetails().back()->getIndex() ==
static_cast<int>(request.sizeWriteOps() - 1);
if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert || lastOpErrored) {
lastBatchError = response.getErrDetails().back();
}
} else {
// We don't care about write concern errors, these happen in legacy mode in GLE.
}
// Record an error if one exists
if (lastBatchError) {
const auto& errMsg = lastBatchError->getErrMessage();
error->setLastError(lastBatchError->getErrCode(),
errMsg.empty() ? "see code for details" : errMsg);
return;
}
// Record write stats otherwise
//
// NOTE: For multi-write batches, our semantics change a little because we don't have
// un-aggregated "n" stats
if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) {
BSONObj upsertedId;
if (response.isUpsertDetailsSet()) {
// Only report the very last item's upserted id if applicable
if (response.getUpsertDetails().back()->getIndex() + 1 ==
static_cast<int>(request.sizeWriteOps())) {
upsertedId = response.getUpsertDetails().back()->getUpsertedID();
}
}
const int numUpserted = response.isUpsertDetailsSet() ? response.sizeUpsertDetails() : 0;
const int numMatched = response.getN() - numUpserted;
invariant(numMatched >= 0);
// Wrap upserted id in "upserted" field
BSONObj leUpsertedId;
if (!upsertedId.isEmpty()) {
leUpsertedId = upsertedId.firstElement().wrap(kUpsertedFieldName);
}
error->recordUpdate(numMatched > 0, response.getN(), leUpsertedId);
} else if (request.getBatchType() == BatchedCommandRequest::BatchType_Delete) {
error->recordDelete(response.getN());
}
}
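
/**
* Parses an OpMsg write request into a BatchedCommandRequest of the given type.
* Insert batches are cloned so that every document carries an _id field.
*/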
BatchedCommandRequest parseRequest(BatchedCommandRequest::BatchType type,
const OpMsgRequest& request) {
switch (type) {
case BatchedCommandRequest::BatchType_Insert:
return BatchedCommandRequest::cloneInsertWithIds(
BatchedCommandRequest::parseInsert(request));
case BatchedCommandRequest::BatchType_Update:
return BatchedCommandRequest::parseUpdate(request);
case BatchedCommandRequest::BatchType_Delete:
return BatchedCommandRequest::parseDelete(request);
}
MONGO_UNREACHABLE;
}
/**
* Base class for mongos write commands.
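*
* The insert, update and delete commands arrive as OpMsg requests shaped roughly like
* {insert: "coll", documents: [...], ordered: true} (and analogously for update and
* delete); see BatchedCommandRequest for the exact schema.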
*/
class ClusterWriteCmd : public Command {
public:
virtual ~ClusterWriteCmd() {}
bool slaveOk() const final {
return false;
}
bool supportsWriteConcern(const BSONObj& cmd) const final {
return true;
}
Status checkAuthForRequest(OperationContext* opCtx, const OpMsgRequest& request) final {
Status status = auth::checkAuthForWriteCommand(
AuthorizationSession::get(opCtx->getClient()), _writeType, request);
// TODO: Remove this when we standardize GLE reporting from commands
if (!status.isOK()) {
LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason());
}
return status;
}
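
/**
* Explains a single-item write batch by wrapping the command in explain and
* forwarding it to the shard(s) targeted by that item.
*/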
Status explain(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
ExplainOptions::Verbosity verbosity,
BSONObjBuilder* out) const final {
OpMsgRequest request;
request.body = cmdObj;
invariant(request.getDatabase() == dbname); // Ensured by explain command's run() method.
const auto batchedRequest(parseRequest(_writeType, request));
// We can only explain write batches of size 1.
if (batchedRequest.sizeWriteOps() != 1U) {
return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1");
}
const auto explainCmd = ClusterExplain::wrapAsExplain(cmdObj, verbosity);
// We will time how long it takes to run the commands on the shards.
Timer timer;
// Target the command to the shards based on the singleton batch item.
BatchItemRef targetingBatchItem(&batchedRequest, 0);
std::vector<Strategy::CommandResult> shardResults;
Status status =
_commandOpWrite(opCtx, dbname, explainCmd, targetingBatchItem, &shardResults);
if (!status.isOK()) {
return status;
}
return ClusterExplain::buildExplainResult(
opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), out);
}
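
/**
* Runs the batch through ClusterWriter, then records LastError and the per-shard
* opTimes needed for getLastError, and bumps the global op counters.
*/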
bool enhancedRun(OperationContext* opCtx,
const OpMsgRequest& request,
BSONObjBuilder& result) final {
const auto batchedRequest(parseRequest(_writeType, request));
BatchWriteExecStats stats;
BatchedCommandResponse response;
ClusterWriter::write(opCtx, batchedRequest, &stats, &response);
// Populate the lastError object based on the write response
batchErrorToLastError(batchedRequest, response, &LastError::get(opCtx->getClient()));
size_t numAttempts;
if (!response.getOk()) {
numAttempts = 0;
} else if (batchedRequest.getWriteCommandBase().getOrdered() &&
response.isErrDetailsSet()) {
// Add one failed attempt
numAttempts = response.getErrDetailsAt(0)->getIndex() + 1;
} else {
numAttempts = batchedRequest.sizeWriteOps();
}
// TODO: increase opcounters by more than one
if (_writeType == BatchedCommandRequest::BatchType_Insert) {
for (size_t i = 0; i < numAttempts; ++i) {
globalOpCounters.gotInsert();
}
} else if (_writeType == BatchedCommandRequest::BatchType_Update) {
for (size_t i = 0; i < numAttempts; ++i) {
globalOpCounters.gotUpdate();
}
} else if (_writeType == BatchedCommandRequest::BatchType_Delete) {
for (size_t i = 0; i < numAttempts; ++i) {
globalOpCounters.gotDelete();
}
}
// Save the last opTimes written on each shard for this client, to allow GLE to work
ClusterLastErrorInfo::get(opCtx->getClient())->addHostOpTimes(stats.getWriteOpTimes());
result.appendElements(response.toBSON());
return response.getOk();
}
protected:
/**
* Instantiates a command that can be invoked by "name" and is capable of issuing
* write batches of type "writeType".
*/
ClusterWriteCmd(StringData name, BatchedCommandRequest::BatchType writeType)
: Command(name), _writeType(writeType) {}
private:
// Type of batch (e.g. insert, update).
const BatchedCommandRequest::BatchType _writeType;
/**
* Executes a write command against a particular database, and targets the command based on
* a write operation.
*
* Does *not* retry or retarget if the metadata is stale.
*/
static Status _commandOpWrite(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& command,
BatchItemRef targetingBatchItem,
std::vector<Strategy::CommandResult>* results) {
// Note that this implementation will not handle targeting retries and does not completely
// emulate write behavior
TargeterStats stats;
ChunkManagerTargeter targeter(targetingBatchItem.getRequest()->getTargetingNS(), &stats);
Status status = targeter.init(opCtx);
if (!status.isOK())
return status;
std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
ShardEndpoint* endpoint;
Status status =
targeter.targetInsert(opCtx, targetingBatchItem.getDocument(), &endpoint);
if (!status.isOK())
return status;
endpoints.push_back(std::unique_ptr<ShardEndpoint>{endpoint});
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
Status status =
targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate(), &endpoints);
if (!status.isOK())
return status;
} else {
invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete);
Status status =
targeter.targetDelete(opCtx, targetingBatchItem.getDelete(), &endpoints);
if (!status.isOK())
return status;
}
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
// Assemble requests
std::vector<AsyncRequestsSender::Request> requests;
for (auto it = endpoints.begin(); it != endpoints.end(); ++it) {
const ShardEndpoint* endpoint = it->get();
auto shardStatus = shardRegistry->getShard(opCtx, endpoint->shardName);
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
requests.emplace_back(shardStatus.getValue()->getId(), command);
}
// Send the requests.
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
dbName,
requests,
readPref,
Shard::RetryPolicy::kNoRetry);
// Receive the responses.
Status dispatchStatus = Status::OK();
while (!ars.done()) {
// Block until a response is available.
auto response = ars.next();
if (!response.swResponse.isOK()) {
dispatchStatus = std::move(response.swResponse.getStatus());
break;
}
Strategy::CommandResult result;
// If the response status was OK, the response must contain which host was targeted.
invariant(response.shardHostAndPort);
result.target = ConnectionString(std::move(*response.shardHostAndPort));
result.shardTargetId = std::move(response.shardId);
result.result = std::move(response.swResponse.getValue().data);
results->push_back(result);
}
return dispatchStatus;
}
};
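
// Each concrete write command is registered by constructing a single global instance;
// the Command base class constructor adds it to the global command registry.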
class ClusterCmdInsert : public ClusterWriteCmd {
public:
ClusterCmdInsert() : ClusterWriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {}
void help(std::stringstream& help) const override {
help << "insert documents";
}
} clusterInsertCmd;
class ClusterCmdUpdate : public ClusterWriteCmd {
public:
ClusterCmdUpdate() : ClusterWriteCmd("update", BatchedCommandRequest::BatchType_Update) {}
void help(std::stringstream& help) const override {
help << "update documents";
}
} clusterUpdateCmd;
class ClusterCmdDelete : public ClusterWriteCmd {
public:
ClusterCmdDelete() : ClusterWriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {}
void help(std::stringstream& help) const override {
help << "delete documents";
}
} clusterDeleteCmd;
} // namespace
} // namespace mongo