diff options
-rw-r--r-- | jstests/core/txns/currentop_blocked_operations.js | 83 | ||||
-rw-r--r-- | jstests/core/txns/write_conflicts_with_non_txns.js | 3 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/curop.h | 28 | ||||
-rw-r--r-- | src/mongo/db/curop_metrics.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/curop_test.cpp | 68 |
6 files changed, 195 insertions, 50 deletions
diff --git a/jstests/core/txns/currentop_blocked_operations.js b/jstests/core/txns/currentop_blocked_operations.js new file mode 100644 index 00000000000..8e51334bdff --- /dev/null +++ b/jstests/core/txns/currentop_blocked_operations.js @@ -0,0 +1,83 @@ +/** + * Tests that currentOp reports debug information for operations that are blocked on transactions. + * + * @tags: [uses_transactions, uses_prepare_transaction] + */ +(function() { + "use strict"; + load("jstests/core/txns/libs/prepare_helpers.js"); + + const dbName = "test"; + const collName = "currentop_blocked_operations"; + const testDB = db.getSiblingDB(dbName); + const testColl = testDB.getCollection(collName); + + testColl.drop({writeConcern: {w: "majority"}}); + assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}})); + + const session = db.getMongo().startSession(); + const sessionDB = session.getDatabase(dbName); + const sessionColl = sessionDB.getCollection(collName); + + // Returns when the operation matching the 'matchExpr' is blocked, as evaluated by the + // 'isBlockedFunc'. + let waitForBlockedOp = function(matchExpr, isBlockedFunc) { + assert.soon(function() { + let cursor = + db.getSiblingDB("admin").aggregate([{$currentOp: {}}, {$match: matchExpr}]); + if (cursor.hasNext()) { + let op = cursor.next(); + printjson(op); + return isBlockedFunc(op); + } + return false; + }); + }; + + // This transaction will block conflicting non-transactional operations. + session.startTransaction(); + assert.commandWorked(sessionColl.insert({_id: 2222})); + + // This insert operation will encounter a WriteConflictException due to the unique key + // violation. It will block in an infinite write conflict loop until the transaction completes. + TestData.dbName = dbName; + TestData.collName = collName; + let awaitInsert = startParallelShell(function() { + let coll = db.getSiblingDB(TestData.dbName).getCollection(TestData.collName); + assert.commandWorked(coll.insert({_id: 2222, x: 0})); + }); + + // Wait for the counter to reach a high enough number to confirm the operation is retrying + // constantly. + waitForBlockedOp({"command.insert": collName}, function(op) { + return op.writeConflicts > 20; + }); + + assert.commandWorked(session.abortTransaction_forTesting()); + awaitInsert(); + assert.eq(1, testColl.find({_id: 2222, x: 0}).itcount()); + + // This prepared transaction will block conflicting non-transactional operations. + session.startTransaction(); + assert.commandWorked(sessionColl.update({_id: 2222}, {$set: {x: 1}})); + PrepareHelpers.prepareTransaction(session); + + // This update operation will encounter a prepare conflict due to the prepared transaction's + // modification to the same document. It will block without retrying until the prepared + // transaction completes. + TestData.dbName = dbName; + TestData.collName = collName; + let awaitUpdate = startParallelShell(function() { + let coll = db.getSiblingDB(TestData.dbName).getCollection(TestData.collName); + assert.commandWorked(coll.update({_id: 2222}, {$set: {x: 999}})); + }); + + // Expect at least one prepare conflict. + waitForBlockedOp({ns: testColl.getFullName(), op: "update"}, function(op) { + return op.prepareReadConflicts > 0; + }); + + assert.commandWorked(session.abortTransaction_forTesting()); + awaitUpdate(); + assert.eq(1, testColl.find({_id: 2222, x: 999}).itcount()); +})(); diff --git a/jstests/core/txns/write_conflicts_with_non_txns.js b/jstests/core/txns/write_conflicts_with_non_txns.js index 5d7c1cdd589..c198c85084c 100644 --- a/jstests/core/txns/write_conflicts_with_non_txns.js +++ b/jstests/core/txns/write_conflicts_with_non_txns.js @@ -52,7 +52,8 @@ // Returns true if a single document insert has started running on the server. function writeStarted() { return testDB.currentOp().inprog.some(op => { - return op.active && (op.ns === testColl.getFullName()) && (op.op === "insert"); + return op.active && (op.ns === testColl.getFullName()) && (op.op === "insert") && + (op.writeConflicts > 0); }); } diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 2111f168edf..cf6b63648e6 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -575,6 +575,13 @@ void CurOp::reportState(BSONObjBuilder* builder, bool truncateOps) { } } + if (auto n = _debug.additiveMetrics.prepareReadConflicts.load(); n > 0) { + builder->append("prepareReadConflicts", n); + } + if (auto n = _debug.additiveMetrics.writeConflicts.load(); n > 0) { + builder->append("writeConflicts", n); + } + builder->append("numYields", _numYields); } @@ -595,6 +602,9 @@ StringData getProtoString(int op) { #define OPDEBUG_TOSTRING_HELP_BOOL(x) \ if (x) \ s << " " #x ":" << (x) +#define OPDEBUG_TOSTRING_HELP_ATOMIC(x, y) \ + if (auto __y = y.load(); __y > 0) \ + s << " " x ":" << (__y) #define OPDEBUG_TOSTRING_HELP_OPTIONAL(x, y) \ if (y) \ s << " " x ":" << (*y) @@ -672,8 +682,8 @@ string OpDebug::report(Client* client, OPDEBUG_TOSTRING_HELP_OPTIONAL("keysInserted", additiveMetrics.keysInserted); OPDEBUG_TOSTRING_HELP_OPTIONAL("keysDeleted", additiveMetrics.keysDeleted); - OPDEBUG_TOSTRING_HELP_OPTIONAL("prepareReadConflicts", additiveMetrics.prepareReadConflicts); - OPDEBUG_TOSTRING_HELP_OPTIONAL("writeConflicts", additiveMetrics.writeConflicts); + OPDEBUG_TOSTRING_HELP_ATOMIC("prepareReadConflicts", additiveMetrics.prepareReadConflicts); + OPDEBUG_TOSTRING_HELP_ATOMIC("writeConflicts", additiveMetrics.writeConflicts); s << " numYields:" << curop.numYields(); OPDEBUG_TOSTRING_HELP(nreturned); @@ -727,6 +737,9 @@ string OpDebug::report(Client* client, #define OPDEBUG_APPEND_BOOL(x) \ if (x) \ b.appendBool(#x, (x)) +#define OPDEBUG_APPEND_ATOMIC(x, y) \ + if (auto __y = y.load(); __y > 0) \ + b.appendNumber(x, __y) #define OPDEBUG_APPEND_OPTIONAL(x, y) \ if (y) \ b.appendNumber(x, (*y)) @@ -769,8 +782,8 @@ void OpDebug::append(const CurOp& curop, OPDEBUG_APPEND_OPTIONAL("keysInserted", additiveMetrics.keysInserted); OPDEBUG_APPEND_OPTIONAL("keysDeleted", additiveMetrics.keysDeleted); - OPDEBUG_APPEND_OPTIONAL("prepareReadConflicts", additiveMetrics.prepareReadConflicts); - OPDEBUG_APPEND_OPTIONAL("writeConflicts", additiveMetrics.writeConflicts); + OPDEBUG_APPEND_ATOMIC("prepareReadConflicts", additiveMetrics.prepareReadConflicts); + OPDEBUG_APPEND_ATOMIC("writeConflicts", additiveMetrics.writeConflicts); b.appendNumber("numYield", curop.numYields()); OPDEBUG_APPEND_NUMBER(nreturned); @@ -872,25 +885,34 @@ void OpDebug::AdditiveMetrics::add(const AdditiveMetrics& otherMetrics) { ndeleted = addOptionalLongs(ndeleted, otherMetrics.ndeleted); keysInserted = addOptionalLongs(keysInserted, otherMetrics.keysInserted); keysDeleted = addOptionalLongs(keysDeleted, otherMetrics.keysDeleted); - prepareReadConflicts = - addOptionalLongs(prepareReadConflicts, otherMetrics.prepareReadConflicts); - writeConflicts = addOptionalLongs(writeConflicts, otherMetrics.writeConflicts); + prepareReadConflicts.fetchAndAdd(otherMetrics.prepareReadConflicts.load()); + writeConflicts.fetchAndAdd(otherMetrics.writeConflicts.load()); } -bool OpDebug::AdditiveMetrics::equals(const AdditiveMetrics& otherMetrics) { +void OpDebug::AdditiveMetrics::reset() { + keysExamined = boost::none; + docsExamined = boost::none; + nMatched = boost::none; + nModified = boost::none; + ninserted = boost::none; + ndeleted = boost::none; + keysInserted = boost::none; + keysDeleted = boost::none; + prepareReadConflicts.store(0); + writeConflicts.store(0); +} + +bool OpDebug::AdditiveMetrics::equals(const AdditiveMetrics& otherMetrics) const { return keysExamined == otherMetrics.keysExamined && docsExamined == otherMetrics.docsExamined && nMatched == otherMetrics.nMatched && nModified == otherMetrics.nModified && ninserted == otherMetrics.ninserted && ndeleted == otherMetrics.ndeleted && keysInserted == otherMetrics.keysInserted && keysDeleted == otherMetrics.keysDeleted && - prepareReadConflicts == otherMetrics.prepareReadConflicts && - writeConflicts == otherMetrics.writeConflicts; + prepareReadConflicts.load() == otherMetrics.prepareReadConflicts.load() && + writeConflicts.load() == otherMetrics.writeConflicts.load(); } void OpDebug::AdditiveMetrics::incrementWriteConflicts(long long n) { - if (!writeConflicts) { - writeConflicts = 0; - } - *writeConflicts += n; + writeConflicts.fetchAndAdd(n); } void OpDebug::AdditiveMetrics::incrementKeysInserted(long long n) { @@ -915,10 +937,7 @@ void OpDebug::AdditiveMetrics::incrementNinserted(long long n) { } void OpDebug::AdditiveMetrics::incrementPrepareReadConflicts(long long n) { - if (!prepareReadConflicts) { - prepareReadConflicts = 0; - } - *prepareReadConflicts += n; + prepareReadConflicts.fetchAndAdd(n); } string OpDebug::AdditiveMetrics::report() const { @@ -932,8 +951,8 @@ string OpDebug::AdditiveMetrics::report() const { OPDEBUG_TOSTRING_HELP_OPTIONAL("ndeleted", ndeleted); OPDEBUG_TOSTRING_HELP_OPTIONAL("keysInserted", keysInserted); OPDEBUG_TOSTRING_HELP_OPTIONAL("keysDeleted", keysDeleted); - OPDEBUG_TOSTRING_HELP_OPTIONAL("prepareReadConflicts", prepareReadConflicts); - OPDEBUG_TOSTRING_HELP_OPTIONAL("writeConflicts", writeConflicts); + OPDEBUG_TOSTRING_HELP_ATOMIC("prepareReadConflicts", prepareReadConflicts); + OPDEBUG_TOSTRING_HELP_ATOMIC("writeConflicts", writeConflicts); return s.str(); } diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index a2466ffeeb2..ba1438f92a5 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -55,6 +55,17 @@ public: */ class AdditiveMetrics { public: + AdditiveMetrics() = default; + AdditiveMetrics(const AdditiveMetrics& other) { + this->add(other); + } + + AdditiveMetrics& operator=(const AdditiveMetrics& other) { + reset(); + add(other); + return *this; + } + /** * Adds all the fields of another AdditiveMetrics object together with the fields of this * AdditiveMetrics instance. @@ -62,10 +73,15 @@ public: void add(const AdditiveMetrics& otherMetrics); /** + * Resets all members to the default state. + */ + void reset(); + + /** * Returns true if the AdditiveMetrics object we are comparing has the same field values as * this AdditiveMetrics instance. */ - bool equals(const AdditiveMetrics& otherMetrics); + bool equals(const AdditiveMetrics& otherMetrics) const; /** * Increments writeConflicts by n. @@ -113,9 +129,15 @@ public: boost::optional<long long> keysInserted; // Number of index keys removed. boost::optional<long long> keysDeleted; + + // The following fields are atomic because they are reported by CurrentOp. This is an + // exception to the prescription that OpDebug only be used by the owning thread because + // these metrics are tracked over the course of a transaction by SingleTransactionStats, + // which is built on OpDebug. + // Number of read conflicts caused by a prepared transaction. - boost::optional<long long> prepareReadConflicts; - boost::optional<long long> writeConflicts; + AtomicWord<long long> prepareReadConflicts{0}; + AtomicWord<long long> writeConflicts{0}; }; OpDebug() = default; diff --git a/src/mongo/db/curop_metrics.cpp b/src/mongo/db/curop_metrics.cpp index a625a9bd2f1..0bc9a23f387 100644 --- a/src/mongo/db/curop_metrics.cpp +++ b/src/mongo/db/curop_metrics.cpp @@ -78,8 +78,8 @@ void recordCurOpMetrics(OperationContext* opCtx) { if (debug.hasSortStage) scanAndOrderCounter.increment(); - if (debug.additiveMetrics.writeConflicts) - writeConflictsCounter.increment(*debug.additiveMetrics.writeConflicts); + if (auto n = debug.additiveMetrics.writeConflicts.load(); n > 0) + writeConflictsCounter.increment(n); } } // namespace mongo diff --git a/src/mongo/db/curop_test.cpp b/src/mongo/db/curop_test.cpp index b472c71e4de..abda927f3c8 100644 --- a/src/mongo/db/curop_test.cpp +++ b/src/mongo/db/curop_test.cpp @@ -39,6 +39,22 @@ namespace mongo { namespace { +TEST(CurOpTest, CopyConstructors) { + OpDebug::AdditiveMetrics a, b; + a.keysExamined = 1; + b.keysExamined = 2; + a.prepareReadConflicts.store(1); + b.prepareReadConflicts.store(2); + // Test copy constructor. + OpDebug::AdditiveMetrics c = a; + ASSERT_EQ(a.keysExamined, c.keysExamined); + ASSERT_EQ(a.prepareReadConflicts.load(), c.prepareReadConflicts.load()); + // Test copy assignment. + a = b; + ASSERT_EQ(a.keysExamined, b.keysExamined); + ASSERT_EQ(a.prepareReadConflicts.load(), b.prepareReadConflicts.load()); +} + TEST(CurOpTest, AddingAdditiveMetricsObjectsTogetherShouldAddFieldsTogether) { OpDebug::AdditiveMetrics currentAdditiveMetrics = OpDebug::AdditiveMetrics(); OpDebug::AdditiveMetrics additiveMetricsToAdd = OpDebug::AdditiveMetrics(); @@ -60,13 +76,14 @@ TEST(CurOpTest, AddingAdditiveMetricsObjectsTogetherShouldAddFieldsTogether) { additiveMetricsToAdd.keysInserted = 5; currentAdditiveMetrics.keysDeleted = 4; additiveMetricsToAdd.keysDeleted = 2; - currentAdditiveMetrics.prepareReadConflicts = 1; - additiveMetricsToAdd.prepareReadConflicts = 5; - currentAdditiveMetrics.writeConflicts = 7; - additiveMetricsToAdd.writeConflicts = 0; + currentAdditiveMetrics.prepareReadConflicts.store(1); + additiveMetricsToAdd.prepareReadConflicts.store(5); + currentAdditiveMetrics.writeConflicts.store(7); + additiveMetricsToAdd.writeConflicts.store(0); // Save the current AdditiveMetrics object before adding. - OpDebug::AdditiveMetrics additiveMetricsBeforeAdd = currentAdditiveMetrics; + OpDebug::AdditiveMetrics additiveMetricsBeforeAdd; + additiveMetricsBeforeAdd.add(currentAdditiveMetrics); currentAdditiveMetrics.add(additiveMetricsToAdd); // The following field values should have changed after adding. @@ -86,11 +103,12 @@ TEST(CurOpTest, AddingAdditiveMetricsObjectsTogetherShouldAddFieldsTogether) { *additiveMetricsBeforeAdd.keysInserted + *additiveMetricsToAdd.keysInserted); ASSERT_EQ(*currentAdditiveMetrics.keysDeleted, *additiveMetricsBeforeAdd.keysDeleted + *additiveMetricsToAdd.keysDeleted); - ASSERT_EQ(*currentAdditiveMetrics.prepareReadConflicts, - *additiveMetricsBeforeAdd.prepareReadConflicts + - *additiveMetricsToAdd.prepareReadConflicts); - ASSERT_EQ(*currentAdditiveMetrics.writeConflicts, - *additiveMetricsBeforeAdd.writeConflicts + *additiveMetricsToAdd.writeConflicts); + ASSERT_EQ(currentAdditiveMetrics.prepareReadConflicts.load(), + additiveMetricsBeforeAdd.prepareReadConflicts.load() + + additiveMetricsToAdd.prepareReadConflicts.load()); + ASSERT_EQ(currentAdditiveMetrics.writeConflicts.load(), + additiveMetricsBeforeAdd.writeConflicts.load() + + additiveMetricsToAdd.writeConflicts.load()); } TEST(CurOpTest, AddingUninitializedAdditiveMetricsFieldsShouldBeTreatedAsZero) { @@ -106,13 +124,14 @@ TEST(CurOpTest, AddingUninitializedAdditiveMetricsFieldsShouldBeTreatedAsZero) { additiveMetricsToAdd.keysInserted = 5; currentAdditiveMetrics.keysDeleted = 4; additiveMetricsToAdd.keysDeleted = 2; - currentAdditiveMetrics.prepareReadConflicts = 1; - additiveMetricsToAdd.prepareReadConflicts = 5; - currentAdditiveMetrics.writeConflicts = 7; - additiveMetricsToAdd.writeConflicts = 0; + currentAdditiveMetrics.prepareReadConflicts.store(1); + additiveMetricsToAdd.prepareReadConflicts.store(5); + currentAdditiveMetrics.writeConflicts.store(7); + additiveMetricsToAdd.writeConflicts.store(0); // Save the current AdditiveMetrics object before adding. - OpDebug::AdditiveMetrics additiveMetricsBeforeAdd = currentAdditiveMetrics; + OpDebug::AdditiveMetrics additiveMetricsBeforeAdd; + additiveMetricsBeforeAdd.add(currentAdditiveMetrics); currentAdditiveMetrics.add(additiveMetricsToAdd); // The 'keysExamined' field for the current AdditiveMetrics object was not initialized, so it @@ -132,20 +151,21 @@ TEST(CurOpTest, AddingUninitializedAdditiveMetricsFieldsShouldBeTreatedAsZero) { *additiveMetricsBeforeAdd.keysInserted + *additiveMetricsToAdd.keysInserted); ASSERT_EQ(*currentAdditiveMetrics.keysDeleted, *additiveMetricsBeforeAdd.keysDeleted + *additiveMetricsToAdd.keysDeleted); - ASSERT_EQ(*currentAdditiveMetrics.prepareReadConflicts, - *additiveMetricsBeforeAdd.prepareReadConflicts + - *additiveMetricsToAdd.prepareReadConflicts); - ASSERT_EQ(*currentAdditiveMetrics.writeConflicts, - *additiveMetricsBeforeAdd.writeConflicts + *additiveMetricsToAdd.writeConflicts); + ASSERT_EQ(currentAdditiveMetrics.prepareReadConflicts.load(), + additiveMetricsBeforeAdd.prepareReadConflicts.load() + + additiveMetricsToAdd.prepareReadConflicts.load()); + ASSERT_EQ(currentAdditiveMetrics.writeConflicts.load(), + additiveMetricsBeforeAdd.writeConflicts.load() + + additiveMetricsToAdd.writeConflicts.load()); } TEST(CurOpTest, AdditiveMetricsFieldsShouldIncrementByN) { OpDebug::AdditiveMetrics additiveMetrics = OpDebug::AdditiveMetrics(); // Initialize field values. - additiveMetrics.writeConflicts = 1; + additiveMetrics.writeConflicts.store(1); additiveMetrics.keysInserted = 2; - additiveMetrics.prepareReadConflicts = 6; + additiveMetrics.prepareReadConflicts.store(6); // Increment the fields. additiveMetrics.incrementWriteConflicts(1); @@ -154,11 +174,11 @@ TEST(CurOpTest, AdditiveMetricsFieldsShouldIncrementByN) { additiveMetrics.incrementNinserted(3); additiveMetrics.incrementPrepareReadConflicts(2); - ASSERT_EQ(*additiveMetrics.writeConflicts, 2); + ASSERT_EQ(additiveMetrics.writeConflicts.load(), 2); ASSERT_EQ(*additiveMetrics.keysInserted, 7); ASSERT_EQ(*additiveMetrics.keysDeleted, 0); ASSERT_EQ(*additiveMetrics.ninserted, 3); - ASSERT_EQ(*additiveMetrics.prepareReadConflicts, 8); + ASSERT_EQ(additiveMetrics.prepareReadConflicts.load(), 8); } TEST(CurOpTest, OptionalAdditiveMetricsNotDisplayedIfUninitialized) { |