/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
// Namespace-scope definitions for the static constexpr field-name constants declared on ApplyOps.
// Before C++17 (no implicit inline variables), an ODR-used static constexpr data member needs
// exactly one such out-of-line definition.
constexpr StringData ApplyOps::kPreconditionFieldName;
constexpr StringData ApplyOps::kOplogApplicationModeFieldName;
namespace {
// If enabled, causes the loop in _applyOps() to hang after applying the current operation.
// The pause happens at the end of each loop iteration; under MMAPv1 without the global exclusive
// lock, _applyOps() temporarily releases its locks while paused.
MONGO_FAIL_POINT_DEFINE(applyOpsPauseBetweenOperations);
/**
* Return true iff the applyOpsCmd can be executed in a single WriteUnitOfWork.
*/
bool _parseAreOpsCrudOnly(const BSONObj& applyOpCmd) {
for (const auto& elem : applyOpCmd.firstElement().Obj()) {
const char* names[] = {"ns", "op"};
BSONElement fields[2];
elem.Obj().getFields(2, names, fields);
BSONElement& fieldNs = fields[0];
BSONElement& fieldOp = fields[1];
const char* opType = fieldOp.valuestrsafe();
const StringData ns = fieldNs.valuestrsafe();
// All atomic ops have an opType of length 1.
if (opType[0] == '\0' || opType[1] != '\0')
return false;
// Only consider CRUD operations.
switch (*opType) {
case 'd':
case 'n':
case 'u':
break;
case 'i':
if (nsToCollectionSubstring(ns) != "system.indexes")
break;
// Fallthrough.
default:
return false;
}
}
return true;
}
// Applies each operation from 'info' in order. On normal completion, appends to 'result' the
// number of processed ops ("applied") and a per-op success flag array ("results").
//
// The mode is chosen by whether the caller has already opened a WriteUnitOfWork:
//  - Atomic (wrapping WUOW present): requires the global exclusive lock and no commands. Situations
//    that cannot be handled atomically (missing database, insert/update into a missing collection,
//    malformed op) throw AtomicityFailure; applyOps() catches that and retries non-atomically.
//    The first CRUD op that fails makes this function return its status immediately.
//  - Non-atomic: each op runs in its own writeConflictRetry. Failed CRUD statuses are counted and
//    application continues; a thrown DBException (e.g. from a command or an index build) stops
//    the loop, is reported in 'result', and is returned as a Status.
//
// '*numApplied' is reset to 0 and counts processed ops ('n' ops are skipped and not counted).
// When 'opsBuilder' is non-null, each op applied in atomic mode is appended to it, with the
// collection UUID added as "ui" when the op lacks one and the collection has one.
// 'dbName' and 'applyOpCmd' are not read by this function.
Status _applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
const ApplyOpsCommandInfo& info,
repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result,
int* numApplied,
BSONArrayBuilder* opsBuilder) {
const auto& ops = info.getOperations();
// apply
*numApplied = 0;
int errors = 0;
// Per-op success flags, emitted as "results".
BSONArrayBuilder ab;
const auto& alwaysUpsert = info.getAlwaysUpsert();
// True when called from the atomic path of applyOps(), which opened the WriteUnitOfWork.
const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
// Apply each op in the given 'applyOps' command object.
for (const auto& opObj : ops) {
// Ignore 'n' operations.
const char* opType = opObj["op"].valuestrsafe();
if (*opType == 'n')
continue;
const NamespaceString nss(opObj["ns"].String());
// Need to check this here, or OldClientContext may fail an invariant.
if (*opType != 'c' && !nss.isValid())
return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
// Placeholder; overwritten by whichever branch below applies the op.
Status status(ErrorCodes::InternalError, "");
if (haveWrappingWUOW) {
invariant(opCtx->lockState()->isW());
invariant(*opType != 'c');
auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.ns());
if (!db) {
// Retry in non-atomic mode, since MMAP cannot implicitly create a new database
// within an active WriteUnitOfWork.
uasserted(ErrorCodes::AtomicityFailure,
"cannot create a database in atomic applyOps mode; will retry without "
"atomicity");
}
// When processing an update on a non-existent collection, applyOperation_inlock()
// returns UpdateOperationFailed on updates and allows the collection to be
// implicitly created on upserts. We detect both cases here and fail early with
// AtomicityFailure, which makes applyOps() retry the batch without atomicity.
// Additionally for inserts, we fail early on non-existent collections.
auto collection = db->getCollection(opCtx, nss);
if (!collection && !nss.isSystemDotIndexes() && (*opType == 'i' || *opType == 'u')) {
uasserted(
ErrorCodes::AtomicityFailure,
str::stream()
<< "cannot apply insert or update operation on a non-existent namespace "
<< nss.ns()
<< " in atomic applyOps mode: "
<< redact(opObj));
}
// Reject malformed operations in an atomic applyOps.
// Any parse failure is converted to AtomicityFailure so the batch is retried non-atomically.
try {
ReplOperation::parse(IDLParserErrorContext("applyOps"), opObj);
} catch (...) {
uasserted(ErrorCodes::AtomicityFailure,
str::stream()
<< "cannot apply a malformed operation in atomic applyOps mode: "
<< redact(opObj)
<< "; will retry without atomicity: "
<< exceptionToStatus().toString());
}
OldClientContext ctx(opCtx, nss.ns());
status = repl::applyOperation_inlock(
opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
if (!status.isOK())
return status;
// Append completed op, including UUID if available, to 'opsBuilder'.
if (opsBuilder) {
if (opObj.hasField("ui") || nss.isSystemDotIndexes() ||
!(collection && collection->uuid())) {
// No changes needed to operation document.
opsBuilder->append(opObj);
} else {
// Operation document has no "ui" field and collection has a UUID.
auto uuid = collection->uuid();
BSONObjBuilder opBuilder;
opBuilder.appendElements(opObj);
uuid->appendToBuilder(&opBuilder, "ui");
opsBuilder->append(opBuilder.obj());
}
}
} else {
// Non-atomic mode: each op is retried independently on write conflicts.
try {
status = writeConflictRetry(
opCtx,
"applyOps",
nss.ns(),
[opCtx, nss, opObj, opType, alwaysUpsert, oplogApplicationMode] {
if (*opType == 'c') {
invariant(opCtx->lockState()->isW());
uassertStatusOK(
repl::applyCommand_inlock(opCtx, opObj, oplogApplicationMode));
return Status::OK();
}
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
if (!autoColl.getCollection() && !nss.isSystemDotIndexes()) {
// For idempotency reasons, return success on delete operations.
if (*opType == 'd') {
return Status::OK();
}
uasserted(ErrorCodes::NamespaceNotFound,
str::stream()
<< "cannot apply insert or update operation on a "
"non-existent namespace "
<< nss.ns()
<< ": "
<< mongo::redact(opObj));
}
OldClientContext ctx(opCtx, nss.ns());
if (!nss.isSystemDotIndexes()) {
// We return the status rather than merely aborting so failure of CRUD
// ops doesn't stop the applyOps from trying to process the rest of the
// ops. This is to leave the door open to parallelizing CRUD op
// application in the future.
return repl::applyOperation_inlock(
opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
}
// An insert into system.indexes is translated into a createIndexes command.
auto fieldO = opObj["o"];
BSONObj indexSpec;
NamespaceString indexNss;
std::tie(indexSpec, indexNss) =
repl::prepForApplyOpsIndexInsert(fieldO, opObj, nss);
if (!indexSpec["collation"]) {
// If the index spec does not include a collation, explicitly specify
// the simple collation, so the index does not inherit the collection
// default collation.
auto indexVersion = indexSpec["v"];
// The index version is populated by prepForApplyOpsIndexInsert().
invariant(indexVersion);
if (indexVersion.isNumber() &&
(indexVersion.numberInt() >=
static_cast(IndexDescriptor::IndexVersion::kV2))) {
BSONObjBuilder bob;
bob.append("collation", CollationSpec::kSimpleSpec);
bob.appendElements(indexSpec);
indexSpec = bob.obj();
}
}
BSONObjBuilder command;
command.append("createIndexes", indexNss.coll());
{
BSONArrayBuilder indexes(command.subarrayStart("indexes"));
indexes.append(indexSpec);
indexes.doneFast();
}
const BSONObj commandObj = command.done();
DBDirectClient client(opCtx);
BSONObj infoObj;
client.runCommand(nss.db().toString(), commandObj, infoObj);
// Uassert to stop applyOps only when building indexes, but not for CRUD
// ops.
uassertStatusOK(getStatusFromCommandResult(infoObj));
return Status::OK();
});
} catch (const DBException& ex) {
// A thrown error aborts the remaining ops: report the failing op (counted as
// processed, flagged false) along with the error details.
ab.append(false);
result->append("applied", ++(*numApplied));
result->append("code", ex.code());
result->append("codeName", ErrorCodes::errorString(ex.code()));
result->append("errmsg", ex.what());
result->append("results", ab.arr());
return Status(ex.code(), ex.what());
}
}
ab.append(status.isOK());
if (!status.isOK()) {
log() << "applyOps error applying: " << status;
errors++;
}
(*numApplied)++;
if (MONGO_FAIL_POINT(applyOpsPauseBetweenOperations)) {
// While holding a database lock under MMAPv1, we would be implicitly holding the
// flush lock here. This would prevent other threads from acquiring the global
// lock or any database locks. We release all locks temporarily while the fail
// point is enabled to allow other threads to make progress.
boost::optional release;
auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
if (storageEngine->isMmapV1() && !opCtx->lockState()->isW()) {
release.emplace(opCtx->lockState());
}
MONGO_FAIL_POINT_PAUSE_WHILE_SET(applyOpsPauseBetweenOperations);
}
}
result->append("applied", *numApplied);
result->append("results", ab.arr());
// Individual CRUD failures do not stop the loop but still fail the command as a whole.
if (errors != 0) {
return Status(ErrorCodes::UnknownError, "applyOps had one or more errors applying ops");
}
return Status::OK();
}
/**
 * Evaluates the applyOps preConditions; the caller must hold the global exclusive lock.
 *
 * Each preCondition must carry a string "ns" naming an existing collection, a query "q", and an
 * expected-result predicate "res". The first document returned by findOne("q") is matched against
 * "res" using the collection's default collation.
 *
 * @param opCtx         operation context; must hold the global W lock (asserted).
 * @param preConditions preCondition documents as supplied to the applyOps command.
 * @param result        on a failed match, receives "got" (the fetched document) and "whatFailed"
 *                      (the offending preCondition).
 * @return InvalidNamespace for a non-string or invalid "ns"; NamespaceNotFound when the database
 *         or collection does not exist; BadValue ("preCondition failed") on the first mismatch;
 *         OK when every preCondition matches.
 */
Status _checkPrecondition(OperationContext* opCtx,
                          const std::vector<BSONObj>& preConditions,
                          BSONObjBuilder* result) {
    invariant(opCtx->lockState()->isW());
    for (const auto& preCondition : preConditions) {
        const BSONElement nsElem = preCondition["ns"];
        if (nsElem.type() != BSONType::String) {
            return {ErrorCodes::InvalidNamespace,
                    str::stream() << "ns in preCondition must be a string, but found type: "
                                  << typeName(nsElem.type())};
        }
        const NamespaceString nss(nsElem.valueStringData());
        if (!nss.isValid()) {
            return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
        }

        // Resolve the target collection before running the query, so a missing namespace fails
        // fast instead of first executing a query whose result would be discarded. The global
        // exclusive lock keeps these catalog pointers stable for the rest of the iteration.
        Database* database = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.db());
        if (!database) {
            return {ErrorCodes::NamespaceNotFound, "database in ns does not exist: " + nss.ns()};
        }
        Collection* collection = database->getCollection(opCtx, nss);
        if (!collection) {
            return {ErrorCodes::NamespaceNotFound, "collection in ns does not exist: " + nss.ns()};
        }

        DBDirectClient db(opCtx);
        const BSONObj realres = db.findOne(nss.ns(), preCondition["q"].Obj());

        // Match using the collection's default collation.
        const CollatorInterface* collator = collection->getDefaultCollator();
        // applyOps does not allow any extensions, such as $text, $where, $geoNear, $near,
        // $nearSphere, or $expr.
        boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, collator));
        Matcher matcher(preCondition["res"].Obj(), std::move(expCtx));
        if (!matcher.matches(realres)) {
            result->append("got", realres);
            result->append("whatFailed", preCondition);
            return {ErrorCodes::BadValue, "preCondition failed"};
        }
    }
    return Status::OK();
}
} // namespace
/**
 * Constructs an ApplyOpsCommandInfo from 'applyOpCmd'. A DBException raised while parsing is
 * rethrown as-is after attaching context that names the (redacted) command that failed.
 */
// static
ApplyOpsCommandInfo ApplyOpsCommandInfo::parse(const BSONObj& applyOpCmd) {
    try {
        return ApplyOpsCommandInfo(applyOpCmd);
    } catch (DBException& parseError) {
        parseError.addContext(str::stream() << "Failed to parse applyOps command: "
                                            << redact(applyOpCmd));
        throw;
    }
}
bool ApplyOpsCommandInfo::areOpsCrudOnly() const {
return _areOpsCrudOnly;
}
/**
 * The batch is applied atomically only when atomicity is allowed by the command and every
 * operation is CRUD-only. areOpsCrudOnly() is not consulted when atomicity is disallowed.
 */
bool ApplyOpsCommandInfo::isAtomic() const {
    if (!getAllowAtomic()) {
        return false;
    }
    return areOpsCrudOnly();
}
/**
 * Classifies the operation list as CRUD-only (or not), then parses the command fields with the
 * IDL parser. A preCondition is rejected unless the batch is eligible for atomic application,
 * i.e. atomicity is allowed and every op is CRUD-only.
 */
ApplyOpsCommandInfo::ApplyOpsCommandInfo(const BSONObj& applyOpCmd)
    : _areOpsCrudOnly(_parseAreOpsCrudOnly(applyOpCmd)) {
    parseProtected(IDLParserErrorContext("applyOps"), applyOpCmd);

    if (!getPreCondition()) {
        return;
    }

    // applyOps() asserts isAtomic() whenever a preCondition is present, so reject option
    // combinations that could never satisfy it.
    uassert(ErrorCodes::InvalidOptions,
            "Cannot use preCondition with {allowAtomic: false}",
            getAllowAtomic());
    uassert(ErrorCodes::InvalidOptions,
            "Cannot use preCondition when operations include commands.",
            areOpsCrudOnly());
}
// Entry point for applying an applyOps command. Parses 'applyOpCmd', takes either a database IX
// lock or the global write lock, verifies this node can accept writes for 'dbName' (when writes
// are replicated), evaluates any preConditions, and then applies the operations.
//
// When ApplyOpsCommandInfo::isAtomic() is true, all ops run inside one WriteUnitOfWork with
// replication suppressed, and (if writes are replicated) a single rewritten applyOps entry is
// reported to the OpObserver. If that atomic attempt throws AtomicityFailure, the batch is
// retried non-atomically. Otherwise ops are applied one at a time. Per-op outcomes are reported
// in 'result'.
Status applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result) {
auto info = ApplyOpsCommandInfo::parse(applyOpCmd);
// Exactly one of these is engaged below.
boost::optional globalWriteLock;
boost::optional dbWriteLock;
// There's only one case where we are allowed to take the database lock instead of the global
// lock - no preconditions; only CRUD ops; and non-atomic mode.
if (!info.getPreCondition() && info.areOpsCrudOnly() && !info.getAllowAtomic()) {
dbWriteLock.emplace(opCtx, dbName, MODE_IX);
} else {
globalWriteLock.emplace(opCtx);
}
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool userInitiatedWritesAndNotPrimary =
opCtx->writesAreReplicated() && !replCoord->canAcceptWritesForDatabase(opCtx, dbName);
if (userInitiatedWritesAndNotPrimary)
return Status(ErrorCodes::NotMaster,
str::stream() << "Not primary while applying ops to database " << dbName);
if (auto preCondition = info.getPreCondition()) {
// The ApplyOpsCommandInfo constructor only accepts a preCondition for atomic batches.
invariant(info.isAtomic());
auto status = _checkPrecondition(opCtx, *preCondition, result);
if (!status.isOK()) {
return status;
}
}
int numApplied = 0;
if (!info.isAtomic()) {
return _applyOps(
opCtx, dbName, applyOpCmd, info, oplogApplicationMode, result, &numApplied, nullptr);
}
// Perform write ops atomically
invariant(globalWriteLock);
try {
writeConflictRetry(opCtx, "applyOps", dbName, [&] {
// Results are staged here and copied into 'result' only after a successful commit.
BSONObjBuilder intermediateResult;
std::unique_ptr opsBuilder;
if (opCtx->writesAreReplicated()) {
opsBuilder = stdx::make_unique();
}
WriteUnitOfWork wunit(opCtx);
numApplied = 0;
{
// Suppress replication for atomic operations until end of applyOps.
repl::UnreplicatedWritesBlock uwb(opCtx);
uassertStatusOK(_applyOps(opCtx,
dbName,
applyOpCmd,
info,
oplogApplicationMode,
&intermediateResult,
&numApplied,
opsBuilder.get()));
}
// Generate oplog entry for all atomic ops collectively.
if (opCtx->writesAreReplicated()) {
// We want this applied atomically on slaves so we rewrite the oplog entry without
// the pre-condition for speed. The rewrite also drops bypassDocumentValidation and
// replaces the ops array with the one collected by _applyOps() (which carries "ui"
// UUIDs where available).
BSONObjBuilder cmdBuilder;
auto opsFieldName = applyOpCmd.firstElement().fieldNameStringData();
for (auto elem : applyOpCmd) {
auto name = elem.fieldNameStringData();
if (name == opsFieldName && opsBuilder) {
cmdBuilder.append(opsFieldName, opsBuilder->arr());
continue;
}
if (name == ApplyOps::kPreconditionFieldName)
continue;
if (name == bypassDocumentValidationCommandOption())
continue;
cmdBuilder.append(elem);
}
const BSONObj cmdRewritten = cmdBuilder.done();
auto opObserver = getGlobalServiceContext()->getOpObserver();
invariant(opObserver);
opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
}
wunit.commit();
result->appendElements(intermediateResult.obj());
});
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::AtomicityFailure) {
// Retry in non-atomic mode.
// The uncommitted WriteUnitOfWork was destroyed during unwinding; the global write
// lock is still held for the retry.
return _applyOps(opCtx,
dbName,
applyOpCmd,
info,
oplogApplicationMode,
result,
&numApplied,
nullptr);
}
// The exception escaped the atomic batch: count the failing op and report every
// processed op as not applied.
BSONArrayBuilder ab;
++numApplied;
for (int j = 0; j < numApplied; j++)
ab.append(false);
result->append("applied", numApplied);
result->append("code", ex.code());
result->append("codeName", ErrorCodes::errorString(ex.code()));
result->append("errmsg", ex.what());
result->append("results", ab.arr());
return Status(ex.code(), ex.what());
}
return Status::OK();
}
/**
 * Expands an applyOps oplog entry into one OplogEntry per nested operation. Each nested op
 * document is extended with the enclosing entry's fields via appendElementsUnique (which, as its
 * name indicates, should skip field names the nested op already defines).
 *
 * Throws TypeMismatch if the entry is not a command, and CommandNotSupported if it is a command
 * other than applyOps. Returns an empty list when the nested operation array is empty.
 */
// static
MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) {
    uassert(ErrorCodes::TypeMismatch,
            str::stream() << "ApplyOps::extractOperations(): not a command: "
                          << redact(applyOpsOplogEntry.toBSON()),
            applyOpsOplogEntry.isCommand());
    uassert(ErrorCodes::CommandNotSupported,
            str::stream() << "ApplyOps::extractOperations(): not applyOps command: "
                          << redact(applyOpsOplogEntry.toBSON()),
            OplogEntry::CommandType::kApplyOps == applyOpsOplogEntry.getCommandType());

    // Keep the command object alive in a local while iterating views into its nested array.
    const auto applyOpsCmd = applyOpsOplogEntry.getOperationToApply();
    const auto nestedOps = applyOpsCmd.firstElement().Obj();

    MultiApplier::Operations expanded;
    if (nestedOps.isEmpty()) {
        return expanded;
    }

    const auto outerEntry = applyOpsOplogEntry.toBSON();
    for (const auto& nestedElem : nestedOps) {
        auto nestedDoc = nestedElem.Obj();
        BSONObjBuilder mergedBuilder(nestedDoc);
        mergedBuilder.appendElementsUnique(outerEntry);
        auto mergedOp = mergedBuilder.obj();
        expanded.emplace_back(mergedOp);
    }
    return expanded;
}
} // namespace repl
} // namespace mongo