diff options
-rw-r--r-- | jstests/core/apply_ops1.js | 11 | ||||
-rw-r--r-- | jstests/core/apply_ops2.js | 3 | ||||
-rw-r--r-- | jstests/core/apply_ops_atomicity.js | 27 | ||||
-rw-r--r-- | jstests/core/bypass_doc_validation.js | 4 | ||||
-rw-r--r-- | jstests/noPassthroughWithMongod/apply_ops_errors.js | 4 | ||||
-rw-r--r-- | src/mongo/db/catalog/apply_ops.cpp | 288 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 35 |
7 files changed, 257 insertions, 115 deletions
diff --git a/jstests/core/apply_ops1.js b/jstests/core/apply_ops1.js index 96840f125e9..34a38185c01 100644 --- a/jstests/core/apply_ops1.js +++ b/jstests/core/apply_ops1.js @@ -39,20 +39,23 @@ assert.commandWorked(db.adminCommand({applyOps: [{op: 'n', ns: ''}]}), 'applyOps should work on no op operation with empty "ns" field value'); + // Missing dbname in 'ns' field. + assert.commandFailed(db.adminCommand({applyOps: [{op: 'd', ns: t.getName(), o: {_id: 1}}]})); + // Missing 'o' field value in an operation of type 'c' (command). - assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo'}]}), + assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName()}]}), 'applyOps should fail on command operation without "o" field'); // Non-object 'o' field value in an operation of type 'c' (command). - assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: 'bar'}]}), + assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: 'bar'}]}), 'applyOps should fail on command operation with non-object "o" field'); // Empty object 'o' field value in an operation of type 'c' (command). - assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: {}}]}), + assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: {}}]}), 'applyOps should fail on command operation with empty object "o" field'); // Unknown key in 'o' field value in an operation of type 'c' (command). - assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: {a: 1}}]}), + assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: {a: 1}}]}), 'applyOps should fail on command operation on unknown key in "o" field'); // Empty 'ns' field value in operation type other than 'n'. diff --git a/jstests/core/apply_ops2.js b/jstests/core/apply_ops2.js index bf804214846..4e99cee37ff 100644 --- a/jstests/core/apply_ops2.js +++ b/jstests/core/apply_ops2.js @@ -33,7 +33,8 @@ res = db.runCommand({ alwaysUpsert: false }); -assert.eq(true, res.results[0], "upsert = false, existing doc update failed"); +// Because the CRUD apply-ops are atomic, all results are false, even the first one. +assert.eq(false, res.results[0], "upsert = false, existing doc update failed"); assert.eq(false, res.results[1], "upsert = false, nonexisting doc upserted"); assert.eq(2, t.find().count(), "2 docs expected after upsert failure"); diff --git a/jstests/core/apply_ops_atomicity.js b/jstests/core/apply_ops_atomicity.js new file mode 100644 index 00000000000..2d093555200 --- /dev/null +++ b/jstests/core/apply_ops_atomicity.js @@ -0,0 +1,27 @@ +// SERVER-23326: Make applyOps atomic for CRUD operations +(function() { + 'use strict'; + + var t = db.applyOps; + t.drop(); + assert.writeOK(t.insert({_id: 1})); + + // Operations including commands should not be atomic, so the insert will succeed. + assert.commandFailed(db.adminCommand({ + applyOps: [ + {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}}, + {op: 'c', ns: "invalid", o: {create: "t"}}, + ] + })); + assert.eq(t.count({x: 1}), 1); + + // Operations only including CRUD commands should be atomic, so the next insert will fail. + var tooLong = Array(2000).join("hello"); + assert.commandFailed(db.adminCommand({ + applyOps: [ + {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}}, + {op: 'i', ns: t.getFullName(), o: {_id: tooLong, x: 1}}, + ] + })); + assert.eq(t.count({x: 1}), 1); +})(); diff --git a/jstests/core/bypass_doc_validation.js b/jstests/core/bypass_doc_validation.js index 79a2eb7d4a2..d9bca81ab6d 100644 --- a/jstests/core/bypass_doc_validation.js +++ b/jstests/core/bypass_doc_validation.js @@ -19,9 +19,9 @@ // Test applyOps with a simple insert if not on mongos. if (!db.runCommand({isdbgrid: 1}).isdbgrid) { var op = [{ts: Timestamp(0, 0), h: 1, v: 2, op: 'i', ns: coll.getFullName(), o: {_id: 9}}]; - // SERVER-21345: applyOps is returning UnknownError instead of DocumentValidationFailure assert.commandFailedWithCode( - myDb.runCommand({applyOps: op, bypassDocumentValidation: false}), 8); + myDb.runCommand({applyOps: op, bypassDocumentValidation: false}), + ErrorCodes.DocumentValidationFailure); assert.eq(0, coll.count({_id: 9})); assert.commandWorked(myDb.runCommand({applyOps: op, bypassDocumentValidation: true})); assert.eq(1, coll.count({_id: 9})); diff --git a/jstests/noPassthroughWithMongod/apply_ops_errors.js b/jstests/noPassthroughWithMongod/apply_ops_errors.js index d2f584af787..9441d006e43 100644 --- a/jstests/noPassthroughWithMongod/apply_ops_errors.js +++ b/jstests/noPassthroughWithMongod/apply_ops_errors.js @@ -52,6 +52,6 @@ assert.eq(2, res.applied); assert(res.code); assert(res.errmsg); - assert.eq([true, false], res.results); + assert.eq([false, false], res.results); assert.eq(0, res.ok); -})();
\ No newline at end of file +})(); diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp index 5fd501ddab3..41582f4869d 100644 --- a/src/mongo/db/catalog/apply_ops.cpp +++ b/src/mongo/db/catalog/apply_ops.cpp @@ -34,6 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/dbhash.h" +#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" @@ -41,115 +42,138 @@ #include "mongo/db/matcher/extensions_callback_disallow_extensions.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/op_observer.h" +#include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/service_context.h" #include "mongo/util/log.h" namespace mongo { -Status applyOps(OperationContext* txn, - const std::string& dbName, - const BSONObj& applyOpCmd, - BSONObjBuilder* result) { - // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple - // ns used so locking individually requires more analysis - ScopedTransaction scopedXact(txn, MODE_X); - Lock::GlobalWrite globalWriteLock(txn->lockState()); +namespace { +/** + * Return true iff the applyOpsCmd can be executed in a single WriteUnitOfWork. + */ +bool canBeAtomic(const BSONObj& applyOpCmd) { + for (const auto& elem : applyOpCmd.firstElement().Obj()) { + const char* names[] = {"ns", "op"}; + BSONElement fields[2]; + elem.Obj().getFields(2, names, fields); + BSONElement& fieldNs = fields[0]; + BSONElement& fieldOp = fields[1]; + + const char* opType = fieldOp.valuestrsafe(); + const StringData ns = fieldNs.valueStringData(); + + // All atomic ops have an opType of length 1. + if (opType[0] == '\0' || opType[1] != '\0') + return false; + + // Only consider CRUD operations. + switch (*opType) { + case 'd': + case 'n': + case 'u': + break; + case 'i': + if (nsToCollectionSubstring(ns) != "system.indexes") + break; + // Fallthrough. + default: + return false; + } + } - bool userInitiatedWritesAndNotPrimary = txn->writesAreReplicated() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName); + return true; +} - if (userInitiatedWritesAndNotPrimary) { - return Status(ErrorCodes::NotMaster, - str::stream() << "Not primary while applying ops to database " << dbName); - } +Status _applyOps(OperationContext* txn, + const std::string& dbName, + const BSONObj& applyOpCmd, + BSONObjBuilder* result, + int* numApplied) { + dassert(txn->lockState()->isLockHeldForMode( + ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_GLOBAL), MODE_X)); bool shouldReplicateWrites = txn->writesAreReplicated(); txn->setReplicatedWrites(false); BSONObj ops = applyOpCmd.firstElement().Obj(); - // Preconditions check reads the database state, so needs to be done locked - if (applyOpCmd["preCondition"].type() == Array) { - BSONObjIterator i(applyOpCmd["preCondition"].Obj()); - while (i.more()) { - BSONObj f = i.next().Obj(); - - DBDirectClient db(txn); - BSONObj realres = db.findOne(f["ns"].String(), f["q"].Obj()); - - // Apply-ops would never have a $where/$text matcher. Using the "DisallowExtensions" - // callback ensures that parsing will throw an error if $where or $text are found. - // TODO SERVER-23690: Pass the appropriate CollatorInterface* instead of nullptr. - Matcher m(f["res"].Obj(), ExtensionsCallbackDisallowExtensions(), nullptr); - if (!m.matches(realres)) { - result->append("got", realres); - result->append("whatFailed", f); - txn->setReplicatedWrites(shouldReplicateWrites); - return Status(ErrorCodes::BadValue, "pre-condition failed"); - } - } - } // apply - int num = 0; + *numApplied = 0; int errors = 0; BSONObjIterator i(ops); BSONArrayBuilder ab; const bool alwaysUpsert = applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true; + const bool haveWrappingWUOW = txn->lockState()->inAWriteUnitOfWork(); while (i.more()) { BSONElement e = i.next(); - const BSONObj& temp = e.Obj(); + const BSONObj& opObj = e.Obj(); // Ignore 'n' operations. - const char* opType = temp["op"].valuestrsafe(); + const char* opType = opObj["op"].valuestrsafe(); if (*opType == 'n') continue; - const std::string ns = temp["ns"].String(); - - // Run operations under a nested lock as a hack to prevent yielding. - // - // The list of operations is supposed to be applied atomically; yielding - // would break atomicity by allowing an interruption or a shutdown to occur - // after only some operations are applied. We are already locked globally - // at this point, so taking a DBLock on the namespace creates a nested lock, - // and yields are disallowed for operations that hold a nested lock. - // - // We do not have a wrapping WriteUnitOfWork so it is possible for a journal - // commit to happen with a subset of ops applied. - // TODO figure out what to do about this. - Lock::GlobalWrite globalWriteLockDisallowTempRelease(txn->lockState()); - - // Ensures that yielding will not happen (see the comment above). - DEV { - Locker::LockSnapshot lockSnapshot; - invariant(!txn->lockState()->saveLockStateAndUnlock(&lockSnapshot)); - }; + const std::string ns = opObj["ns"].String(); + + // Need to check this here, or OldClientContext may fail an invariant. + if (*opType != 'c' && !NamespaceString(ns).isValid()) + return {ErrorCodes::InvalidNamespace, "invalid ns: " + ns}; Status status(ErrorCodes::InternalError, ""); - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - if (*opType == 'c') { - status = repl::applyCommand_inlock(txn, temp); - break; - } else { - OldClientContext ctx(txn, ns); + if (haveWrappingWUOW) { + invariant(*opType != 'c'); - status = repl::applyOperation_inlock(txn, ctx.db(), temp, alwaysUpsert); - break; + OldClientContext ctx(txn, ns); + status = repl::applyOperation_inlock(txn, ctx.db(), opObj, alwaysUpsert); + if (!status.isOK()) + return status; + logOpForDbHash(txn, ns.c_str()); + } else { + try { + // Run operations under a nested lock as a hack to prevent yielding. + // + // The list of operations is supposed to be applied atomically; yielding + // would break atomicity by allowing an interruption or a shutdown to occur + // after only some operations are applied. We are already locked globally + // at this point, so taking a DBLock on the namespace creates a nested lock, + // and yields are disallowed for operations that hold a nested lock. + // + // We do not have a wrapping WriteUnitOfWork so it is possible for a journal + // commit to happen with a subset of ops applied. + Lock::GlobalWrite globalWriteLockDisallowTempRelease(txn->lockState()); + + // Ensures that yielding will not happen (see the comment above). + DEV { + Locker::LockSnapshot lockSnapshot; + invariant(!txn->lockState()->saveLockStateAndUnlock(&lockSnapshot)); + }; + + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + if (*opType == 'c') { + status = repl::applyCommand_inlock(txn, opObj); + } else { + OldClientContext ctx(txn, ns); + + status = repl::applyOperation_inlock(txn, ctx.db(), opObj, alwaysUpsert); + } } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", ns); + } catch (const DBException& ex) { + ab.append(false); + result->append("applied", ++(*numApplied)); + result->append("code", ex.getCode()); + result->append("errmsg", ex.what()); + result->append("results", ab.arr()); + return Status(ErrorCodes::UnknownError, ""); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", ns); - } catch (const DBException& ex) { - ab.append(false); - result->append("applied", ++num); - result->append("code", ex.getCode()); - result->append("errmsg", ex.what()); - result->append("results", ab.arr()); - return Status(ErrorCodes::UnknownError, ""); + WriteUnitOfWork wuow(txn); + logOpForDbHash(txn, ns.c_str()); + wuow.commit(); } ab.append(status.isOK()); @@ -157,14 +181,10 @@ Status applyOps(OperationContext* txn, errors++; } - num++; - - WriteUnitOfWork wuow(txn); - logOpForDbHash(txn, ns.c_str()); - wuow.commit(); + (*numApplied)++; } - result->append("applied", num); + result->append("applied", *numApplied); result->append("results", ab.arr()); txn->setReplicatedWrites(shouldReplicateWrites); @@ -189,20 +209,25 @@ Status applyOps(OperationContext* txn, const BSONObj cmdRewritten = cmdBuilder.done(); - // We currently always logOp the command regardless of whether the individial ops - // succeeded and rely on any failures to also happen on secondaries. This isn't - // perfect, but it's what the command has always done and is part of its "correct" - // behavior. - while (true) { - try { - WriteUnitOfWork wunit(txn); - getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten); - wunit.commit(); - break; - } catch (const WriteConflictException& wce) { - LOG(2) << "WriteConflictException while logging applyOps command, retrying."; - txn->recoveryUnit()->abandonSnapshot(); - continue; + if (haveWrappingWUOW) { + getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten); + } else { + // When executing applyOps outside of a wrapping WriteUnitOfWOrk, always logOp the + // command regardless of whether the individial ops succeeded and rely on any failures + // to also on secondaries. This isn't perfect, but it's what the command has always done + // and is part of its "correct" behavior. + while (true) { + try { + WriteUnitOfWork wunit(txn); + getGlobalServiceContext()->getOpObserver()->onApplyOps( + txn, tempNS, cmdRewritten); + wunit.commit(); + break; + } catch (const WriteConflictException& wce) { + LOG(2) << "WriteConflictException while logging applyOps command, retrying."; + txn->recoveryUnit()->abandonSnapshot(); + continue; + } } } } @@ -214,4 +239,77 @@ Status applyOps(OperationContext* txn, return Status::OK(); } +bool preconditionOK(OperationContext* txn, const BSONObj& applyOpCmd, BSONObjBuilder* result) { + dassert(txn->lockState()->isLockHeldForMode( + ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_GLOBAL), MODE_X)); + + if (applyOpCmd["preCondition"].type() == Array) { + BSONObjIterator i(applyOpCmd["preCondition"].Obj()); + while (i.more()) { + BSONObj f = i.next().Obj(); + + DBDirectClient db(txn); + BSONObj realres = db.findOne(f["ns"].String(), f["q"].Obj()); + + // Apply-ops would never have a $where/$text matcher. Using the "DisallowExtensions" + // callback ensures that parsing will throw an error if $where or $text are found. + // TODO SERVER-23690: Pass the appropriate CollatorInterface* instead of nullptr. + Matcher m(f["res"].Obj(), ExtensionsCallbackDisallowExtensions(), nullptr); + if (!m.matches(realres)) { + result->append("got", realres); + result->append("whatFailed", f); + return false; + } + } + } + return true; +} +} // namespace + +Status applyOps(OperationContext* txn, + const std::string& dbName, + const BSONObj& applyOpCmd, + BSONObjBuilder* result) { + // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple + // ns used so locking individually requires more analysis + ScopedTransaction scopedXact(txn, MODE_X); + Lock::GlobalWrite globalWriteLock(txn->lockState()); + + bool userInitiatedWritesAndNotPrimary = txn->writesAreReplicated() && + !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName); + + if (userInitiatedWritesAndNotPrimary) + return Status(ErrorCodes::NotMaster, + str::stream() << "Not primary while applying ops to database " << dbName); + + if (!preconditionOK(txn, applyOpCmd, result)) + return Status(ErrorCodes::BadValue, "pre-condition failed"); + + int numApplied = 0; + if (!canBeAtomic(applyOpCmd)) + return _applyOps(txn, dbName, applyOpCmd, result, &numApplied); + + // Perform write ops atomically + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + numApplied = 0; + uassertStatusOK(_applyOps(txn, dbName, applyOpCmd, result, &numApplied)); + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", dbName); + } catch (const DBException& ex) { + BSONArrayBuilder ab; + ++numApplied; + for (int j = 0; j < numApplied; j++) + ab.append(false); + result->append("applied", numApplied); + result->append("code", ex.getCode()); + result->append("errmsg", ex.what()); + result->append("results", ab.arr()); + return Status(ErrorCodes::UnknownError, ""); + } + + return Status::OK(); +} } // namespace mongo diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index f76a4436860..3f078610bfd 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -687,6 +687,7 @@ Status applyOperation_inlock(OperationContext* txn, } Collection* collection = db->getCollection(ns); IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); + const bool haveWrappingWriteUnitOfWork = txn->lockState()->inAWriteUnitOfWork(); // operation type -- see logOp() comments for types const char* opType = fieldOp.valuestrsafe(); @@ -782,12 +783,17 @@ Status applyOperation_inlock(OperationContext* txn, str::stream() << "Failed to apply insert due to missing _id: " << op.toString(), o.hasField("_id")); - // 1. Try insert first + // 1. Try insert first, if we have no wrappingWriteUnitOfWork // 2. If okay, commit - // 3. If not, do update (and commit) + // 3. If not, do upsert (and commit) // 4. If both !Ok, return status Status status{ErrorCodes::NotYetInitialized, ""}; - { + + // We cannot rely on a DuplicateKey error if we'repart of a larger transaction, because + // that would require the transaction to abort. So instead, use upsert in that case. + bool needToDoUpsert = haveWrappingWriteUnitOfWork; + + if (!needToDoUpsert) { WriteUnitOfWork wuow(txn); try { OpDebug* const nullOpDebug = nullptr; @@ -797,14 +803,14 @@ Status applyOperation_inlock(OperationContext* txn, } if (status.isOK()) { wuow.commit(); - } - } - // Now see if we need to do an update, based on duplicate _id index key - if (!status.isOK()) { - if (status.code() != ErrorCodes::DuplicateKey) { + } else if (status == ErrorCodes::DuplicateKey) { + needToDoUpsert = true; + } else { return status; } - + } + // Now see if we need to do an upsert. + if (needToDoUpsert) { // Do update on DuplicateKey errors. // This will only be on the _id field in replication, // since we disable non-_id unique constraint violations. @@ -827,6 +833,7 @@ Status applyOperation_inlock(OperationContext* txn, fassertFailedNoTrace(28750); } } + if (incrementOpsAppliedStats) { incrementOpsAppliedStats(); } @@ -917,8 +924,11 @@ Status applyOperation_inlock(OperationContext* txn, 14825, str::stream() << "error in applyOperation : unknown opType " << *opType); } - // AuthorizationManager's logOp method registers a RecoveryUnit::Change - // and to do so we need to have begun a UnitOfWork + // AuthorizationManager's logOp method registers a RecoveryUnit::Change and to do so we need + // to a new WriteUnitOfWork, if we dont have a wrapping unit of work already. If we already + // have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been + // done in the WUOW that did the write, but this won't happen because applyOps turns off + // observers. WriteUnitOfWork wuow(txn); getGlobalAuthorizationManager()->logOp( txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL); @@ -949,6 +959,9 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { BSONObj o = fieldO.embeddedObject(); const char* ns = fieldNs.valuestrsafe(); + if (!NamespaceString(ns).isValid()) { + return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(ns)}; + } // Applying commands in repl is done under Global W-lock, so it is safe to not // perform the current DB checks after reacquiring the lock. |