summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBynn Lee <bynn.lee@mongodb.com>2020-07-08 02:36:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-14 21:06:45 +0000
commit5bb3a09729ea6d03b3a17915bb4bc3e6e5105fdd (patch)
tree74c6d6d4f7bb07650c36aeffe7fca742ab57362f /src/mongo/db
parent316408d14e2358225e8f26bfe4d4022cfcf748d9 (diff)
downloadmongo-5bb3a09729ea6d03b3a17915bb4bc3e6e5105fdd.tar.gz
SERVER-49014 Add additional timestamping tests to the KVEngine test harness
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/storage/kv/kv_engine_test_harness.cpp762
1 files changed, 762 insertions, 0 deletions
diff --git a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
index 4195bddcf5d..e0cb1d86eb8 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
@@ -364,6 +364,768 @@ TEST(KVEngineTestHarness, AllDurableTimestamp) {
}
}
+/*
+ * Pinned oldest with another session
+ * | Session 1 | Session 2 |
+ * |-----------------------------+----------------------------|
+ * | Begin | |
+ * | Write A 1 | |
+ * | Commit :commit 10 | |
+ * | Begin :readAt 15 | |
+ * | | Begin |
+ * | | Write A 2 |
+ * | Read A (1) | |
+ * | | Commit :commit 20 |
+ * | Read A (1) | |
+ * | | Begin :readAt 15 |
+ * | | Read A (1) |
+ * | Rollback | |
+ * | | Rollback |
+ */
+TEST(KVEngineTestHarness, PinningOldestWithAnotherSession) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ MyOperationContext opCtx1(engine);
+ WriteUnitOfWork uow1(&opCtx1);
+ StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10));
+ RecordId rid = res.getValue();
+ uow1.commit();
+
+ RecordData rd;
+ opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ Timestamp(15, 15));
+
+ MyOperationContext opCtx2(engine);
+ WriteUnitOfWork uow2(&opCtx2);
+
+ ASSERT(rs->findRecord(&opCtx1, rid, &rd));
+ ASSERT_OK(opCtx2.recoveryUnit()->setTimestamp(Timestamp(20, 20)));
+ ASSERT_OK(rs->updateRecord(&opCtx2, rid, "updated", 8));
+
+ ASSERT(rs->findRecord(&opCtx1, rid, &rd));
+ ASSERT_EQUALS(std::string("abc"), rd.data());
+
+ uow2.commit();
+
+ opCtx1.recoveryUnit()->abandonSnapshot();
+ ASSERT(rs->findRecord(&opCtx1, rid, &rd));
+ ASSERT_EQUALS(std::string("abc"), rd.data());
+
+
+ opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ Timestamp(15, 15));
+ ASSERT(rs->findRecord(&opCtx2, rid, &rd));
+ ASSERT_EQUALS(std::string("abc"), rd.data());
+}
+
+/*
+ * All durable
+ * | Session 1 | Session 2 | GlobalActor |
+ * |----------------------+----------------------+----------------------------------|
+ * | Begin | | |
+ * | Commit :commit 10 | | |
+ * | | | QueryTimestamp :all_durable (10) |
+ * | Begin | | |
+ * | Timestamp :commit 20 | | |
+ * | | | QueryTimestamp :all_durable (19) |
+ * | | Begin | |
+ * | | Timestamp :commit 30 | |
+ * | | Commit | |
+ * | | | QueryTimestamp :all_durable (19) |
+ * | Commit | | |
+ * | | | QueryTimestamp :all_durable (30) |
+ * | Begin | | |
+ * | Timestamp :commit 25 | | |
+ * | | | QueryTimestamp :all_durable (30) |
+ */
+TEST(KVEngineTestHarness, AllDurable) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ if (!engine->supportsDocLocking())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ {
+ const Timestamp kInsertTimestamp1 = Timestamp(10, 10);
+ const Timestamp kInsertTimestamp2 = Timestamp(20, 20);
+ const Timestamp kInsertTimestamp3 = Timestamp(30, 30);
+ const Timestamp kInsertTimestamp4 = Timestamp(25, 25);
+
+ Timestamp allDurable = engine->getAllDurableTimestamp();
+ MyOperationContext opCtx1(engine);
+ WriteUnitOfWork uow1(&opCtx1);
+ auto swRid = rs->insertRecord(&opCtx1, "abc", 4, kInsertTimestamp1);
+ ASSERT_OK(swRid);
+ uow1.commit();
+
+ Timestamp lastAllDurable = allDurable;
+ allDurable = engine->getAllDurableTimestamp();
+ ASSERT_GTE(allDurable, lastAllDurable);
+ ASSERT_LTE(allDurable, kInsertTimestamp1);
+
+ MyOperationContext opCtx2(engine);
+ WriteUnitOfWork uow2(&opCtx2);
+ swRid = rs->insertRecord(&opCtx2, "abc", 4, kInsertTimestamp2);
+ ASSERT_OK(swRid);
+
+ lastAllDurable = allDurable;
+ allDurable = engine->getAllDurableTimestamp();
+ ASSERT_GTE(allDurable, lastAllDurable);
+ ASSERT_LT(allDurable, kInsertTimestamp2);
+
+ MyOperationContext opCtx3(engine);
+ WriteUnitOfWork uow3(&opCtx3);
+ swRid = rs->insertRecord(&opCtx3, "abc", 4, kInsertTimestamp3);
+ ASSERT_OK(swRid);
+ uow3.commit();
+
+ lastAllDurable = allDurable;
+ allDurable = engine->getAllDurableTimestamp();
+ ASSERT_GTE(allDurable, lastAllDurable);
+ ASSERT_LT(allDurable, kInsertTimestamp2);
+
+ uow2.commit();
+
+ lastAllDurable = allDurable;
+ allDurable = engine->getAllDurableTimestamp();
+ ASSERT_GTE(allDurable, lastAllDurable);
+ ASSERT_LTE(allDurable, kInsertTimestamp3);
+
+ MyOperationContext opCtx4(engine);
+ WriteUnitOfWork uow4(&opCtx4);
+ swRid = rs->insertRecord(&opCtx4, "abc", 4, kInsertTimestamp4);
+ ASSERT_OK(swRid);
+
+ lastAllDurable = allDurable;
+ allDurable = engine->getAllDurableTimestamp();
+ ASSERT_GTE(allDurable, lastAllDurable);
+ ASSERT_LTE(allDurable, kInsertTimestamp3);
+ uow4.commit();
+ }
+}
+
+/*
+ * Basic Timestamp - Single
+ * | Session |
+ * |----------------------|
+ * | Begin |
+ * | Write A 1 |
+ * | Commit :commit 10 |
+ * | |
+ * | Begin :readAt 9 |
+ * | Read A (NOT_FOUND) |
+ * | Rollback |
+ * | |
+ * | Begin :readAt 10 |
+ * | Read A (1) |
+ */
+TEST(KVEngineTestHarness, BasicTimestampSingle) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ const Timestamp kReadTimestamp = Timestamp(9, 9);
+ const Timestamp kInsertTimestamp = Timestamp(10, 10);
+
+ // Start a read transaction.
+ MyOperationContext opCtx1(engine);
+
+ opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ kReadTimestamp);
+ ASSERT(!rs->findRecord(&opCtx1, RecordId::min(), nullptr));
+
+ // Insert a record at a later time.
+ RecordId rid;
+ {
+ MyOperationContext opCtx2(engine);
+ WriteUnitOfWork wuow(&opCtx2);
+ auto swRid = rs->insertRecord(&opCtx2, "abc", 4, kInsertTimestamp);
+ ASSERT_OK(swRid);
+ rid = swRid.getValue();
+ wuow.commit();
+ }
+
+ // Should not see the record, even if we abandon the snapshot as the read timestamp is still
+ // earlier than the insert timestamp.
+ ASSERT(!rs->findRecord(&opCtx1, rid, nullptr));
+ opCtx1.recoveryUnit()->abandonSnapshot();
+ ASSERT(!rs->findRecord(&opCtx1, rid, nullptr));
+
+
+ opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ kInsertTimestamp);
+ opCtx1.recoveryUnit()->abandonSnapshot();
+ RecordData rd;
+ ASSERT(rs->findRecord(&opCtx1, rid, &rd));
+ ASSERT_EQ(std::string("abc"), rd.data());
+}
+
+/*
+ * Basic Timestamp - Multiple
+ * | Session |
+ * |----------------------|
+ * | Begin |
+ * | Timestamp :commit 10 |
+ * | Write A 1 |
+ * | Timestamp :commit 20 |
+ * | Update A 2 |
+ * | Commit |
+ * | |
+ * | Begin :readAt 10 |
+ * | Read A (1) |
+ * | Rollback |
+ * | |
+ * | Begin :readAt 20 |
+ * | Read A (2) |
+ */
+TEST(KVEngineTestHarness, BasicTimestampMultiple) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ const Timestamp t10 = Timestamp(10, 10);
+ const Timestamp t20 = Timestamp(20, 20);
+
+ RecordId rid;
+ {
+ // Initial insert of record.
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork wuow(&opCtx);
+ auto swRid = rs->insertRecord(&opCtx, "abc", 4, t10);
+ ASSERT_OK(swRid);
+ rid = swRid.getValue();
+
+ // Update a record at a later time.
+ ASSERT_OK(opCtx.recoveryUnit()->setTimestamp(t20));
+ auto res = rs->updateRecord(&opCtx, rid, "updated", 8);
+ ASSERT_OK(res);
+ wuow.commit();
+ }
+
+ RecordData rd;
+ MyOperationContext opCtx(engine);
+ opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t10);
+ ASSERT(rs->findRecord(&opCtx, rid, &rd));
+ ASSERT_EQUALS(std::string("abc"), rd.data());
+
+ opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t20);
+ opCtx.recoveryUnit()->abandonSnapshot();
+ ASSERT(rs->findRecord(&opCtx, rid, &rd));
+ ASSERT_EQUALS(std::string("updated"), rd.data());
+}
+
+/*
+ * Concurrent operations under snapshot isolation blocks visibility
+ * | Session 1 | Session 2 |
+ * |-------------------+--------------------------------------|
+ * | Begin | |
+ * | | Begin :readAt 20 :isolation snapshot |
+ * | Write A 1 | |
+ * | Commit :commit 10 | |
+ * | | Read A (NOT_FOUND) |
+ * | | Abandon Snapshot |
+ * | | Read A (1) |
+ */
+TEST(KVEngineTestHarness, SingleReadWithConflict) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ MyOperationContext opCtx2(engine);
+ opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ Timestamp(20, 20));
+
+ MyOperationContext opCtx1(engine);
+ WriteUnitOfWork uow1(&opCtx1);
+ StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10));
+ ASSERT_OK(res);
+ RecordId loc = res.getValue();
+
+ // Cannot find record before commit.
+ RecordData rd;
+ ASSERT(!rs->findRecord(&opCtx2, loc, &rd));
+
+ // Cannot find record after commit due to snapshot isolation.
+ uow1.commit();
+ ASSERT(!rs->findRecord(&opCtx2, loc, &rd));
+
+ // Abandon snapshot for visibility.
+ opCtx2.recoveryUnit()->abandonSnapshot();
+
+ ASSERT(rs->findRecord(&opCtx2, loc, &rd));
+ ASSERT_EQUALS(std::string("abc"), rs->dataFor(&opCtx2, loc).data());
+}
+
+/*
+ * Item Not Found - Read timestamp hides visibility
+ * | Session |
+ * |----------------------|
+ * | Begin |
+ * | Write A 1 |
+ * | Commit :commit 10 |
+ * | |
+ * | Begin :readAt 9 |
+ * | Read A (NOT_FOUND) |
+ * | Write A 1 (NOT_FOUND)|
+ */
+DEATH_TEST_REGEX(KVEngineTestHarness, SnapshotHidesVisibility, ".*item not found.*") {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ invariant(false, "item not found");
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ MyOperationContext opCtx1(engine);
+ WriteUnitOfWork uow1(&opCtx1);
+ StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10));
+ ASSERT_OK(res);
+ RecordId loc = res.getValue();
+ uow1.commit();
+
+ // Snapshot was taken before the insert and will not find the record even after the commit.
+ RecordData rd;
+ MyOperationContext opCtx2(engine);
+ opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ Timestamp(9, 9));
+ ASSERT(!rs->findRecord(&opCtx2, loc, &rd));
+
+ // Trying to write in an outdated snapshot will cause item not found.
+ WriteUnitOfWork uow2(&opCtx2);
+ auto swRid = rs->updateRecord(&opCtx2, loc, "updated", 8);
+ uow2.commit();
+}
+
+/*
+ * Insert
+ * | Session |
+ * |------------------------|
+ * | Begin |
+ * | Write A 1 |
+ * | Timestamp :commit 10 |
+ * | Write Oplog |
+ * | Commit |
+ * | |
+ * | Begin :readAt 9 |
+ * | Read A (NOT_FOUND) |
+ * | Read Oplog (NOT_FOUND) |
+ * | Rollback |
+ * | |
+ * | Begin :readAt 10 |
+ * | Read A (1) |
+ * | Read Oplog (FOUND) |
+ */
+TEST(KVEngineTestHarness, SingleReadWithConflictWithOplog) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> collectionRs;
+ std::unique_ptr<RecordStore> oplogRs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ collectionRs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(collectionRs);
+
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = 10240;
+ options.cappedMaxDocs = -1;
+
+ NamespaceString oplogNss("local.oplog.rs");
+ ASSERT_OK(engine->createRecordStore(&opCtx, oplogNss.ns(), "ident", options));
+ oplogRs = engine->getRecordStore(&opCtx, oplogNss.ns(), "ident", options);
+ ASSERT(oplogRs);
+ }
+
+ RecordData rd;
+ RecordId locCollection;
+ RecordId locOplog;
+ const Timestamp t9(9, 9);
+ const Timestamp t10(10, 10);
+ {
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+
+ // Insert into collectionRs.
+ StatusWith<RecordId> res = collectionRs->insertRecord(&opCtx, "abc", 4, t10);
+ ASSERT_OK(res);
+ locCollection = res.getValue();
+
+ // Insert into oplogRs.
+ auto t11Doc = BSON("ts" << t10);
+
+ ASSERT_EQ(invariant(oplogRs->insertRecord(
+ &opCtx, t11Doc.objdata(), t11Doc.objsize(), Timestamp::min())),
+ RecordId(10, 10));
+ locOplog = RecordId(10, 10);
+ uow.commit();
+ }
+
+ MyOperationContext opCtx(engine);
+ opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t9);
+ ASSERT(!collectionRs->findRecord(&opCtx, locCollection, &rd));
+ ASSERT(!oplogRs->findRecord(&opCtx, locOplog, &rd));
+
+ opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t10);
+ opCtx.recoveryUnit()->abandonSnapshot();
+ ASSERT(collectionRs->findRecord(&opCtx, locCollection, &rd));
+ ASSERT(oplogRs->findRecord(&opCtx, locOplog, &rd));
+}
+
+/*
+ * Pinned oldest timestamp - Read
+ * | Session | GlobalActor |
+ * |-----------------------------+----------------------------|
+ * | Begin | |
+ * | Write A 1 | |
+ * | Commit :commit 10 | |
+ * | | |
+ * | Begin :readAt 15 | |
+ * | Read A (1) | |
+ * | Rollback | |
+ * | | GlobalTimestamp :oldest 20 |
+ * | Begin :readAt 15 | |
+ * | Read A (DB exception) | |
+ */
+TEST(KVEngineTestHarness, PinningOldestTimestampWithReadConflict) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+ StatusWith<RecordId> res = rs->insertRecord(&opCtx, "abc", 4, Timestamp(10, 10));
+ RecordId rid = res.getValue();
+ uow.commit();
+
+ RecordData rd;
+ opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ Timestamp(15, 15));
+ ASSERT(rs->findRecord(&opCtx, rid, &rd));
+
+ engine->setOldestTimestamp(Timestamp(20, 20), false);
+
+ opCtx.recoveryUnit()->abandonSnapshot();
+ ASSERT_THROWS_CODE(rs->findRecord(&opCtx, rid, &rd), DBException, ErrorCodes::SnapshotTooOld);
+}
+
+
+/*
+ * Pinned oldest timestamp - Write
+ * | Session | GlobalActor |
+ * |-----------------------------+----------------------------|
+ * | | GlobalTimestamp :oldest 2 |
+ * | Begin | |
+ * | Write A 1 | |
+ * | Commit :commit 2 (WCE) | |
+ */
+DEATH_TEST_REGEX(KVEngineTestHarness,
+ PinningOldestTimestampWithWriteConflict,
+ ".*commit timestamp.*is less than the oldest timestamp.*") {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ invariant(false, "commit timestamp is less than the oldest timestamp");
+
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ {
+ // A write transaction cannot insert records before the oldest timestamp.
+ engine->setOldestTimestamp(Timestamp(2, 2), false);
+ MyOperationContext opCtx2(engine);
+ WriteUnitOfWork uow2(&opCtx2);
+ StatusWith<RecordId> res = rs->insertRecord(&opCtx2, "abc", 4, Timestamp(1, 1));
+ uow2.commit();
+ }
+}
+
+/*
+ * Rolling Back To Last Stable
+ * | Session | GlobalActor |
+ * |-----------------------------+----------------------------|
+ * | Begin | |
+ * | Write A 1 | |
+ * | Timestamp: commit 1 | |
+ * | | Last Stable Timetamp: 2 |
+ * | Begin | |
+ * | Write B 2 | |
+ * | Timestamp: commit 3 | |
+ * | | Recover to Last Stable |
+ * | Read A (1) | |
+ * | Read B (NOT_FOUND) | |
+ */
+TEST(KVEngineTestHarness, RollingBackToLastStable) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ // The initial data timestamp has to be set to take stable checkpoints.
+ engine->setInitialDataTimestamp(Timestamp(1, 1));
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ RecordId ridA;
+ {
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+ auto res = rs->insertRecord(&opCtx, "abc", 4, Timestamp(1, 1));
+ ASSERT_OK(res);
+ ridA = res.getValue();
+ uow.commit();
+ ASSERT_EQUALS(1, rs->numRecords(&opCtx));
+ }
+
+ {
+ // Set the stable timestamp to (2, 2).
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+ engine->setStableTimestamp(Timestamp(2, 2), false);
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+
+ // Force a checkpoint to be taken. This should advance the last stable timestamp.
+ MyOperationContext opCtx(engine);
+ engine->flushAllFiles(&opCtx, false);
+ ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2));
+ }
+
+ RecordId ridB;
+ {
+ // Insert a record after the stable timestamp.
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+ StatusWith<RecordId> swRid = rs->insertRecord(&opCtx, "def", 4, Timestamp(3, 3));
+ ASSERT_OK(swRid);
+ ridB = swRid.getValue();
+ ASSERT_EQUALS(2, rs->numRecords(&opCtx));
+ uow.commit();
+ }
+
+ {
+ // Rollback to the last stable timestamp.
+ MyOperationContext opCtx(engine);
+ StatusWith<Timestamp> swTimestamp = engine->recoverToStableTimestamp(&opCtx);
+ ASSERT_EQ(swTimestamp.getValue(), Timestamp(2, 2));
+
+ // Verify that we can find record A and can't find the record B inserted at Timestamp(3, 3)
+ // in the collection any longer. 'numRecords' will still show two as it's the fast count and
+ // doesn't get reflected during the rollback.
+ RecordData rd;
+ opCtx.recoveryUnit()->abandonSnapshot();
+ ASSERT(rs->findRecord(&opCtx, ridA, &rd));
+ ASSERT_EQ(std::string("abc"), rd.data());
+ ASSERT_FALSE(rs->findRecord(&opCtx, ridB, nullptr));
+ ASSERT_EQUALS(2, rs->numRecords(&opCtx));
+ }
+}
+
+/*
+ * Commit behind stable
+ * | Session | GlobalActor |
+ * |---------------------------------+----------------------------|
+ * | | GlobalTimestamp :stable 2 |
+ * | Begin | |
+ * | Write A 1 | |
+ * | Timestamp :commit 1 (ROLLBACK) | |
+ */
+DEATH_TEST_REGEX(KVEngineTestHarness,
+ CommitBehindStable,
+ ".*commit timestamp.*is less than the stable timestamp.*") {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ invariant(false, "commit timestamp is less than the stable timestamp");
+
+ // The initial data timestamp has to be set to take stable checkpoints.
+ engine->setInitialDataTimestamp(Timestamp(1, 1));
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ {
+ // Set the stable timestamp to (2, 2).
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+ engine->setStableTimestamp(Timestamp(2, 2), false);
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+
+ // Force a checkpoint to be taken. This should advance the last stable timestamp.
+ MyOperationContext opCtx(engine);
+ engine->flushAllFiles(&opCtx, false);
+ ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2));
+ }
+
+ {
+ // Committing a behind the stable timestamp is not allowed.
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+ auto swRid = rs->insertRecord(&opCtx, "abc", 4, Timestamp(1, 1));
+ uow.commit();
+ }
+}
+
+/*
+ * Commit at stable
+ * | Session | GlobalActor |
+ * |---------------------------------+----------------------------|
+ * | | GlobalTimestamp :stable 2 |
+ * | Begin | |
+ * | Write A 1 | |
+ * | Timestamp :commit 2 | |
+ */
+TEST(KVEngineTestHarness, CommitAtStable) {
+ std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create());
+ KVEngine* engine = helper->getEngine();
+ // TODO SERVER-48314: Remove after implementing correct behavior on biggie.
+ if (engine->isEphemeral())
+ return;
+
+ // The initial data timestamp has to be set to take stable checkpoints.
+ engine->setInitialDataTimestamp(Timestamp(1, 1));
+ std::string ns = "a.b";
+ std::unique_ptr<RecordStore> rs;
+ {
+ MyOperationContext opCtx(engine);
+ ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions()));
+ rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions());
+ ASSERT(rs);
+ }
+
+ {
+ // Set the stable timestamp to (2, 2).
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+ engine->setStableTimestamp(Timestamp(2, 2), false);
+ ASSERT(!engine->getLastStableRecoveryTimestamp());
+
+ // Force a checkpoint to be taken. This should advance the last stable timestamp.
+ MyOperationContext opCtx(engine);
+ engine->flushAllFiles(&opCtx, false);
+ ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2));
+ }
+
+ RecordId rid;
+ {
+ // For a non-prepared transaction, the commit timestamp can be equal to the stable
+ // timestamp.
+ MyOperationContext opCtx(engine);
+ WriteUnitOfWork uow(&opCtx);
+ auto swRid = rs->insertRecord(&opCtx, "abc", 4, Timestamp(2, 2));
+ ASSERT_OK(swRid);
+ rid = swRid.getValue();
+ uow.commit();
+ }
+
+ {
+ // Rollback to the last stable timestamp.
+ MyOperationContext opCtx(engine);
+ StatusWith<Timestamp> swTimestamp = engine->recoverToStableTimestamp(&opCtx);
+ ASSERT_EQ(swTimestamp.getValue(), Timestamp(2, 2));
+
+ // Transaction with timestamps equal to lastStable will not be rolled back.
+ opCtx.recoveryUnit()->abandonSnapshot();
+ RecordData data;
+ ASSERT_TRUE(rs->findRecord(&opCtx, rid, &data));
+ ASSERT_EQUALS(1, rs->numRecords(&opCtx));
+ }
+}
+
TEST_F(DurableCatalogImplTest, Coll1) {
KVEngine* engine = helper->getEngine();