/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/base/init.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/bson/mutable/element.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/write_commands/write_commands_common.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/json.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
using std::string;
using std::stringstream;
namespace {
void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) {
namespace mmb = mutablebson;
mmb::Element root = cmdObj->root();
mmb::Element field = root.findFirstChildNamed(fieldName);
// If the cmdObj is too large, it will be a "too big" message given by CachedBSONObj.get()
if (!field.ok()) {
return;
}
// Redact the log if there are more than one documents or operations.
if (field.countChildren() > 1) {
field.setValueInt(field.countChildren());
}
}
Status checkAuthForWriteCommand(Client* client,
BatchedCommandRequest::BatchType batchType,
NamespaceString ns,
const BSONObj& cmdObj) {
Status status =
auth::checkAuthForWriteCommand(AuthorizationSession::get(client), batchType, ns, cmdObj);
if (!status.isOK()) {
LastError::get(client).setLastError(status.code(), status.reason());
}
return status;
}
bool shouldSkipOutput(OperationContext* txn) {
const WriteConcernOptions& writeConcern = txn->getWriteConcern();
return writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 &&
(writeConcern.syncMode == WriteConcernOptions::SyncMode::NONE ||
writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET);
}
enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields.
void serializeReply(OperationContext* txn,
ReplyStyle replyStyle,
bool continueOnError,
size_t opsInBatch,
const WriteResult& result,
BSONObjBuilder* out) {
if (shouldSkipOutput(txn))
return;
long long n = 0;
long long nModified = 0;
std::vector upsertInfo;
std::vector errors;
BSONSizeTracker upsertInfoSizeTracker;
BSONSizeTracker errorsSizeTracker;
for (size_t i = 0; i < result.results.size(); i++) {
if (result.results[i].isOK()) {
const auto& opResult = result.results[i].getValue();
n += opResult.n; // Always there.
if (replyStyle == ReplyStyle::kUpdate) {
nModified += opResult.nModified;
if (!opResult.upsertedId.isEmpty()) {
BSONObjBuilder upsertedId(upsertInfoSizeTracker);
upsertedId.append("index", int(i));
upsertedId.appendAs(opResult.upsertedId.firstElement(), "_id");
upsertInfo.push_back(upsertedId.obj());
}
}
continue;
}
const auto& status = result.results[i].getStatus();
BSONObjBuilder error(errorsSizeTracker);
error.append("index", int(i));
error.append("code", int(status.code()));
error.append("errmsg", status.reason());
errors.push_back(error.obj());
}
if (result.staleConfigException) {
// For ordered:false commands we need to duplicate the StaleConfig result for all ops
// after we stopped. result.results doesn't include the staleConfigException.
// See the comment on WriteResult::staleConfigException for more info.
int endIndex = continueOnError ? opsInBatch : result.results.size() + 1;
for (int i = result.results.size(); i < endIndex; i++) {
BSONObjBuilder error(errorsSizeTracker);
error.append("index", i);
error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
error.append("errmsg", result.staleConfigException->getInfo().msg);
{
BSONObjBuilder errInfo(error.subobjStart("errInfo"));
result.staleConfigException->getVersionWanted().addToBSON(errInfo, "vWanted");
}
errors.push_back(error.obj());
}
}
out->appendNumber("n", n);
if (replyStyle == ReplyStyle::kUpdate) {
out->appendNumber("nModified", nModified);
if (!upsertInfo.empty()) {
out->append("upserted", upsertInfo);
}
}
if (!errors.empty()) {
out->append("writeErrors", errors);
}
// writeConcernError field is handled by command processor.
{
// Undocumented repl fields that mongos depends on.
auto* replCoord = repl::ReplicationCoordinator::get(txn->getServiceContext());
const auto replMode = replCoord->getReplicationMode();
if (replMode != repl::ReplicationCoordinator::modeNone) {
const auto lastOp = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
if (lastOp.getTerm() == repl::OpTime::kUninitializedTerm) {
out->append("opTime", lastOp.getTimestamp());
} else {
lastOp.append(out, "opTime");
}
if (replMode == repl::ReplicationCoordinator::modeReplSet) {
out->append("electionId", replCoord->getElectionId());
}
}
}
}
class WriteCommand : public Command {
public:
explicit WriteCommand(StringData name) : Command(name) {}
bool slaveOk() const final {
return false;
}
bool shouldAffectCommandCounter() const final {
return false;
}
bool supportsWriteConcern(const BSONObj& cmd) const {
return true;
}
ReadWriteType getReadWriteType() const {
return ReadWriteType::kWrite;
}
bool run(OperationContext* txn,
const std::string& dbname,
BSONObj& cmdObj,
int options,
std::string& errmsg,
BSONObjBuilder& result) final {
try {
runImpl(txn, dbname, cmdObj, result);
return true;
} catch (const DBException& ex) {
LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
throw;
}
}
virtual void runImpl(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) = 0;
};
} // namespace
class CmdInsert final : public WriteCommand {
public:
CmdInsert() : WriteCommand("insert") {}
void redactForLogging(mutablebson::Document* cmdObj) final {
redactTooLongLog(cmdObj, "documents");
}
void help(stringstream& help) const final {
help << "insert documents";
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) final {
return checkAuthForWriteCommand(client,
BatchedCommandRequest::BatchType_Insert,
NamespaceString(parseNs(dbname, cmdObj)),
cmdObj);
}
void runImpl(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
const auto batch = parseInsertCommand(dbname, cmdObj);
const auto reply = performInserts(txn, batch);
serializeReply(txn,
ReplyStyle::kNotUpdate,
batch.continueOnError,
batch.documents.size(),
reply,
&result);
}
} cmdInsert;
class CmdUpdate final : public WriteCommand {
public:
CmdUpdate() : WriteCommand("update") {}
void redactForLogging(mutablebson::Document* cmdObj) final {
redactTooLongLog(cmdObj, "updates");
}
void help(stringstream& help) const final {
help << "update documents";
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) final {
return checkAuthForWriteCommand(client,
BatchedCommandRequest::BatchType_Update,
NamespaceString(parseNs(dbname, cmdObj)),
cmdObj);
}
void runImpl(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
const auto batch = parseUpdateCommand(dbname, cmdObj);
const auto reply = performUpdates(txn, batch);
serializeReply(
txn, ReplyStyle::kUpdate, batch.continueOnError, batch.updates.size(), reply, &result);
}
Status explain(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
const rpc::ServerSelectionMetadata&,
BSONObjBuilder* out) const final {
const auto batch = parseUpdateCommand(dbname, cmdObj);
uassert(ErrorCodes::InvalidLength,
"explained write batches must be of size 1",
batch.updates.size() == 1);
UpdateLifecycleImpl updateLifecycle(batch.ns);
UpdateRequest updateRequest(batch.ns);
updateRequest.setLifecycle(&updateLifecycle);
updateRequest.setQuery(batch.updates[0].query);
updateRequest.setCollation(batch.updates[0].collation);
updateRequest.setUpdates(batch.updates[0].update);
updateRequest.setMulti(batch.updates[0].multi);
updateRequest.setUpsert(batch.updates[0].upsert);
updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
updateRequest.setExplain();
ParsedUpdate parsedUpdate(txn, &updateRequest);
uassertStatusOK(parsedUpdate.parseRequest());
// Explains of write commands are read-only, but we take write locks so that timing
// info is more accurate.
ScopedTransaction scopedXact(txn, MODE_IX);
AutoGetCollection collection(txn, batch.ns, MODE_IX);
auto exec = uassertStatusOK(getExecutorUpdate(
txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedUpdate));
Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out);
return Status::OK();
}
} cmdUpdate;
class CmdDelete final : public WriteCommand {
public:
CmdDelete() : WriteCommand("delete") {}
void redactForLogging(mutablebson::Document* cmdObj) final {
redactTooLongLog(cmdObj, "deletes");
}
void help(stringstream& help) const final {
help << "delete documents";
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) final {
return checkAuthForWriteCommand(client,
BatchedCommandRequest::BatchType_Delete,
NamespaceString(parseNs(dbname, cmdObj)),
cmdObj);
}
void runImpl(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
const auto batch = parseDeleteCommand(dbname, cmdObj);
const auto reply = performDeletes(txn, batch);
serializeReply(txn,
ReplyStyle::kNotUpdate,
batch.continueOnError,
batch.deletes.size(),
reply,
&result);
}
Status explain(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
const rpc::ServerSelectionMetadata&,
BSONObjBuilder* out) const final {
const auto batch = parseDeleteCommand(dbname, cmdObj);
uassert(ErrorCodes::InvalidLength,
"explained write batches must be of size 1",
batch.deletes.size() == 1);
DeleteRequest deleteRequest(batch.ns);
deleteRequest.setQuery(batch.deletes[0].query);
deleteRequest.setCollation(batch.deletes[0].collation);
deleteRequest.setMulti(batch.deletes[0].multi);
deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
deleteRequest.setExplain();
ParsedDelete parsedDelete(txn, &deleteRequest);
uassertStatusOK(parsedDelete.parseRequest());
// Explains of write commands are read-only, but we take write locks so that timing
// info is more accurate.
ScopedTransaction scopedXact(txn, MODE_IX);
AutoGetCollection collection(txn, batch.ns, MODE_IX);
// Explain the plan tree.
auto exec = uassertStatusOK(getExecutorDelete(
txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedDelete));
Explain::explainStages(exec.get(), collection.getCollection(), verbosity, out);
return Status::OK();
}
} cmdDelete;
} // namespace mongo