summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlyssa Wagenmaker <alyssa.wagenmaker@mongodb.com>2023-04-03 21:12:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-03 23:25:29 +0000
commit0abe5e7b8357aae4b5f324846b7bc0b53abeb442 (patch)
treee8120aab59b3f6eab8decb83fde972e0906d4f68
parentb4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3 (diff)
downloadmongo-0abe5e7b8357aae4b5f324846b7bc0b53abeb442.tar.gz
SERVER-75184 Add tests and support for retryable time-series deletes
-rw-r--r--jstests/noPassthrough/timeseries_retry_delete_and_update.js127
-rw-r--r--src/mongo/db/exec/timeseries_modify.cpp8
-rw-r--r--src/mongo/db/timeseries/timeseries_write_util.cpp9
-rw-r--r--src/mongo/db/timeseries/timeseries_write_util.h3
-rw-r--r--src/mongo/db/timeseries/timeseries_write_util_test.cpp16
5 files changed, 149 insertions, 14 deletions
diff --git a/jstests/noPassthrough/timeseries_retry_delete_and_update.js b/jstests/noPassthrough/timeseries_retry_delete_and_update.js
new file mode 100644
index 00000000000..3d592656afd
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_retry_delete_and_update.js
@@ -0,0 +1,127 @@
+/**
+ * Tests retrying of time-series delete and update operations that are eligible for retryable writes
+ * (specifically single deletes and updates).
+ *
+ * @tags: [
+ * requires_replication,
+ * requires_timeseries,
+ * featureFlagTimeseriesDeletesSupport,
+ * ]
+ */
+(function() {
+'use strict';
+
+const rst = new ReplSetTest({
+ nodes: [
+ {},
+ {
+ // Disallow elections on secondary.
+ rsConfig: {
+ priority: 0,
+ votes: 0,
+ },
+ },
+ ]
+});
+rst.startSet();
+rst.initiate();
+
+const primary = rst.getPrimary();
+
+const timeFieldName = 'time';
+const metaFieldName = 'tag';
+const dateTime = ISODate("2021-07-12T16:00:00Z");
+let collCount = 0;
+
+let retriedCommandsCount = 0;
+let retriedStatementsCount = 0;
+
+/**
+ * Verifies that a timeseries delete or update command supports retryable writes. The arguments to
+ * this function are an array of documents to insert initially, a command builder function that
+ * returns the command object given the collection to run it on, and a validate function that
+ * validates the result after the command has been applied to the collection.
+ */
+const runTest = function(initialDocs, cmdBuilderFn, validateFn) {
+ const session = primary.startSession({retryWrites: true});
+ const testDB = session.getDatabase(jsTestName());
+
+ const coll = testDB.getCollection('timeseres_retry_delete_and_update_' + collCount++);
+ coll.drop();
+ assert.commandWorked(
+ testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: coll.getName(),
+ documents: initialDocs,
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(0)
+ }));
+
+ // For retryable writes, the server uses 'txnNumber' as the key to look up previously executed
+ * operations in the session.
+ let cmdObj = cmdBuilderFn(coll);
+ cmdObj["lsid"] = session.getSessionId();
+ cmdObj["txnNumber"] = NumberLong(1);
+
+ assert.commandWorked(testDB.runCommand(cmdObj), 'Failed to write bucket on first write');
+ assert.commandWorked(testDB.runCommand(cmdObj), 'Failed to write bucket on retry write');
+
+ validateFn(coll);
+
+ const transactionsServerStatus = testDB.serverStatus().transactions;
+ assert.eq(++retriedCommandsCount,
+ transactionsServerStatus.retriedCommandsCount,
+ 'Incorrect statistic in db.serverStatus(): ' + tojson(transactionsServerStatus));
+ assert.eq(++retriedStatementsCount,
+ transactionsServerStatus.retriedStatementsCount,
+ 'Incorrect statistic in db.serverStatus(): ' + tojson(transactionsServerStatus));
+
+ session.endSession();
+};
+
+const allDocumentsSameBucket = [
+ {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 100},
+ {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 101},
+ {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 102},
+];
+const allDocumentsDifferentBuckets = [
+ {[timeFieldName]: dateTime, [metaFieldName]: "A", f: 100},
+ {[timeFieldName]: dateTime, [metaFieldName]: "B", f: 101},
+ {[timeFieldName]: dateTime, [metaFieldName]: "C", f: 102},
+];
+
+function deleteCmdBuilderFn(coll) {
+ return {delete: coll.getName(), deletes: [{q: {}, limit: 1}]};
+}
+function deleteValidateFn(coll) {
+ assert.eq(coll.countDocuments({}), 2, "Expected exactly one document to be deleted.");
+}
+
+(function testPartialBucketDelete() {
+ runTest(allDocumentsSameBucket, deleteCmdBuilderFn, deleteValidateFn);
+})();
+(function testFullBucketDelete() {
+ runTest(allDocumentsDifferentBuckets, deleteCmdBuilderFn, deleteValidateFn);
+})();
+
+function updateCmdBuilderFn(coll) {
+ return {update: coll.getName(), updates: [{q: {}, u: {$inc: {updated: 1}}, multi: false}]};
+}
+function updateValidateFn(coll) {
+ assert.eq(coll.countDocuments({updated: {$exists: true}}),
+ 1,
+ "Expected exactly one document to be updated.");
+ assert.eq(coll.countDocuments({updated: 1}), 1, "Expected document to be updated only once.");
+}
+
+// TODO SERVER-73726 Enable update tests.
+// (function testPartialBucketUpdate() {
+// runTest(allDocumentsSameBucket, updateCmdBuilderFn, updateValidateFn);
+// })();
+// (function testFullBucketUpdate() {
+// runTest(allDocumentsDifferentBuckets, updateCmdBuilderFn, updateValidateFn);
+// })();
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index 2db371be9ec..9cd7e38061d 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -132,8 +132,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry});
- auto result =
- timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
+ auto result = timeseries::performAtomicWrites(
+ opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId);
if (!result.isOK()) {
return yieldAndRetry(7309300);
}
@@ -159,8 +159,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
write_ops::UpdateOpEntry updateEntry(BSON("_id" << bucketId), std::move(u));
write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry});
- auto result =
- timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
+ auto result = timeseries::performAtomicWrites(
+ opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId);
if (!result.isOK()) {
return yieldAndRetry(7309301);
}
diff --git a/src/mongo/db/timeseries/timeseries_write_util.cpp b/src/mongo/db/timeseries/timeseries_write_util.cpp
index 6fda7516f4a..cbebc3ca760 100644
--- a/src/mongo/db/timeseries/timeseries_write_util.cpp
+++ b/src/mongo/db/timeseries/timeseries_write_util.cpp
@@ -148,7 +148,8 @@ Status performAtomicWrites(OperationContext* opCtx,
const RecordId& recordId,
const stdx::variant<write_ops::UpdateCommandRequest,
write_ops::DeleteCommandRequest>& modificationOp,
- bool fromMigrate) try {
+ bool fromMigrate,
+ StmtId stmtId) try {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->inMultiDocumentTransaction());
@@ -178,9 +179,7 @@ Status performAtomicWrites(OperationContext* opCtx,
CollectionUpdateArgs args{original.value()};
args.criteria = update.getQ();
- if (const auto& stmtIds = updateOp.getStmtIds()) {
- args.stmtIds = *stmtIds;
- }
+ args.stmtIds = {stmtId};
if (fromMigrate) {
args.source = OperationSource::kFromMigrate;
}
@@ -210,7 +209,7 @@ Status performAtomicWrites(OperationContext* opCtx,
record_id_helpers::keyForOID(deleteOp.getDeletes().front().getQ()["_id"].OID());
invariant(recordId == deleteId);
collection_internal::deleteDocument(
- opCtx, coll, kUninitializedStmtId, recordId, &curOp->debug(), fromMigrate);
+ opCtx, coll, stmtId, recordId, &curOp->debug(), fromMigrate);
}},
modificationOp);
diff --git a/src/mongo/db/timeseries/timeseries_write_util.h b/src/mongo/db/timeseries/timeseries_write_util.h
index e99c9ad9340..ed63d9082b7 100644
--- a/src/mongo/db/timeseries/timeseries_write_util.h
+++ b/src/mongo/db/timeseries/timeseries_write_util.h
@@ -66,5 +66,6 @@ Status performAtomicWrites(OperationContext* opCtx,
const RecordId& recordId,
const stdx::variant<write_ops::UpdateCommandRequest,
write_ops::DeleteCommandRequest>& modificationOp,
- bool fromMigrate);
+ bool fromMigrate,
+ StmtId stmtId);
} // namespace mongo::timeseries
diff --git a/src/mongo/db/timeseries/timeseries_write_util_test.cpp b/src/mongo/db/timeseries/timeseries_write_util_test.cpp
index e797ec3db60..790e96fbdb2 100644
--- a/src/mongo/db/timeseries/timeseries_write_util_test.cpp
+++ b/src/mongo/db/timeseries/timeseries_write_util_test.cpp
@@ -223,8 +223,12 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicDelete) {
op.setWriteCommandRequestBase(std::move(base));
- ASSERT_OK(performAtomicWrites(
- opCtx, bucketsColl.getCollection(), recordId, op, /*fromMigrate=*/false));
+ ASSERT_OK(performAtomicWrites(opCtx,
+ bucketsColl.getCollection(),
+ recordId,
+ op,
+ /*fromMigrate=*/false,
+ /*stmtId=*/kUninitializedStmtId));
}
// Checks the document is removed.
@@ -286,8 +290,12 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicUpdate) {
op.setWriteCommandRequestBase(std::move(base));
- ASSERT_OK(performAtomicWrites(
- opCtx, bucketsColl.getCollection(), recordId, op, /*fromMigrate=*/false));
+ ASSERT_OK(performAtomicWrites(opCtx,
+ bucketsColl.getCollection(),
+ recordId,
+ op,
+ /*fromMigrate=*/false,
+ /*stmtId=*/kUninitializedStmtId));
}
// Checks the document is updated.