diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2018-01-08 17:06:56 -0500 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2018-01-08 17:07:32 -0500 |
commit | 8b766fdc6c896ba0a4db30d7b9d2f050b464db22 (patch) | |
tree | 107cf6c037074c53bdddb61423c4c6cb352b3003 /src/mongo/db/repl/do_txn.cpp | |
parent | 27ccd4a3bf1150a1bf4356de8a91e4dff64e0762 (diff) | |
download | mongo-8b766fdc6c896ba0a4db30d7b9d2f050b464db22.tar.gz |
SERVER-32162 Remove non-atomic (push-based replication) support from doTxn.
Diffstat (limited to 'src/mongo/db/repl/do_txn.cpp')
-rw-r--r-- | src/mongo/db/repl/do_txn.cpp | 250 |
1 files changed, 60 insertions, 190 deletions
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp index 1bd9d13d8a1..44de9e8590c 100644 --- a/src/mongo/db/repl/do_txn.cpp +++ b/src/mongo/db/repl/do_txn.cpp @@ -56,7 +56,6 @@ namespace mongo { constexpr StringData DoTxn::kPreconditionFieldName; -constexpr StringData DoTxn::kOplogApplicationModeFieldName; namespace { @@ -84,7 +83,6 @@ bool _areOpsCrudOnly(const BSONObj& doTxnCmd) { // Only consider CRUD operations. switch (*opType) { case 'd': - case 'n': case 'u': break; case 'i': @@ -102,7 +100,6 @@ bool _areOpsCrudOnly(const BSONObj& doTxnCmd) { Status _doTxn(OperationContext* opCtx, const std::string& dbName, const BSONObj& doTxnCmd, - repl::OplogApplication::Mode oplogApplicationMode, BSONObjBuilder* result, int* numApplied, BSONArrayBuilder* opsBuilder) { @@ -113,168 +110,80 @@ Status _doTxn(OperationContext* opCtx, BSONObjIterator i(ops); BSONArrayBuilder ab; - const bool alwaysUpsert = - doTxnCmd.hasField("alwaysUpsert") ? doTxnCmd["alwaysUpsert"].trueValue() : true; - const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork(); + invariant(opCtx->lockState()->inAWriteUnitOfWork()); // Apply each op in the given 'doTxn' command object. while (i.more()) { BSONElement e = i.next(); const BSONObj& opObj = e.Obj(); - // Ignore 'n' operations. - const char* opType = opObj["op"].valuestrsafe(); - if (*opType == 'n') - continue; - const NamespaceString nss(opObj["ns"].String()); // Need to check this here, or OldClientContext may fail an invariant. - if (*opType != 'c' && !nss.isValid()) + if (!nss.isValid()) return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()}; Status status(ErrorCodes::InternalError, ""); - if (haveWrappingWUOW) { - invariant(opCtx->lockState()->isW()); - invariant(*opType != 'c'); - - auto db = dbHolder().get(opCtx, nss.ns()); - if (!db) { - // Retry in non-atomic mode, since MMAP cannot implicitly create a new database - // within an active WriteUnitOfWork. - uasserted(ErrorCodes::AtomicityFailure, - "cannot create a database in atomic doTxn mode; will retry without " - "atomicity"); - } - - // When processing an update on a non-existent collection, applyOperation_inlock() - // returns UpdateOperationFailed on updates and allows the collection to be - // implicitly created on upserts. We detect both cases here and fail early with - // NamespaceNotFound. - // Additionally for inserts, we fail early on non-existent collections. - auto collection = db->getCollection(opCtx, nss); - if (!collection && !nss.isSystemDotIndexes() && (*opType == 'i' || *opType == 'u')) { - uasserted( - ErrorCodes::AtomicityFailure, - str::stream() - << "cannot apply insert or update operation on a non-existent namespace " - << nss.ns() - << " in atomic doTxn mode: " - << redact(opObj)); - } + invariant(opCtx->lockState()->isW()); - // Cannot specify timestamp values in an atomic doTxn. - if (opObj.hasField("ts")) { - uasserted(ErrorCodes::AtomicityFailure, - "cannot apply an op with a timestamp in atomic doTxn mode; " - "will retry without atomicity"); - } + auto db = dbHolder().get(opCtx, nss.ns()); + if (!db) { + uasserted(ErrorCodes::NamespaceNotFound, + str::stream() << "cannot apply insert, delete, or update operation on a " + "non-existent namespace " + << nss.ns() + << ": " + << mongo::redact(opObj)); + } - OldClientContext ctx(opCtx, nss.ns()); - - status = repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); - if (!status.isOK()) - return status; - - // Append completed op, including UUID if available, to 'opsBuilder'. - if (opsBuilder) { - if (opObj.hasField("ui") || nss.isSystemDotIndexes() || - !(collection && collection->uuid())) { - // No changes needed to operation document. - opsBuilder->append(opObj); - } else { - // Operation document has no "ui" field and collection has a UUID. - auto uuid = collection->uuid(); - BSONObjBuilder opBuilder; - opBuilder.appendElements(opObj); - uuid->appendToBuilder(&opBuilder, "ui"); - opsBuilder->append(opBuilder.obj()); - } - } - } else { - try { - status = writeConflictRetry( - opCtx, - "doTxn", - nss.ns(), - [opCtx, nss, opObj, opType, alwaysUpsert, oplogApplicationMode] { - if (*opType == 'c') { - invariant(opCtx->lockState()->isW()); - return repl::applyCommand_inlock(opCtx, opObj, oplogApplicationMode); - } + // When processing an update on a non-existent collection, applyOperation_inlock() + // returns UpdateOperationFailed on updates and allows the collection to be + // implicitly created on upserts. We detect both cases here and fail early with + // NamespaceNotFound. + // Additionally for inserts, we fail early on non-existent collections. + auto collection = db->getCollection(opCtx, nss); + if (!collection && db->getViewCatalog()->lookup(opCtx, nss.ns())) { + uasserted(ErrorCodes::CommandNotSupportedOnView, + str::stream() << "doTxn not supported on a view: " << redact(opObj)); + } + if (!collection) { + uasserted(ErrorCodes::NamespaceNotFound, + str::stream() << "cannot apply operation on a non-existent namespace " + << nss.ns() + << " with doTxn: " + << redact(opObj)); + } - AutoGetCollection autoColl(opCtx, nss, MODE_IX); - if (!autoColl.getCollection() && !nss.isSystemDotIndexes()) { - // For idempotency reasons, return success on delete operations. - if (*opType == 'd') { - return Status::OK(); - } - uasserted(ErrorCodes::NamespaceNotFound, - str::stream() - << "cannot apply insert or update operation on a " - "non-existent namespace " - << nss.ns() - << ": " - << mongo::redact(opObj)); - } + // Cannot specify timestamp values in an atomic doTxn. + if (opObj.hasField("ts")) { + uasserted(ErrorCodes::AtomicityFailure, + "cannot apply an op with a timestamp with doTxn; "); + } - OldClientContext ctx(opCtx, nss.ns()); + OldClientContext ctx(opCtx, nss.ns()); - if (!nss.isSystemDotIndexes()) { - return repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); - } + // Setting alwaysUpsert to true makes sense only during oplog replay, and doTxn commands + // should not be executed during oplog replay. + const bool alwaysUpsert = false; + status = repl::applyOperation_inlock( + opCtx, ctx.db(), opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOpsCmd); + if (!status.isOK()) + return status; - auto fieldO = opObj["o"]; - BSONObj indexSpec; - NamespaceString indexNss; - std::tie(indexSpec, indexNss) = - repl::prepForApplyOpsIndexInsert(fieldO, opObj, nss); - if (!indexSpec["collation"]) { - // If the index spec does not include a collation, explicitly specify - // the simple collation, so the index does not inherit the collection - // default collation. - auto indexVersion = indexSpec["v"]; - // The index version is populated by prepForApplyOpsIndexInsert(). - invariant(indexVersion); - if (indexVersion.isNumber() && - (indexVersion.numberInt() >= - static_cast<int>(IndexDescriptor::IndexVersion::kV2))) { - BSONObjBuilder bob; - bob.append("collation", CollationSpec::kSimpleSpec); - bob.appendElements(indexSpec); - indexSpec = bob.obj(); - } - } - BSONObjBuilder command; - command.append("createIndexes", indexNss.coll()); - { - BSONArrayBuilder indexes(command.subarrayStart("indexes")); - indexes.append(indexSpec); - indexes.doneFast(); - } - const BSONObj commandObj = command.done(); - - DBDirectClient client(opCtx); - BSONObj infoObj; - client.runCommand(nss.db().toString(), commandObj, infoObj); - - // Uassert to stop doTxn only when building indexes, but not for CRUD - // ops. - uassertStatusOK(getStatusFromCommandResult(infoObj)); - - return Status::OK(); - }); - } catch (const DBException& ex) { - ab.append(false); - result->append("applied", ++(*numApplied)); - result->append("code", ex.code()); - result->append("codeName", ErrorCodes::errorString(ex.code())); - result->append("errmsg", ex.what()); - result->append("results", ab.arr()); - return Status(ErrorCodes::UnknownError, ex.what()); + // Append completed op, including UUID if available, to 'opsBuilder'. + if (opsBuilder) { + if (opObj.hasField("ui") || nss.isSystemDotIndexes() || + !(collection && collection->uuid())) { + // No changes needed to operation document. + opsBuilder->append(opObj); + } else { + // Operation document has no "ui" field and collection has a UUID. + auto uuid = collection->uuid(); + BSONObjBuilder opBuilder; + opBuilder.appendElements(opObj); + uuid->appendToBuilder(&opBuilder, "ui"); + opsBuilder->append(opBuilder.obj()); } } @@ -364,34 +273,12 @@ Status _checkPrecondition(OperationContext* opCtx, Status doTxn(OperationContext* opCtx, const std::string& dbName, const BSONObj& doTxnCmd, - repl::OplogApplication::Mode oplogApplicationMode, BSONObjBuilder* result) { - bool allowAtomic = false; - uassertStatusOK( - bsonExtractBooleanFieldWithDefault(doTxnCmd, "allowAtomic", true, &allowAtomic)); - auto areOpsCrudOnly = _areOpsCrudOnly(doTxnCmd); - auto isAtomic = allowAtomic && areOpsCrudOnly; + uassert( + ErrorCodes::InvalidOptions, "doTxn supports only CRUD opts.", _areOpsCrudOnly(doTxnCmd)); auto hasPrecondition = _hasPrecondition(doTxnCmd); - if (hasPrecondition) { - uassert(ErrorCodes::InvalidOptions, - "Cannot use preCondition with {allowAtomic: false}.", - allowAtomic); - uassert(ErrorCodes::InvalidOptions, - "Cannot use preCondition when operations include commands.", - areOpsCrudOnly); - } - - boost::optional<Lock::GlobalWrite> globalWriteLock; - boost::optional<Lock::DBLock> dbWriteLock; - - // There's only one case where we are allowed to take the database lock instead of the global - // lock - no preconditions; only CRUD ops; and non-atomic mode. - if (!hasPrecondition && areOpsCrudOnly && !allowAtomic) { - dbWriteLock.emplace(opCtx, dbName, MODE_IX); - } else { - globalWriteLock.emplace(opCtx); - } + Lock::GlobalWrite globalWriteLock(opCtx); auto replCoord = repl::ReplicationCoordinator::get(opCtx); bool userInitiatedWritesAndNotPrimary = @@ -402,7 +289,6 @@ Status doTxn(OperationContext* opCtx, str::stream() << "Not primary while applying ops to database " << dbName); if (hasPrecondition) { - invariant(isAtomic); auto status = _checkPrecondition(opCtx, doTxnCmd, result); if (!status.isOK()) { return status; @@ -410,12 +296,6 @@ Status doTxn(OperationContext* opCtx, } int numApplied = 0; - if (!isAtomic) { - return _doTxn(opCtx, dbName, doTxnCmd, oplogApplicationMode, result, &numApplied, nullptr); - } - - // Perform write ops atomically - invariant(globalWriteLock); try { writeConflictRetry(opCtx, "doTxn", dbName, [&] { @@ -430,13 +310,8 @@ Status doTxn(OperationContext* opCtx, { // Suppress replication for atomic operations until end of doTxn. repl::UnreplicatedWritesBlock uwb(opCtx); - uassertStatusOK(_doTxn(opCtx, - dbName, - doTxnCmd, - oplogApplicationMode, - &intermediateResult, - &numApplied, - opsBuilder.get())); + uassertStatusOK(_doTxn( + opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied, opsBuilder.get())); } // Generate oplog entry for all atomic ops collectively. if (opCtx->writesAreReplicated()) { @@ -475,11 +350,6 @@ Status doTxn(OperationContext* opCtx, result->appendElements(intermediateResult.obj()); }); } catch (const DBException& ex) { - if (ex.code() == ErrorCodes::AtomicityFailure) { - // Retry in non-atomic mode. - return _doTxn( - opCtx, dbName, doTxnCmd, oplogApplicationMode, result, &numApplied, nullptr); - } BSONArrayBuilder ab; ++numApplied; for (int j = 0; j < numApplied; j++) |