diff options
author | Alyssa Wagenmaker <alyssa.wagenmaker@mongodb.com> | 2023-04-03 21:12:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-03 23:25:29 +0000 |
commit | 0abe5e7b8357aae4b5f324846b7bc0b53abeb442 (patch) | |
tree | e8120aab59b3f6eab8decb83fde972e0906d4f68 | |
parent | b4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3 (diff) | |
download | mongo-0abe5e7b8357aae4b5f324846b7bc0b53abeb442.tar.gz |
SERVER-75184 Add tests and support for retryable time-series deletes
-rw-r--r-- | jstests/noPassthrough/timeseries_retry_delete_and_update.js | 127 | ||||
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/timeseries/timeseries_write_util.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/timeseries/timeseries_write_util.h | 3 | ||||
-rw-r--r-- | src/mongo/db/timeseries/timeseries_write_util_test.cpp | 16 |
5 files changed, 149 insertions, 14 deletions
diff --git a/jstests/noPassthrough/timeseries_retry_delete_and_update.js b/jstests/noPassthrough/timeseries_retry_delete_and_update.js new file mode 100644 index 00000000000..3d592656afd --- /dev/null +++ b/jstests/noPassthrough/timeseries_retry_delete_and_update.js @@ -0,0 +1,127 @@ +/** + * Tests retrying of time-series delete and update operations that are eligible for retryable writes + * (specifically single deletes and updates). + * + * @tags: [ + * requires_replication, + * requires_timeseries, + * featureFlagTimeseriesDeletesSupport, + * ] + */ +(function() { +'use strict'; + +const rst = new ReplSetTest({ + nodes: [ + {}, + { + // Disallow elections on secondary. + rsConfig: { + priority: 0, + votes: 0, + }, + }, + ] +}); +rst.startSet(); +rst.initiate(); + +const primary = rst.getPrimary(); + +const timeFieldName = 'time'; +const metaFieldName = 'tag'; +const dateTime = ISODate("2021-07-12T16:00:00Z"); +let collCount = 0; + +let retriedCommandsCount = 0; +let retriedStatementsCount = 0; + +/** + * Verifies that a timeseries delete or update command supports retryable writes. The arguments to + * this function are an array of documents to insert initially, a command builder function that + * returns the command object given the collection to run it on, and a validate function that + * validates the result after the command has been applied to the collection. + */ +const runTest = function(initialDocs, cmdBuilderFn, validateFn) { + const session = primary.startSession({retryWrites: true}); + const testDB = session.getDatabase(jsTestName()); + + const coll = testDB.getCollection('timeseres_retry_delete_and_update_' + collCount++); + coll.drop(); + assert.commandWorked( + testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}})); + + assert.commandWorked(testDB.runCommand({ + insert: coll.getName(), + documents: initialDocs, + lsid: session.getSessionId(), + txnNumber: NumberLong(0) + })); + + // For retryable writes, the server uses 'txnNumber' as the key to look up previously executed + // operations in the sesssion. + let cmdObj = cmdBuilderFn(coll); + cmdObj["lsid"] = session.getSessionId(); + cmdObj["txnNumber"] = NumberLong(1); + + assert.commandWorked(testDB.runCommand(cmdObj), 'Failed to write bucket on first write'); + assert.commandWorked(testDB.runCommand(cmdObj), 'Failed to write bucket on retry write'); + + validateFn(coll); + + const transactionsServerStatus = testDB.serverStatus().transactions; + assert.eq(++retriedCommandsCount, + transactionsServerStatus.retriedCommandsCount, + 'Incorrect statistic in db.serverStatus(): ' + tojson(transactionsServerStatus)); + assert.eq(++retriedStatementsCount, + transactionsServerStatus.retriedStatementsCount, + 'Incorrect statistic in db.serverStatus(): ' + tojson(transactionsServerStatus)); + + session.endSession(); +}; + +const allDocumentsSameBucket = [ + {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 100}, + {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 101}, + {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 102}, +]; +const allDocumentsDifferentBuckets = [ + {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 100}, + {[timeFieldName]: dateTime, [metaFieldName]: "B", f: 101}, + {[timeFieldName]: dateTime, [metaFieldName]: "C", f: 102}, +]; + +function deleteCmdBuilderFn(coll) { + return {delete: coll.getName(), deletes: [{q: {}, limit: 1}]}; +} +function deleteValidateFn(coll) { + assert.eq(coll.countDocuments({}), 2, "Expected exactly one document to be deleted."); +} + +(function testPartialBucketDelete() { + runTest(allDocumentsSameBucket, deleteCmdBuilderFn, deleteValidateFn); +})(); +(function testFullBucketDelete() { + runTest(allDocumentsDifferentBuckets, deleteCmdBuilderFn, deleteValidateFn); +})(); + +function updateCmdBuilderFn(coll) { + return {update: coll.getName(), updates: [{q: {}, u: {$inc: {updated: 1}}, multi: false}]}; +} +function updateValidateFn(coll) { + assert.eq(coll.countDocuments({updated: {$exists: true}}), + 1, + "Expected exactly one document to be updated."); + assert.eq(coll.countDocuments({updated: 1}), 1, "Expected document to be updated only once."); +} + +// TODO SERVER-73726 Enable update tests. +// (function testPartialBucketUpdate() { +// runTest(allDocumentsSameBucket, updateCmdBuilderFn, updateValidateFn); +// })(); +// (function testFullBucketUpdate() { +// runTest(allDocumentsDifferentBuckets, updateCmdBuilderFn, updateValidateFn); +// })(); + +rst.stopSet(); +})(); diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index 2db371be9ec..9cd7e38061d 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -132,8 +132,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false); write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry}); - auto result = - timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate); + auto result = timeseries::performAtomicWrites( + opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId); if (!result.isOK()) { return yieldAndRetry(7309300); } @@ -159,8 +159,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( write_ops::UpdateOpEntry updateEntry(BSON("_id" << bucketId), std::move(u)); write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry}); - auto result = - timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate); + auto result = timeseries::performAtomicWrites( + opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId); if (!result.isOK()) { return yieldAndRetry(7309301); } diff --git a/src/mongo/db/timeseries/timeseries_write_util.cpp b/src/mongo/db/timeseries/timeseries_write_util.cpp index 6fda7516f4a..cbebc3ca760 100644 --- a/src/mongo/db/timeseries/timeseries_write_util.cpp +++ b/src/mongo/db/timeseries/timeseries_write_util.cpp @@ -148,7 +148,8 @@ Status performAtomicWrites(OperationContext* opCtx, const RecordId& recordId, const stdx::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>& modificationOp, - bool fromMigrate) try { + bool fromMigrate, + StmtId stmtId) try { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); invariant(!opCtx->inMultiDocumentTransaction()); @@ -178,9 +179,7 @@ Status performAtomicWrites(OperationContext* opCtx, CollectionUpdateArgs args{original.value()}; args.criteria = update.getQ(); - if (const auto& stmtIds = updateOp.getStmtIds()) { - args.stmtIds = *stmtIds; - } + args.stmtIds = {stmtId}; if (fromMigrate) { args.source = OperationSource::kFromMigrate; } @@ -210,7 +209,7 @@ Status performAtomicWrites(OperationContext* opCtx, record_id_helpers::keyForOID(deleteOp.getDeletes().front().getQ()["_id"].OID()); invariant(recordId == deleteId); collection_internal::deleteDocument( - opCtx, coll, kUninitializedStmtId, recordId, &curOp->debug(), fromMigrate); + opCtx, coll, stmtId, recordId, &curOp->debug(), fromMigrate); }}, modificationOp); diff --git a/src/mongo/db/timeseries/timeseries_write_util.h b/src/mongo/db/timeseries/timeseries_write_util.h index e99c9ad9340..ed63d9082b7 100644 --- a/src/mongo/db/timeseries/timeseries_write_util.h +++ b/src/mongo/db/timeseries/timeseries_write_util.h @@ -66,5 +66,6 @@ Status performAtomicWrites(OperationContext* opCtx, const RecordId& recordId, const stdx::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>& modificationOp, - bool fromMigrate); + bool fromMigrate, + StmtId stmtId); } // namespace mongo::timeseries diff --git a/src/mongo/db/timeseries/timeseries_write_util_test.cpp b/src/mongo/db/timeseries/timeseries_write_util_test.cpp index e797ec3db60..790e96fbdb2 100644 --- a/src/mongo/db/timeseries/timeseries_write_util_test.cpp +++ b/src/mongo/db/timeseries/timeseries_write_util_test.cpp @@ -223,8 +223,12 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicDelete) { op.setWriteCommandRequestBase(std::move(base)); - ASSERT_OK(performAtomicWrites( - opCtx, bucketsColl.getCollection(), recordId, op, /*fromMigrate=*/false)); + ASSERT_OK(performAtomicWrites(opCtx, + bucketsColl.getCollection(), + recordId, + op, + /*fromMigrate=*/false, + /*stmtId=*/kUninitializedStmtId)); } // Checks the document is removed. @@ -286,8 +290,12 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicUpdate) { op.setWriteCommandRequestBase(std::move(base)); - ASSERT_OK(performAtomicWrites( - opCtx, bucketsColl.getCollection(), recordId, op, /*fromMigrate=*/false)); + ASSERT_OK(performAtomicWrites(opCtx, + bucketsColl.getCollection(), + recordId, + op, + /*fromMigrate=*/false, + /*stmtId=*/kUninitializedStmtId)); } // Checks the document is updated. |