diff options
Diffstat (limited to 'jstests/libs')
-rw-r--r-- | jstests/libs/auto_retry_transaction_in_sharding.js | 97 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 139 |
2 files changed, 195 insertions, 41 deletions
diff --git a/jstests/libs/auto_retry_transaction_in_sharding.js b/jstests/libs/auto_retry_transaction_in_sharding.js new file mode 100644 index 00000000000..f81a366970a --- /dev/null +++ b/jstests/libs/auto_retry_transaction_in_sharding.js @@ -0,0 +1,97 @@ +'use strict'; + +load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js'); +Random.setRandomSeed(); + +var { + withTxnAndAutoRetryOnMongos, + retryOnceOnTransientOnMongos, + retryOnceOnTransientAndRestartTxnOnMongos +} = (() => { + /** + * Runs 'func' inside of a transaction started with 'txnOptions', and automatically retries + * until it either succeeds or the server returns a non-TransientTransactionError error + * response. + * + * The caller should take care to ensure 'func' doesn't modify any captured variables in a + * speculative fashion where calling it multiple times would lead to unintended behavior. The + * transaction started by the withTxnAndAutoRetryOnMongos() function is only known to have + * committed after the withTxnAndAutoRetryOnMongos() function returns. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function withTxnAndAutoRetryOnMongos(session, func, txnOptions) { + if (session.getClient().isMongos()) { + withTxnAndAutoRetry(session, func, {txnOptions}); + } else { + session.startTransaction(txnOptions); + func(); + assert.commandWorked(session.commitTransaction_forTesting()); + } + } + + /** + * Runs 'func' and retries it only once if a transient error occurred. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function retryOnceOnTransientOnMongos(session, func) { + if (session.getClient().isMongos()) { + try { + func(); + } catch (e) { + if ((e.hasOwnProperty('errorLabels') && + e.errorLabels.includes('TransientTransactionError'))) { + func(); + } else { + throw e; + } + } + } else { + func(); + } + } + + /** + * Runs 'func' and retries it only once restarting the transaction if a transient + * error occurred. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function retryOnceOnTransientAndRestartTxnOnMongos(session, func, txnOptions) { + if (session.getClient().isMongos()) { + try { + func(); + } catch (e) { + if ((e.hasOwnProperty('errorLabels') && + e.errorLabels.includes('TransientTransactionError'))) { + session.abortTransaction_forTesting(); + session.startTransaction(txnOptions); + func(); + } else { + throw e; + } + } + } else { + func(); + } + } + + return { + withTxnAndAutoRetryOnMongos, + retryOnceOnTransientOnMongos, + retryOnceOnTransientAndRestartTxnOnMongos + }; +})();
\ No newline at end of file diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index c505e47f39f..7431a64627c 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) { return null; } +function pruneOptionalFields(event, expected) { + if (!expected.hasOwnProperty("_id")) + delete event._id; + + if (!expected.hasOwnProperty("clusterTime")) + delete event.clusterTime; + + if (!expected.hasOwnProperty("txnNumber")) + delete event.txnNumber; + + if (!expected.hasOwnProperty("lsid")) + delete event.lsid; + + return event; +} /** * Helper to check whether a change stream event matches the given expected event. Ignores the * resume token and clusterTime unless they are explicitly listed in the expectedEvent. */ function assertChangeStreamEventEq(actualEvent, expectedEvent) { - const testEvent = Object.assign({}, actualEvent); - if (!expectedEvent.hasOwnProperty("_id")) { - delete testEvent._id; // Remove the resume token, if present. - } - if (!expectedEvent.hasOwnProperty("clusterTime")) { - delete testEvent.clusterTime; // Remove the cluster time, if present. - } + const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent); - // The change stream transaction passthrough causes operations to have txnNumber and lsid - // values that the test doesn't expect, which can cause comparisons to fail. - if (!expectedEvent.hasOwnProperty("txnNumber")) { - delete testEvent.txnNumber; // Remove the txnNumber, if present. - } - if (!expectedEvent.hasOwnProperty("lsid")) { - delete testEvent.lsid; // Remove the lsid, if present. - } assert.docEq(testEvent, expectedEvent, "Change did not match expected change. Expected change: " + tojson(expectedEvent) + @@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { return nextBatch[0]; } + self.getNextChanges = function(cursor, numChanges, skipFirst) { + let changes = []; + + for (let i = 0; i < numChanges; i++) { + // Since the first change may be on the original cursor, we need to check for that + // change on the cursor before we move the cursor forward. + if (i === 0 && !skipFirst) { + changes[0] = getNextDocFromCursor(cursor); + if (changes[0]) { + continue; + } + } + + assert.soon( + () => { + cursor = self.getNextBatch(cursor); + changes[i] = getNextDocFromCursor(cursor); + return changes[i] !== null; + }, + () => { + return "timed out waiting for another result from the change stream, observed changes: " + + tojson(changes) + ", expected changes: " + numChanges; + }); + } + + return changes; + }; + /** * Checks if the change has been invalidated. */ @@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * * Returns a list of the changes seen. */ - self.assertNextChangesEqual = function( - {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + self.assertNextChangesEqual = function({ + cursor, + expectedChanges, + expectedNumChanges, + expectInvalidate, + skipFirstBatch, + ignoreOrder + }) { expectInvalidate = expectInvalidate || false; skipFirstBatch = skipFirstBatch || false; @@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { expectedNumChanges = expectedChanges.length; } - let changes = []; - for (let i = 0; i < expectedNumChanges; i++) { - // Since the first change may be on the original cursor, we need to check for that - // change on the cursor before we move the cursor forward. - if (i === 0 && !skipFirstBatch) { - changes[0] = getNextDocFromCursor(cursor); - if (changes[0]) { - assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate); - continue; - } + let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch); + if (ignoreOrder) { + const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`; + assert.eq(changes.length, expectedNumChanges, errMsgFunc); + for (let i = 0; i < changes.length; i++) { + assert(expectedChanges.some(expectedChange => { + return _convertExceptionToReturnStatus(() => { + assertChangeStreamEventEq(changes[i], expectedChange); + return true; + })(); + }), + errMsgFunc); + } + } else { + for (let i = 0; i < changes.length; i++) { + assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } - - assert.soon(function() { - // We need to replace the cursor variable so we return the correct cursor. - cursor = self.getNextBatch(cursor); - changes[i] = getNextDocFromCursor(cursor); - return changes[i] !== null; - }, "timed out waiting for another result from the change stream"); - assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } // If we expect invalidation, the final change should have operation type "invalidate". @@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Iterates through the change stream and asserts that the next changes are the expected ones. + * The order of the change events from the cursor relative to their order in the list of + * expected changes is ignored, however. + * + * Returns a list of the changes seen. + */ + self.assertNextChangesEqualUnordered = function( + {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + return self.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: expectedChanges, + expectedNumChanges: expectedNumChanges, + expectInvalidate: expectInvalidate, + skipFirstBatch: skipFirstBatch, + ignoreOrder: true + }); + }; + + /** * Retrieves the next batch in the change stream and confirms that it is empty. */ self.assertNoChange = function(cursor) { @@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * If the current batch has a document in it, that one will be ignored. */ self.getOneChange = function(cursor, expectInvalidate = false) { - changes = self.assertNextChangesEqual({ - cursor: cursor, - expectedNumChanges: 1, - expectInvalidate: expectInvalidate, - skipFirstBatch: true - }); + changes = self.getNextChanges(cursor, 1, true); + + if (expectInvalidate) { + assert(isInvalidated(changes[changes.length - 1]), + "Last change was not invalidated when it was expected: " + tojson(changes)); + + // We make sure that the next batch kills the cursor after an invalidation entry. + let finalCursor = self.getNextBatch(cursor); + assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor)); + } + return changes[0]; }; |