summaryrefslogtreecommitdiff
path: root/jstests/libs
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/libs')
-rw-r--r--jstests/libs/auto_retry_transaction_in_sharding.js97
-rw-r--r--jstests/libs/change_stream_util.js139
2 files changed, 195 insertions, 41 deletions
diff --git a/jstests/libs/auto_retry_transaction_in_sharding.js b/jstests/libs/auto_retry_transaction_in_sharding.js
new file mode 100644
index 00000000000..f81a366970a
--- /dev/null
+++ b/jstests/libs/auto_retry_transaction_in_sharding.js
@@ -0,0 +1,97 @@
+'use strict';
+
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+Random.setRandomSeed();
+
+var {
+ withTxnAndAutoRetryOnMongos,
+ retryOnceOnTransientOnMongos,
+ retryOnceOnTransientAndRestartTxnOnMongos
+} = (() => {
+ /**
+ * Runs 'func' inside of a transaction started with 'txnOptions', and automatically retries
+ * until it either succeeds or the server returns a non-TransientTransactionError error
+ * response.
+ *
+ * The caller should take care to ensure 'func' doesn't modify any captured variables in a
+ * speculative fashion where calling it multiple times would lead to unintended behavior. The
+ * transaction started by the withTxnAndAutoRetryOnMongos() function is only known to have
+ * committed after the withTxnAndAutoRetryOnMongos() function returns.
+ *
+ * This behaviour only applies if the client is a mongos
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary or the retries performed by MongoS make it
+ * unnecessary
+ */
+ function withTxnAndAutoRetryOnMongos(session, func, txnOptions) {
+ if (session.getClient().isMongos()) {
+ withTxnAndAutoRetry(session, func, {txnOptions});
+ } else {
+ session.startTransaction(txnOptions);
+ func();
+ assert.commandWorked(session.commitTransaction_forTesting());
+ }
+ }
+
+ /**
+ * Runs 'func' and retries it only once if a transient error occurred.
+ *
+ * This behaviour only applies if the client is a mongos
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary or the retries performed by MongoS make it
+ * unnecessary
+ */
+ function retryOnceOnTransientOnMongos(session, func) {
+ if (session.getClient().isMongos()) {
+ try {
+ func();
+ } catch (e) {
+ if ((e.hasOwnProperty('errorLabels') &&
+ e.errorLabels.includes('TransientTransactionError'))) {
+ func();
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ func();
+ }
+ }
+
+ /**
+ * Runs 'func' and retries it only once restarting the transaction if a transient
+ * error occurred.
+ *
+ * This behaviour only applies if the client is a mongos
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary or the retries performed by MongoS make it
+ * unnecessary
+ */
+ function retryOnceOnTransientAndRestartTxnOnMongos(session, func, txnOptions) {
+ if (session.getClient().isMongos()) {
+ try {
+ func();
+ } catch (e) {
+ if ((e.hasOwnProperty('errorLabels') &&
+ e.errorLabels.includes('TransientTransactionError'))) {
+ session.abortTransaction_forTesting();
+ session.startTransaction(txnOptions);
+ func();
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ func();
+ }
+ }
+
+ return {
+ withTxnAndAutoRetryOnMongos,
+ retryOnceOnTransientOnMongos,
+ retryOnceOnTransientAndRestartTxnOnMongos
+ };
+})(); \ No newline at end of file
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index c505e47f39f..7431a64627c 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) {
return null;
}
+function pruneOptionalFields(event, expected) {
+ if (!expected.hasOwnProperty("_id"))
+ delete event._id;
+
+ if (!expected.hasOwnProperty("clusterTime"))
+ delete event.clusterTime;
+
+ if (!expected.hasOwnProperty("txnNumber"))
+ delete event.txnNumber;
+
+ if (!expected.hasOwnProperty("lsid"))
+ delete event.lsid;
+
+ return event;
+}
/**
* Helper to check whether a change stream event matches the given expected event. Ignores the
* resume token and clusterTime unless they are explicitly listed in the expectedEvent.
*/
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
- const testEvent = Object.assign({}, actualEvent);
- if (!expectedEvent.hasOwnProperty("_id")) {
- delete testEvent._id; // Remove the resume token, if present.
- }
- if (!expectedEvent.hasOwnProperty("clusterTime")) {
- delete testEvent.clusterTime; // Remove the cluster time, if present.
- }
+ const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent);
- // The change stream transaction passthrough causes operations to have txnNumber and lsid
- // values that the test doesn't expect, which can cause comparisons to fail.
- if (!expectedEvent.hasOwnProperty("txnNumber")) {
- delete testEvent.txnNumber; // Remove the txnNumber, if present.
- }
- if (!expectedEvent.hasOwnProperty("lsid")) {
- delete testEvent.lsid; // Remove the lsid, if present.
- }
assert.docEq(testEvent,
expectedEvent,
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
@@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
return nextBatch[0];
}
+ self.getNextChanges = function(cursor, numChanges, skipFirst) {
+ let changes = [];
+
+ for (let i = 0; i < numChanges; i++) {
+ // Since the first change may be on the original cursor, we need to check for that
+ // change on the cursor before we move the cursor forward.
+ if (i === 0 && !skipFirst) {
+ changes[0] = getNextDocFromCursor(cursor);
+ if (changes[0]) {
+ continue;
+ }
+ }
+
+ assert.soon(
+ () => {
+ cursor = self.getNextBatch(cursor);
+ changes[i] = getNextDocFromCursor(cursor);
+ return changes[i] !== null;
+ },
+ () => {
+ return "timed out waiting for another result from the change stream, observed changes: " +
+ tojson(changes) + ", expected changes: " + numChanges;
+ });
+ }
+
+ return changes;
+ };
+
/**
* Checks if the change has been invalidated.
*/
@@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
*
* Returns a list of the changes seen.
*/
- self.assertNextChangesEqual = function(
- {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ self.assertNextChangesEqual = function({
+ cursor,
+ expectedChanges,
+ expectedNumChanges,
+ expectInvalidate,
+ skipFirstBatch,
+ ignoreOrder
+ }) {
expectInvalidate = expectInvalidate || false;
skipFirstBatch = skipFirstBatch || false;
@@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
expectedNumChanges = expectedChanges.length;
}
- let changes = [];
- for (let i = 0; i < expectedNumChanges; i++) {
- // Since the first change may be on the original cursor, we need to check for that
- // change on the cursor before we move the cursor forward.
- if (i === 0 && !skipFirstBatch) {
- changes[0] = getNextDocFromCursor(cursor);
- if (changes[0]) {
- assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate);
- continue;
- }
+ let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch);
+ if (ignoreOrder) {
+ const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`;
+ assert.eq(changes.length, expectedNumChanges, errMsgFunc);
+ for (let i = 0; i < changes.length; i++) {
+ assert(expectedChanges.some(expectedChange => {
+ return _convertExceptionToReturnStatus(() => {
+ assertChangeStreamEventEq(changes[i], expectedChange);
+ return true;
+ })();
+ }),
+ errMsgFunc);
+ }
+ } else {
+ for (let i = 0; i < changes.length; i++) {
+ assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
-
- assert.soon(function() {
- // We need to replace the cursor variable so we return the correct cursor.
- cursor = self.getNextBatch(cursor);
- changes[i] = getNextDocFromCursor(cursor);
- return changes[i] !== null;
- }, "timed out waiting for another result from the change stream");
- assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
// If we expect invalidation, the final change should have operation type "invalidate".
@@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Iterates through the change stream and asserts that the next changes are the expected ones.
+ * The order of the change events from the cursor relative to their order in the list of
+ * expected changes is ignored, however.
+ *
+ * Returns a list of the changes seen.
+ */
+ self.assertNextChangesEqualUnordered = function(
+ {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ return self.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: expectedChanges,
+ expectedNumChanges: expectedNumChanges,
+ expectInvalidate: expectInvalidate,
+ skipFirstBatch: skipFirstBatch,
+ ignoreOrder: true
+ });
+ };
+
+ /**
* Retrieves the next batch in the change stream and confirms that it is empty.
*/
self.assertNoChange = function(cursor) {
@@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* If the current batch has a document in it, that one will be ignored.
*/
self.getOneChange = function(cursor, expectInvalidate = false) {
- changes = self.assertNextChangesEqual({
- cursor: cursor,
- expectedNumChanges: 1,
- expectInvalidate: expectInvalidate,
- skipFirstBatch: true
- });
+ changes = self.getNextChanges(cursor, 1, true);
+
+ if (expectInvalidate) {
+ assert(isInvalidated(changes[changes.length - 1]),
+ "Last change was not invalidated when it was expected: " + tojson(changes));
+
+ // We make sure that the next batch kills the cursor after an invalidation entry.
+ let finalCursor = self.getNextBatch(cursor);
+ assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor));
+ }
+
return changes[0];
};