/** * Copyright (C) 2013 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/base/error_codes.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/client.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/commands/write_commands/write_commands_common.h" #include "mongo/db/lasterror.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/counters.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/dbclient_multi_command.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batch_upconvert.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/timer.h" namespace mongo { using std::string; using std::stringstream; using std::vector; namespace { /** * Base class for mongos write commands. Cluster write commands support batch writes and write * concern, and return per-item error information. All cluster write commands use the entry * point ClusterWriteCmd::run(). * * Batch execution (targeting and dispatching) is performed by the BatchWriteExec class. */ class ClusterWriteCmd : public Command { public: virtual ~ClusterWriteCmd() {} virtual bool slaveOk() const { return false; } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } virtual Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) { Status status = auth::checkAuthForWriteCommand(AuthorizationSession::get(client), _writeType, NamespaceString(parseNs(dbname, cmdObj)), cmdObj); // TODO: Remove this when we standardize GLE reporting from commands if (!status.isOK()) { LastError::get(client).setLastError(status.code(), status.reason()); } return status; } virtual Status explain(OperationContext* txn, const std::string& dbname, const BSONObj& cmdObj, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, BSONObjBuilder* out) const { BatchedCommandRequest request(_writeType); string errMsg; if (!request.parseBSON(dbname, cmdObj, &errMsg) || !request.isValid(&errMsg)) { return Status(ErrorCodes::FailedToParse, errMsg); } // We can only explain write batches of size 1. if (request.sizeWriteOps() != 1U) { return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1"); } BSONObjBuilder explainCmdBob; int options = 0; ClusterExplain::wrapAsExplain( cmdObj, verbosity, serverSelectionMetadata, &explainCmdBob, &options); // We will time how long it takes to run the commands on the shards. Timer timer; // Target the command to the shards based on the singleton batch item. BatchItemRef targetingBatchItem(&request, 0); vector shardResults; Status status = _commandOpWrite(txn, dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults); if (!status.isOK()) { return status; } return ClusterExplain::buildExplainResult( txn, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), out); } virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { BatchedCommandRequest request(_writeType); BatchedCommandResponse response; ClusterWriter writer(true, 0); LastError* cmdLastError = &LastError::get(cc()); { // Disable the last error object for the duration of the write LastError::Disabled disableLastError(cmdLastError); // TODO: if we do namespace parsing, push this to the type if (!request.parseBSON(dbname, cmdObj, &errmsg) || !request.isValid(&errmsg)) { // Batch parse failure response.setOk(false); response.setErrCode(ErrorCodes::FailedToParse); response.setErrMessage(errmsg); } else { writer.write(txn, request, &response); } dassert(response.isValid(NULL)); } // Populate the lastError object based on the write response cmdLastError->reset(); batchErrorToLastError(request, response, cmdLastError); size_t numAttempts; if (!response.getOk()) { numAttempts = 0; } else if (request.getOrdered() && response.isErrDetailsSet()) { // Add one failed attempt numAttempts = response.getErrDetailsAt(0)->getIndex() + 1; } else { numAttempts = request.sizeWriteOps(); } // TODO: increase opcounters by more than one if (_writeType == BatchedCommandRequest::BatchType_Insert) { for (size_t i = 0; i < numAttempts; ++i) { globalOpCounters.gotInsert(); } } else if (_writeType == BatchedCommandRequest::BatchType_Update) { for (size_t i = 0; i < numAttempts; ++i) { globalOpCounters.gotUpdate(); } } else if (_writeType == BatchedCommandRequest::BatchType_Delete) { for (size_t i = 0; i < numAttempts; ++i) { globalOpCounters.gotDelete(); } } // Save the last opTimes written on each shard for this client, to allow GLE to work if (haveClient()) { ClusterLastErrorInfo::get(cc()).addHostOpTimes(writer.getStats().getWriteOpTimes()); } // TODO // There's a pending issue about how to report response here. If we use // the command infra-structure, we should reuse the 'errmsg' field. But // we have already filed that message inside the BatchCommandResponse. // return response.getOk(); result.appendElements(response.toBSON()); return true; } protected: /** * Instantiates a command that can be invoked by "name", which will be capable of issuing * write batches of type "writeType", and will require privilege "action" to run. */ ClusterWriteCmd(StringData name, BatchedCommandRequest::BatchType writeType) : Command(name), _writeType(writeType) {} private: // Type of batch (e.g. insert, update). const BatchedCommandRequest::BatchType _writeType; /** * Executes a write command against a particular database, and targets the command based on * a write operation. * * Does *not* retry or retarget if the metadata is stale. */ static Status _commandOpWrite(OperationContext* txn, const std::string& dbName, const BSONObj& command, BatchItemRef targetingBatchItem, std::vector* results) { // Note that this implementation will not handle targeting retries and does not completely // emulate write behavior TargeterStats stats; ChunkManagerTargeter targeter( NamespaceString(targetingBatchItem.getRequest()->getTargetingNS()), &stats); Status status = targeter.init(txn); if (!status.isOK()) return status; OwnedPointerVector endpointsOwned; vector& endpoints = endpointsOwned.mutableVector(); if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { ShardEndpoint* endpoint; Status status = targeter.targetInsert(txn, targetingBatchItem.getDocument(), &endpoint); if (!status.isOK()) return status; endpoints.push_back(endpoint); } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { Status status = targeter.targetUpdate(txn, *targetingBatchItem.getUpdate(), &endpoints); if (!status.isOK()) return status; } else { invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete); Status status = targeter.targetDelete(txn, *targetingBatchItem.getDelete(), &endpoints); if (!status.isOK()) return status; } DBClientMultiCommand dispatcher; // Assemble requests for (vector::const_iterator it = endpoints.begin(); it != endpoints.end(); ++it) { const ShardEndpoint* endpoint = *it; const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); auto shardStatus = grid.shardRegistry()->getShard(txn, endpoint->shardName); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } auto swHostAndPort = shardStatus.getValue()->getTargeter()->findHostNoWait(readPref); if (!swHostAndPort.isOK()) { return swHostAndPort.getStatus(); } ConnectionString host(swHostAndPort.getValue()); dispatcher.addCommand(host, dbName, command); } // Errors reported when recv'ing responses dispatcher.sendAll(); Status dispatchStatus = Status::OK(); // Recv responses while (dispatcher.numPending() > 0) { ConnectionString host; RawBSONSerializable response; Status status = dispatcher.recvAny(&host, &response); if (!status.isOK()) { // We always need to recv() all the sent operations dispatchStatus = status; continue; } Strategy::CommandResult result; result.target = host; { auto shardStatus = grid.shardRegistry()->getShard(txn, host.toString()); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } result.shardTargetId = shardStatus.getValue()->getId(); } result.result = response.toBSON(); results->push_back(result); } return dispatchStatus; } }; class ClusterCmdInsert : public ClusterWriteCmd { public: ClusterCmdInsert() : ClusterWriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {} void help(stringstream& help) const { help << "insert documents"; } } clusterInsertCmd; class ClusterCmdUpdate : public ClusterWriteCmd { public: ClusterCmdUpdate() : ClusterWriteCmd("update", BatchedCommandRequest::BatchType_Update) {} void help(stringstream& help) const { help << "update documents"; } } clusterUpdateCmd; class ClusterCmdDelete : public ClusterWriteCmd { public: ClusterCmdDelete() : ClusterWriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {} void help(stringstream& help) const { help << "delete documents"; } } clusterDeleteCmd; } // namespace } // namespace mongo