summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/core/apply_ops1.js11
-rw-r--r--jstests/core/apply_ops2.js3
-rw-r--r--jstests/core/apply_ops_atomicity.js27
-rw-r--r--jstests/core/bypass_doc_validation.js4
-rw-r--r--jstests/noPassthroughWithMongod/apply_ops_errors.js4
-rw-r--r--src/mongo/db/catalog/apply_ops.cpp288
-rw-r--r--src/mongo/db/repl/oplog.cpp35
7 files changed, 257 insertions, 115 deletions
diff --git a/jstests/core/apply_ops1.js b/jstests/core/apply_ops1.js
index 96840f125e9..34a38185c01 100644
--- a/jstests/core/apply_ops1.js
+++ b/jstests/core/apply_ops1.js
@@ -39,20 +39,23 @@
assert.commandWorked(db.adminCommand({applyOps: [{op: 'n', ns: ''}]}),
'applyOps should work on no op operation with empty "ns" field value');
+ // Missing dbname in 'ns' field.
+ assert.commandFailed(db.adminCommand({applyOps: [{op: 'd', ns: t.getName(), o: {_id: 1}}]}));
+
// Missing 'o' field value in an operation of type 'c' (command).
- assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo'}]}),
+ assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName()}]}),
'applyOps should fail on command operation without "o" field');
// Non-object 'o' field value in an operation of type 'c' (command).
- assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: 'bar'}]}),
+ assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: 'bar'}]}),
'applyOps should fail on command operation with non-object "o" field');
// Empty object 'o' field value in an operation of type 'c' (command).
- assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: {}}]}),
+ assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: {}}]}),
'applyOps should fail on command operation with empty object "o" field');
// Unknown key in 'o' field value in an operation of type 'c' (command).
- assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: 'foo', o: {a: 1}}]}),
+ assert.commandFailed(db.adminCommand({applyOps: [{op: 'c', ns: t.getFullName(), o: {a: 1}}]}),
'applyOps should fail on command operation on unknown key in "o" field');
// Empty 'ns' field value in operation type other than 'n'.
diff --git a/jstests/core/apply_ops2.js b/jstests/core/apply_ops2.js
index bf804214846..4e99cee37ff 100644
--- a/jstests/core/apply_ops2.js
+++ b/jstests/core/apply_ops2.js
@@ -33,7 +33,8 @@ res = db.runCommand({
alwaysUpsert: false
});
-assert.eq(true, res.results[0], "upsert = false, existing doc update failed");
+// Because the CRUD apply-ops are atomic, all results are false, even the first one.
+assert.eq(false, res.results[0], "upsert = false, existing doc update failed");
assert.eq(false, res.results[1], "upsert = false, nonexisting doc upserted");
assert.eq(2, t.find().count(), "2 docs expected after upsert failure");
diff --git a/jstests/core/apply_ops_atomicity.js b/jstests/core/apply_ops_atomicity.js
new file mode 100644
index 00000000000..2d093555200
--- /dev/null
+++ b/jstests/core/apply_ops_atomicity.js
@@ -0,0 +1,27 @@
+// SERVER-23326: Make applyOps atomic for CRUD operations
+(function() {
+ 'use strict';
+
+ var t = db.applyOps;
+ t.drop();
+ assert.writeOK(t.insert({_id: 1}));
+
+ // Operations including commands should not be atomic, so the insert will succeed.
+ assert.commandFailed(db.adminCommand({
+ applyOps: [
+ {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}},
+ {op: 'c', ns: "invalid", o: {create: "t"}},
+ ]
+ }));
+ assert.eq(t.count({x: 1}), 1);
+
+ // Operations only including CRUD commands should be atomic, so the next insert will fail.
+ var tooLong = Array(2000).join("hello");
+ assert.commandFailed(db.adminCommand({
+ applyOps: [
+ {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}},
+ {op: 'i', ns: t.getFullName(), o: {_id: tooLong, x: 1}},
+ ]
+ }));
+ assert.eq(t.count({x: 1}), 1);
+})();
diff --git a/jstests/core/bypass_doc_validation.js b/jstests/core/bypass_doc_validation.js
index 79a2eb7d4a2..d9bca81ab6d 100644
--- a/jstests/core/bypass_doc_validation.js
+++ b/jstests/core/bypass_doc_validation.js
@@ -19,9 +19,9 @@
// Test applyOps with a simple insert if not on mongos.
if (!db.runCommand({isdbgrid: 1}).isdbgrid) {
var op = [{ts: Timestamp(0, 0), h: 1, v: 2, op: 'i', ns: coll.getFullName(), o: {_id: 9}}];
- // SERVER-21345: applyOps is returning UnknownError instead of DocumentValidationFailure
assert.commandFailedWithCode(
- myDb.runCommand({applyOps: op, bypassDocumentValidation: false}), 8);
+ myDb.runCommand({applyOps: op, bypassDocumentValidation: false}),
+ ErrorCodes.DocumentValidationFailure);
assert.eq(0, coll.count({_id: 9}));
assert.commandWorked(myDb.runCommand({applyOps: op, bypassDocumentValidation: true}));
assert.eq(1, coll.count({_id: 9}));
diff --git a/jstests/noPassthroughWithMongod/apply_ops_errors.js b/jstests/noPassthroughWithMongod/apply_ops_errors.js
index d2f584af787..9441d006e43 100644
--- a/jstests/noPassthroughWithMongod/apply_ops_errors.js
+++ b/jstests/noPassthroughWithMongod/apply_ops_errors.js
@@ -52,6 +52,6 @@
assert.eq(2, res.applied);
assert(res.code);
assert(res.errmsg);
- assert.eq([true, false], res.results);
+ assert.eq([false, false], res.results);
assert.eq(0, res.ok);
-})(); \ No newline at end of file
+})();
diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp
index 5fd501ddab3..41582f4869d 100644
--- a/src/mongo/db/catalog/apply_ops.cpp
+++ b/src/mongo/db/catalog/apply_ops.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/dbhash.h"
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
@@ -41,115 +42,138 @@
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
#include "mongo/util/log.h"
namespace mongo {
-Status applyOps(OperationContext* txn,
- const std::string& dbName,
- const BSONObj& applyOpCmd,
- BSONObjBuilder* result) {
- // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple
- // ns used so locking individually requires more analysis
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::GlobalWrite globalWriteLock(txn->lockState());
+namespace {
+/**
+ * Return true iff the applyOpsCmd can be executed in a single WriteUnitOfWork.
+ */
+bool canBeAtomic(const BSONObj& applyOpCmd) {
+ for (const auto& elem : applyOpCmd.firstElement().Obj()) {
+ const char* names[] = {"ns", "op"};
+ BSONElement fields[2];
+ elem.Obj().getFields(2, names, fields);
+ BSONElement& fieldNs = fields[0];
+ BSONElement& fieldOp = fields[1];
+
+ const char* opType = fieldOp.valuestrsafe();
+ const StringData ns = fieldNs.valueStringData();
+
+ // All atomic ops have an opType of length 1.
+ if (opType[0] == '\0' || opType[1] != '\0')
+ return false;
+
+ // Only consider CRUD operations.
+ switch (*opType) {
+ case 'd':
+ case 'n':
+ case 'u':
+ break;
+ case 'i':
+ if (nsToCollectionSubstring(ns) != "system.indexes")
+ break;
+ // Fallthrough.
+ default:
+ return false;
+ }
+ }
- bool userInitiatedWritesAndNotPrimary = txn->writesAreReplicated() &&
- !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName);
+ return true;
+}
- if (userInitiatedWritesAndNotPrimary) {
- return Status(ErrorCodes::NotMaster,
- str::stream() << "Not primary while applying ops to database " << dbName);
- }
+Status _applyOps(OperationContext* txn,
+ const std::string& dbName,
+ const BSONObj& applyOpCmd,
+ BSONObjBuilder* result,
+ int* numApplied) {
+ dassert(txn->lockState()->isLockHeldForMode(
+ ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_GLOBAL), MODE_X));
bool shouldReplicateWrites = txn->writesAreReplicated();
txn->setReplicatedWrites(false);
BSONObj ops = applyOpCmd.firstElement().Obj();
- // Preconditions check reads the database state, so needs to be done locked
- if (applyOpCmd["preCondition"].type() == Array) {
- BSONObjIterator i(applyOpCmd["preCondition"].Obj());
- while (i.more()) {
- BSONObj f = i.next().Obj();
-
- DBDirectClient db(txn);
- BSONObj realres = db.findOne(f["ns"].String(), f["q"].Obj());
-
- // Apply-ops would never have a $where/$text matcher. Using the "DisallowExtensions"
- // callback ensures that parsing will throw an error if $where or $text are found.
- // TODO SERVER-23690: Pass the appropriate CollatorInterface* instead of nullptr.
- Matcher m(f["res"].Obj(), ExtensionsCallbackDisallowExtensions(), nullptr);
- if (!m.matches(realres)) {
- result->append("got", realres);
- result->append("whatFailed", f);
- txn->setReplicatedWrites(shouldReplicateWrites);
- return Status(ErrorCodes::BadValue, "pre-condition failed");
- }
- }
- }
// apply
- int num = 0;
+ *numApplied = 0;
int errors = 0;
BSONObjIterator i(ops);
BSONArrayBuilder ab;
const bool alwaysUpsert =
applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true;
+ const bool haveWrappingWUOW = txn->lockState()->inAWriteUnitOfWork();
while (i.more()) {
BSONElement e = i.next();
- const BSONObj& temp = e.Obj();
+ const BSONObj& opObj = e.Obj();
// Ignore 'n' operations.
- const char* opType = temp["op"].valuestrsafe();
+ const char* opType = opObj["op"].valuestrsafe();
if (*opType == 'n')
continue;
- const std::string ns = temp["ns"].String();
-
- // Run operations under a nested lock as a hack to prevent yielding.
- //
- // The list of operations is supposed to be applied atomically; yielding
- // would break atomicity by allowing an interruption or a shutdown to occur
- // after only some operations are applied. We are already locked globally
- // at this point, so taking a DBLock on the namespace creates a nested lock,
- // and yields are disallowed for operations that hold a nested lock.
- //
- // We do not have a wrapping WriteUnitOfWork so it is possible for a journal
- // commit to happen with a subset of ops applied.
- // TODO figure out what to do about this.
- Lock::GlobalWrite globalWriteLockDisallowTempRelease(txn->lockState());
-
- // Ensures that yielding will not happen (see the comment above).
- DEV {
- Locker::LockSnapshot lockSnapshot;
- invariant(!txn->lockState()->saveLockStateAndUnlock(&lockSnapshot));
- };
+ const std::string ns = opObj["ns"].String();
+
+ // Need to check this here, or OldClientContext may fail an invariant.
+ if (*opType != 'c' && !NamespaceString(ns).isValid())
+ return {ErrorCodes::InvalidNamespace, "invalid ns: " + ns};
Status status(ErrorCodes::InternalError, "");
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- if (*opType == 'c') {
- status = repl::applyCommand_inlock(txn, temp);
- break;
- } else {
- OldClientContext ctx(txn, ns);
+ if (haveWrappingWUOW) {
+ invariant(*opType != 'c');
- status = repl::applyOperation_inlock(txn, ctx.db(), temp, alwaysUpsert);
- break;
+ OldClientContext ctx(txn, ns);
+ status = repl::applyOperation_inlock(txn, ctx.db(), opObj, alwaysUpsert);
+ if (!status.isOK())
+ return status;
+ logOpForDbHash(txn, ns.c_str());
+ } else {
+ try {
+ // Run operations under a nested lock as a hack to prevent yielding.
+ //
+ // The list of operations is supposed to be applied atomically; yielding
+ // would break atomicity by allowing an interruption or a shutdown to occur
+ // after only some operations are applied. We are already locked globally
+ // at this point, so taking a DBLock on the namespace creates a nested lock,
+ // and yields are disallowed for operations that hold a nested lock.
+ //
+ // We do not have a wrapping WriteUnitOfWork so it is possible for a journal
+ // commit to happen with a subset of ops applied.
+ Lock::GlobalWrite globalWriteLockDisallowTempRelease(txn->lockState());
+
+ // Ensures that yielding will not happen (see the comment above).
+ DEV {
+ Locker::LockSnapshot lockSnapshot;
+ invariant(!txn->lockState()->saveLockStateAndUnlock(&lockSnapshot));
+ };
+
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ if (*opType == 'c') {
+ status = repl::applyCommand_inlock(txn, opObj);
+ } else {
+ OldClientContext ctx(txn, ns);
+
+ status = repl::applyOperation_inlock(txn, ctx.db(), opObj, alwaysUpsert);
+ }
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", ns);
+ } catch (const DBException& ex) {
+ ab.append(false);
+ result->append("applied", ++(*numApplied));
+ result->append("code", ex.getCode());
+ result->append("errmsg", ex.what());
+ result->append("results", ab.arr());
+ return Status(ErrorCodes::UnknownError, "");
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", ns);
- } catch (const DBException& ex) {
- ab.append(false);
- result->append("applied", ++num);
- result->append("code", ex.getCode());
- result->append("errmsg", ex.what());
- result->append("results", ab.arr());
- return Status(ErrorCodes::UnknownError, "");
+ WriteUnitOfWork wuow(txn);
+ logOpForDbHash(txn, ns.c_str());
+ wuow.commit();
}
ab.append(status.isOK());
@@ -157,14 +181,10 @@ Status applyOps(OperationContext* txn,
errors++;
}
- num++;
-
- WriteUnitOfWork wuow(txn);
- logOpForDbHash(txn, ns.c_str());
- wuow.commit();
+ (*numApplied)++;
}
- result->append("applied", num);
+ result->append("applied", *numApplied);
result->append("results", ab.arr());
txn->setReplicatedWrites(shouldReplicateWrites);
@@ -189,20 +209,25 @@ Status applyOps(OperationContext* txn,
const BSONObj cmdRewritten = cmdBuilder.done();
- // We currently always logOp the command regardless of whether the individial ops
- // succeeded and rely on any failures to also happen on secondaries. This isn't
- // perfect, but it's what the command has always done and is part of its "correct"
- // behavior.
- while (true) {
- try {
- WriteUnitOfWork wunit(txn);
- getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten);
- wunit.commit();
- break;
- } catch (const WriteConflictException& wce) {
- LOG(2) << "WriteConflictException while logging applyOps command, retrying.";
- txn->recoveryUnit()->abandonSnapshot();
- continue;
+ if (haveWrappingWUOW) {
+ getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten);
+ } else {
+ // When executing applyOps outside of a wrapping WriteUnitOfWork, always logOp the
+ // command regardless of whether the individual ops succeeded and rely on any failures
+ // to also happen on secondaries. This isn't perfect, but it's what the command has always done
+ // and is part of its "correct" behavior.
+ while (true) {
+ try {
+ WriteUnitOfWork wunit(txn);
+ getGlobalServiceContext()->getOpObserver()->onApplyOps(
+ txn, tempNS, cmdRewritten);
+ wunit.commit();
+ break;
+ } catch (const WriteConflictException& wce) {
+ LOG(2) << "WriteConflictException while logging applyOps command, retrying.";
+ txn->recoveryUnit()->abandonSnapshot();
+ continue;
+ }
}
}
}
@@ -214,4 +239,77 @@ Status applyOps(OperationContext* txn,
return Status::OK();
}
+bool preconditionOK(OperationContext* txn, const BSONObj& applyOpCmd, BSONObjBuilder* result) {
+ dassert(txn->lockState()->isLockHeldForMode(
+ ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_GLOBAL), MODE_X));
+
+ if (applyOpCmd["preCondition"].type() == Array) {
+ BSONObjIterator i(applyOpCmd["preCondition"].Obj());
+ while (i.more()) {
+ BSONObj f = i.next().Obj();
+
+ DBDirectClient db(txn);
+ BSONObj realres = db.findOne(f["ns"].String(), f["q"].Obj());
+
+ // Apply-ops would never have a $where/$text matcher. Using the "DisallowExtensions"
+ // callback ensures that parsing will throw an error if $where or $text are found.
+ // TODO SERVER-23690: Pass the appropriate CollatorInterface* instead of nullptr.
+ Matcher m(f["res"].Obj(), ExtensionsCallbackDisallowExtensions(), nullptr);
+ if (!m.matches(realres)) {
+ result->append("got", realres);
+ result->append("whatFailed", f);
+ return false;
+ }
+ }
+ }
+ return true;
+}
+} // namespace
+
+Status applyOps(OperationContext* txn,
+ const std::string& dbName,
+ const BSONObj& applyOpCmd,
+ BSONObjBuilder* result) {
+ // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple
+ // ns used so locking individually requires more analysis
+ ScopedTransaction scopedXact(txn, MODE_X);
+ Lock::GlobalWrite globalWriteLock(txn->lockState());
+
+ bool userInitiatedWritesAndNotPrimary = txn->writesAreReplicated() &&
+ !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName);
+
+ if (userInitiatedWritesAndNotPrimary)
+ return Status(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while applying ops to database " << dbName);
+
+ if (!preconditionOK(txn, applyOpCmd, result))
+ return Status(ErrorCodes::BadValue, "pre-condition failed");
+
+ int numApplied = 0;
+ if (!canBeAtomic(applyOpCmd))
+ return _applyOps(txn, dbName, applyOpCmd, result, &numApplied);
+
+ // Perform write ops atomically
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ numApplied = 0;
+ uassertStatusOK(_applyOps(txn, dbName, applyOpCmd, result, &numApplied));
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "applyOps", dbName);
+ } catch (const DBException& ex) {
+ BSONArrayBuilder ab;
+ ++numApplied;
+ for (int j = 0; j < numApplied; j++)
+ ab.append(false);
+ result->append("applied", numApplied);
+ result->append("code", ex.getCode());
+ result->append("errmsg", ex.what());
+ result->append("results", ab.arr());
+ return Status(ErrorCodes::UnknownError, "");
+ }
+
+ return Status::OK();
+}
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index f76a4436860..3f078610bfd 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -687,6 +687,7 @@ Status applyOperation_inlock(OperationContext* txn,
}
Collection* collection = db->getCollection(ns);
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
+ const bool haveWrappingWriteUnitOfWork = txn->lockState()->inAWriteUnitOfWork();
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
@@ -782,12 +783,17 @@ Status applyOperation_inlock(OperationContext* txn,
str::stream() << "Failed to apply insert due to missing _id: " << op.toString(),
o.hasField("_id"));
- // 1. Try insert first
+ // 1. Try insert first, if we have no wrapping WriteUnitOfWork
// 2. If okay, commit
- // 3. If not, do update (and commit)
+ // 3. If not, do upsert (and commit)
// 4. If both !Ok, return status
Status status{ErrorCodes::NotYetInitialized, ""};
- {
+
+ // We cannot rely on a DuplicateKey error if we're part of a larger transaction, because
+ // that would require the transaction to abort. So instead, use upsert in that case.
+ bool needToDoUpsert = haveWrappingWriteUnitOfWork;
+
+ if (!needToDoUpsert) {
WriteUnitOfWork wuow(txn);
try {
OpDebug* const nullOpDebug = nullptr;
@@ -797,14 +803,14 @@ Status applyOperation_inlock(OperationContext* txn,
}
if (status.isOK()) {
wuow.commit();
- }
- }
- // Now see if we need to do an update, based on duplicate _id index key
- if (!status.isOK()) {
- if (status.code() != ErrorCodes::DuplicateKey) {
+ } else if (status == ErrorCodes::DuplicateKey) {
+ needToDoUpsert = true;
+ } else {
return status;
}
-
+ }
+ // Now see if we need to do an upsert.
+ if (needToDoUpsert) {
// Do update on DuplicateKey errors.
// This will only be on the _id field in replication,
// since we disable non-_id unique constraint violations.
@@ -827,6 +833,7 @@ Status applyOperation_inlock(OperationContext* txn,
fassertFailedNoTrace(28750);
}
}
+
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
@@ -917,8 +924,11 @@ Status applyOperation_inlock(OperationContext* txn,
14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
- // AuthorizationManager's logOp method registers a RecoveryUnit::Change
- // and to do so we need to have begun a UnitOfWork
+ // AuthorizationManager's logOp method registers a RecoveryUnit::Change and to do so we need
+ // to begin a new WriteUnitOfWork, if we don't have a wrapping unit of work already. If we already
+ // have a wrapping WUOW, the extra nesting is harmless. The logOp really should have been
+ // done in the WUOW that did the write, but this won't happen because applyOps turns off
+ // observers.
WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(
txn, opType, ns.toString().c_str(), o, fieldO2.isABSONObj() ? &o2 : NULL);
@@ -949,6 +959,9 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) {
BSONObj o = fieldO.embeddedObject();
const char* ns = fieldNs.valuestrsafe();
+ if (!NamespaceString(ns).isValid()) {
+ return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(ns)};
+ }
// Applying commands in repl is done under Global W-lock, so it is safe to not
// perform the current DB checks after reacquiring the lock.