diff options
author | Bynn Lee <bynn.lee@mongodb.com> | 2020-07-08 02:36:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-14 21:06:45 +0000 |
commit | 5bb3a09729ea6d03b3a17915bb4bc3e6e5105fdd (patch) | |
tree | 74c6d6d4f7bb07650c36aeffe7fca742ab57362f /src/mongo/db | |
parent | 316408d14e2358225e8f26bfe4d4022cfcf748d9 (diff) | |
download | mongo-5bb3a09729ea6d03b3a17915bb4bc3e6e5105fdd.tar.gz |
SERVER-49014 Add additional timestamping tests to the KVEngine test harness
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/storage/kv/kv_engine_test_harness.cpp | 762 |
1 files changed, 762 insertions, 0 deletions
diff --git a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp index 4195bddcf5d..e0cb1d86eb8 100644 --- a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp +++ b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp @@ -364,6 +364,768 @@ TEST(KVEngineTestHarness, AllDurableTimestamp) { } } +/* + * Pinned oldest with another session + * | Session 1 | Session 2 | + * |-----------------------------+----------------------------| + * | Begin | | + * | Write A 1 | | + * | Commit :commit 10 | | + * | Begin :readAt 15 | | + * | | Begin | + * | | Write A 2 | + * | Read A (1) | | + * | | Commit :commit 20 | + * | Read A (1) | | + * | | Begin :readAt 15 | + * | | Read A (1) | + * | Rollback | | + * | | Rollback | + */ +TEST(KVEngineTestHarness, PinningOldestWithAnotherSession) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + MyOperationContext opCtx1(engine); + WriteUnitOfWork uow1(&opCtx1); + StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10)); + RecordId rid = res.getValue(); + uow1.commit(); + + RecordData rd; + opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + Timestamp(15, 15)); + + MyOperationContext opCtx2(engine); + WriteUnitOfWork uow2(&opCtx2); + + ASSERT(rs->findRecord(&opCtx1, rid, &rd)); + ASSERT_OK(opCtx2.recoveryUnit()->setTimestamp(Timestamp(20, 20))); + ASSERT_OK(rs->updateRecord(&opCtx2, rid, "updated", 8)); + + ASSERT(rs->findRecord(&opCtx1, rid, &rd)); + ASSERT_EQUALS(std::string("abc"), rd.data()); + + uow2.commit(); + + opCtx1.recoveryUnit()->abandonSnapshot(); + ASSERT(rs->findRecord(&opCtx1, rid, &rd)); + ASSERT_EQUALS(std::string("abc"), rd.data()); + + + opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + Timestamp(15, 15)); + ASSERT(rs->findRecord(&opCtx2, rid, &rd)); + ASSERT_EQUALS(std::string("abc"), rd.data()); +} + +/* + * All durable + * | Session 1 | Session 2 | GlobalActor | + * |----------------------+----------------------+----------------------------------| + * | Begin | | | + * | Commit :commit 10 | | | + * | | | QueryTimestamp :all_durable (10) | + * | Begin | | | + * | Timestamp :commit 20 | | | + * | | | QueryTimestamp :all_durable (19) | + * | | Begin | | + * | | Timestamp :commit 30 | | + * | | Commit | | + * | | | QueryTimestamp :all_durable (19) | + * | Commit | | | + * | | | QueryTimestamp :all_durable (30) | + * | Begin | | | + * | Timestamp :commit 25 | | | + * | | | QueryTimestamp :all_durable (30) | + */ +TEST(KVEngineTestHarness, AllDurable) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + if (!engine->supportsDocLocking()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + { + const Timestamp kInsertTimestamp1 = Timestamp(10, 10); + const Timestamp kInsertTimestamp2 = Timestamp(20, 20); + const Timestamp kInsertTimestamp3 = Timestamp(30, 30); + const Timestamp kInsertTimestamp4 = Timestamp(25, 25); + + Timestamp allDurable = engine->getAllDurableTimestamp(); + MyOperationContext opCtx1(engine); + WriteUnitOfWork uow1(&opCtx1); + auto swRid = rs->insertRecord(&opCtx1, "abc", 4, kInsertTimestamp1); + ASSERT_OK(swRid); + uow1.commit(); + + Timestamp lastAllDurable = allDurable; + allDurable = engine->getAllDurableTimestamp(); + ASSERT_GTE(allDurable, lastAllDurable); + ASSERT_LTE(allDurable, kInsertTimestamp1); + + MyOperationContext opCtx2(engine); + WriteUnitOfWork uow2(&opCtx2); + swRid = rs->insertRecord(&opCtx2, "abc", 4, kInsertTimestamp2); + ASSERT_OK(swRid); + + lastAllDurable = allDurable; + allDurable = engine->getAllDurableTimestamp(); + ASSERT_GTE(allDurable, lastAllDurable); + ASSERT_LT(allDurable, kInsertTimestamp2); + + MyOperationContext opCtx3(engine); + WriteUnitOfWork uow3(&opCtx3); + swRid = rs->insertRecord(&opCtx3, "abc", 4, kInsertTimestamp3); + ASSERT_OK(swRid); + uow3.commit(); + + lastAllDurable = allDurable; + allDurable = engine->getAllDurableTimestamp(); + ASSERT_GTE(allDurable, lastAllDurable); + ASSERT_LT(allDurable, kInsertTimestamp2); + + uow2.commit(); + + lastAllDurable = allDurable; + allDurable = engine->getAllDurableTimestamp(); + ASSERT_GTE(allDurable, lastAllDurable); + ASSERT_LTE(allDurable, kInsertTimestamp3); + + MyOperationContext opCtx4(engine); + WriteUnitOfWork uow4(&opCtx4); + swRid = rs->insertRecord(&opCtx4, "abc", 4, kInsertTimestamp4); + ASSERT_OK(swRid); + + lastAllDurable = allDurable; + allDurable = engine->getAllDurableTimestamp(); + ASSERT_GTE(allDurable, lastAllDurable); + ASSERT_LTE(allDurable, kInsertTimestamp3); + uow4.commit(); + } +} + +/* + * Basic Timestamp - Single + * | Session | + * |----------------------| + * | Begin | + * | Write A 1 | + * | Commit :commit 10 | + * | | + * | Begin :readAt 9 | + * | Read A (NOT_FOUND) | + * | Rollback | + * | | + * | Begin :readAt 10 | + * | Read A (1) | + */ +TEST(KVEngineTestHarness, BasicTimestampSingle) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + const Timestamp kReadTimestamp = Timestamp(9, 9); + const Timestamp kInsertTimestamp = Timestamp(10, 10); + + // Start a read transaction. + MyOperationContext opCtx1(engine); + + opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + kReadTimestamp); + ASSERT(!rs->findRecord(&opCtx1, RecordId::min(), nullptr)); + + // Insert a record at a later time. + RecordId rid; + { + MyOperationContext opCtx2(engine); + WriteUnitOfWork wuow(&opCtx2); + auto swRid = rs->insertRecord(&opCtx2, "abc", 4, kInsertTimestamp); + ASSERT_OK(swRid); + rid = swRid.getValue(); + wuow.commit(); + } + + // Should not see the record, even if we abandon the snapshot as the read timestamp is still + // earlier than the insert timestamp. + ASSERT(!rs->findRecord(&opCtx1, rid, nullptr)); + opCtx1.recoveryUnit()->abandonSnapshot(); + ASSERT(!rs->findRecord(&opCtx1, rid, nullptr)); + + + opCtx1.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + kInsertTimestamp); + opCtx1.recoveryUnit()->abandonSnapshot(); + RecordData rd; + ASSERT(rs->findRecord(&opCtx1, rid, &rd)); + ASSERT_EQ(std::string("abc"), rd.data()); +} + +/* + * Basic Timestamp - Multiple + * | Session | + * |----------------------| + * | Begin | + * | Timestamp :commit 10 | + * | Write A 1 | + * | Timestamp :commit 20 | + * | Update A 2 | + * | Commit | + * | | + * | Begin :readAt 10 | + * | Read A (1) | + * | Rollback | + * | | + * | Begin :readAt 20 | + * | Read A (2) | + */ +TEST(KVEngineTestHarness, BasicTimestampMultiple) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + const Timestamp t10 = Timestamp(10, 10); + const Timestamp t20 = Timestamp(20, 20); + + RecordId rid; + { + // Initial insert of record. + MyOperationContext opCtx(engine); + WriteUnitOfWork wuow(&opCtx); + auto swRid = rs->insertRecord(&opCtx, "abc", 4, t10); + ASSERT_OK(swRid); + rid = swRid.getValue(); + + // Update a record at a later time. + ASSERT_OK(opCtx.recoveryUnit()->setTimestamp(t20)); + auto res = rs->updateRecord(&opCtx, rid, "updated", 8); + ASSERT_OK(res); + wuow.commit(); + } + + RecordData rd; + MyOperationContext opCtx(engine); + opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t10); + ASSERT(rs->findRecord(&opCtx, rid, &rd)); + ASSERT_EQUALS(std::string("abc"), rd.data()); + + opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t20); + opCtx.recoveryUnit()->abandonSnapshot(); + ASSERT(rs->findRecord(&opCtx, rid, &rd)); + ASSERT_EQUALS(std::string("updated"), rd.data()); +} + +/* + * Concurrent operations under snapshot isolation blocks visibility + * | Session 1 | Session 2 | + * |-------------------+--------------------------------------| + * | Begin | | + * | | Begin :readAt 20 :isolation snapshot | + * | Write A 1 | | + * | Commit :commit 10 | | + * | | Read A (NOT_FOUND) | + * | | Abandon Snapshot | + * | | Read A (1) | + */ +TEST(KVEngineTestHarness, SingleReadWithConflict) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + MyOperationContext opCtx2(engine); + opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + Timestamp(20, 20)); + + MyOperationContext opCtx1(engine); + WriteUnitOfWork uow1(&opCtx1); + StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10)); + ASSERT_OK(res); + RecordId loc = res.getValue(); + + // Cannot find record before commit. + RecordData rd; + ASSERT(!rs->findRecord(&opCtx2, loc, &rd)); + + // Cannot find record after commit due to snapshot isolation. + uow1.commit(); + ASSERT(!rs->findRecord(&opCtx2, loc, &rd)); + + // Abandon snapshot for visibility. + opCtx2.recoveryUnit()->abandonSnapshot(); + + ASSERT(rs->findRecord(&opCtx2, loc, &rd)); + ASSERT_EQUALS(std::string("abc"), rs->dataFor(&opCtx2, loc).data()); +} + +/* + * Item Not Found - Read timestamp hides visibility + * | Session | + * |----------------------| + * | Begin | + * | Write A 1 | + * | Commit :commit 10 | + * | | + * | Begin :readAt 9 | + * | Read A (NOT_FOUND) | + * | Write A 1 (NOT_FOUND)| + */ +DEATH_TEST_REGEX(KVEngineTestHarness, SnapshotHidesVisibility, ".*item not found.*") { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + invariant(false, "item not found"); + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + MyOperationContext opCtx1(engine); + WriteUnitOfWork uow1(&opCtx1); + StatusWith<RecordId> res = rs->insertRecord(&opCtx1, "abc", 4, Timestamp(10, 10)); + ASSERT_OK(res); + RecordId loc = res.getValue(); + uow1.commit(); + + // Snapshot was taken before the insert and will not find the record even after the commit. + RecordData rd; + MyOperationContext opCtx2(engine); + opCtx2.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + Timestamp(9, 9)); + ASSERT(!rs->findRecord(&opCtx2, loc, &rd)); + + // Trying to write in an outdated snapshot will cause item not found. + WriteUnitOfWork uow2(&opCtx2); + auto swRid = rs->updateRecord(&opCtx2, loc, "updated", 8); + uow2.commit(); +} + +/* + * Insert + * | Session | + * |------------------------| + * | Begin | + * | Write A 1 | + * | Timestamp :commit 10 | + * | Write Oplog | + * | Commit | + * | | + * | Begin :readAt 9 | + * | Read A (NOT_FOUND) | + * | Read Oplog (NOT_FOUND) | + * | Rollback | + * | | + * | Begin :readAt 10 | + * | Read A (1) | + * | Read Oplog (FOUND) | + */ +TEST(KVEngineTestHarness, SingleReadWithConflictWithOplog) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> collectionRs; + std::unique_ptr<RecordStore> oplogRs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + collectionRs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(collectionRs); + + CollectionOptions options; + options.capped = true; + options.cappedSize = 10240; + options.cappedMaxDocs = -1; + + NamespaceString oplogNss("local.oplog.rs"); + ASSERT_OK(engine->createRecordStore(&opCtx, oplogNss.ns(), "ident", options)); + oplogRs = engine->getRecordStore(&opCtx, oplogNss.ns(), "ident", options); + ASSERT(oplogRs); + } + + RecordData rd; + RecordId locCollection; + RecordId locOplog; + const Timestamp t9(9, 9); + const Timestamp t10(10, 10); + { + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + + // Insert into collectionRs. + StatusWith<RecordId> res = collectionRs->insertRecord(&opCtx, "abc", 4, t10); + ASSERT_OK(res); + locCollection = res.getValue(); + + // Insert into oplogRs. + auto t11Doc = BSON("ts" << t10); + + ASSERT_EQ(invariant(oplogRs->insertRecord( + &opCtx, t11Doc.objdata(), t11Doc.objsize(), Timestamp::min())), + RecordId(10, 10)); + locOplog = RecordId(10, 10); + uow.commit(); + } + + MyOperationContext opCtx(engine); + opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t9); + ASSERT(!collectionRs->findRecord(&opCtx, locCollection, &rd)); + ASSERT(!oplogRs->findRecord(&opCtx, locOplog, &rd)); + + opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, t10); + opCtx.recoveryUnit()->abandonSnapshot(); + ASSERT(collectionRs->findRecord(&opCtx, locCollection, &rd)); + ASSERT(oplogRs->findRecord(&opCtx, locOplog, &rd)); +} + +/* + * Pinned oldest timestamp - Read + * | Session | GlobalActor | + * |-----------------------------+----------------------------| + * | Begin | | + * | Write A 1 | | + * | Commit :commit 10 | | + * | | | + * | Begin :readAt 15 | | + * | Read A (1) | | + * | Rollback | | + * | | GlobalTimestamp :oldest 20 | + * | Begin :readAt 15 | | + * | Read A (DB exception) | | + */ +TEST(KVEngineTestHarness, PinningOldestTimestampWithReadConflict) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + StatusWith<RecordId> res = rs->insertRecord(&opCtx, "abc", 4, Timestamp(10, 10)); + RecordId rid = res.getValue(); + uow.commit(); + + RecordData rd; + opCtx.recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + Timestamp(15, 15)); + ASSERT(rs->findRecord(&opCtx, rid, &rd)); + + engine->setOldestTimestamp(Timestamp(20, 20), false); + + opCtx.recoveryUnit()->abandonSnapshot(); + ASSERT_THROWS_CODE(rs->findRecord(&opCtx, rid, &rd), DBException, ErrorCodes::SnapshotTooOld); +} + + +/* + * Pinned oldest timestamp - Write + * | Session | GlobalActor | + * |-----------------------------+----------------------------| + * | | GlobalTimestamp :oldest 2 | + * | Begin | | + * | Write A 1 | | + * | Commit :commit 2 (WCE) | | + */ +DEATH_TEST_REGEX(KVEngineTestHarness, + PinningOldestTimestampWithWriteConflict, + ".*commit timestamp.*is less than the oldest timestamp.*") { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + invariant(false, "commit timestamp is less than the oldest timestamp"); + + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + { + // A write transaction cannot insert records before the oldest timestamp. + engine->setOldestTimestamp(Timestamp(2, 2), false); + MyOperationContext opCtx2(engine); + WriteUnitOfWork uow2(&opCtx2); + StatusWith<RecordId> res = rs->insertRecord(&opCtx2, "abc", 4, Timestamp(1, 1)); + uow2.commit(); + } +} + +/* + * Rolling Back To Last Stable + * | Session | GlobalActor | + * |-----------------------------+----------------------------| + * | Begin | | + * | Write A 1 | | + * | Timestamp: commit 1 | | + * | | Last Stable Timetamp: 2 | + * | Begin | | + * | Write B 2 | | + * | Timestamp: commit 3 | | + * | | Recover to Last Stable | + * | Read A (1) | | + * | Read B (NOT_FOUND) | | + */ +TEST(KVEngineTestHarness, RollingBackToLastStable) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + // The initial data timestamp has to be set to take stable checkpoints. + engine->setInitialDataTimestamp(Timestamp(1, 1)); + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + RecordId ridA; + { + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + auto res = rs->insertRecord(&opCtx, "abc", 4, Timestamp(1, 1)); + ASSERT_OK(res); + ridA = res.getValue(); + uow.commit(); + ASSERT_EQUALS(1, rs->numRecords(&opCtx)); + } + + { + // Set the stable timestamp to (2, 2). + ASSERT(!engine->getLastStableRecoveryTimestamp()); + engine->setStableTimestamp(Timestamp(2, 2), false); + ASSERT(!engine->getLastStableRecoveryTimestamp()); + + // Force a checkpoint to be taken. This should advance the last stable timestamp. + MyOperationContext opCtx(engine); + engine->flushAllFiles(&opCtx, false); + ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2)); + } + + RecordId ridB; + { + // Insert a record after the stable timestamp. + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + StatusWith<RecordId> swRid = rs->insertRecord(&opCtx, "def", 4, Timestamp(3, 3)); + ASSERT_OK(swRid); + ridB = swRid.getValue(); + ASSERT_EQUALS(2, rs->numRecords(&opCtx)); + uow.commit(); + } + + { + // Rollback to the last stable timestamp. + MyOperationContext opCtx(engine); + StatusWith<Timestamp> swTimestamp = engine->recoverToStableTimestamp(&opCtx); + ASSERT_EQ(swTimestamp.getValue(), Timestamp(2, 2)); + + // Verify that we can find record A and can't find the record B inserted at Timestamp(3, 3) + // in the collection any longer. 'numRecords' will still show two as it's the fast count and + // doesn't get reflected during the rollback. + RecordData rd; + opCtx.recoveryUnit()->abandonSnapshot(); + ASSERT(rs->findRecord(&opCtx, ridA, &rd)); + ASSERT_EQ(std::string("abc"), rd.data()); + ASSERT_FALSE(rs->findRecord(&opCtx, ridB, nullptr)); + ASSERT_EQUALS(2, rs->numRecords(&opCtx)); + } +} + +/* + * Commit behind stable + * | Session | GlobalActor | + * |---------------------------------+----------------------------| + * | | GlobalTimestamp :stable 2 | + * | Begin | | + * | Write A 1 | | + * | Timestamp :commit 1 (ROLLBACK) | | + */ +DEATH_TEST_REGEX(KVEngineTestHarness, + CommitBehindStable, + ".*commit timestamp.*is less than the stable timestamp.*") { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + invariant(false, "commit timestamp is less than the stable timestamp"); + + // The initial data timestamp has to be set to take stable checkpoints. + engine->setInitialDataTimestamp(Timestamp(1, 1)); + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + { + // Set the stable timestamp to (2, 2). + ASSERT(!engine->getLastStableRecoveryTimestamp()); + engine->setStableTimestamp(Timestamp(2, 2), false); + ASSERT(!engine->getLastStableRecoveryTimestamp()); + + // Force a checkpoint to be taken. This should advance the last stable timestamp. + MyOperationContext opCtx(engine); + engine->flushAllFiles(&opCtx, false); + ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2)); + } + + { + // Committing a behind the stable timestamp is not allowed. + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + auto swRid = rs->insertRecord(&opCtx, "abc", 4, Timestamp(1, 1)); + uow.commit(); + } +} + +/* + * Commit at stable + * | Session | GlobalActor | + * |---------------------------------+----------------------------| + * | | GlobalTimestamp :stable 2 | + * | Begin | | + * | Write A 1 | | + * | Timestamp :commit 2 | | + */ +TEST(KVEngineTestHarness, CommitAtStable) { + std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create()); + KVEngine* engine = helper->getEngine(); + // TODO SERVER-48314: Remove after implementing correct behavior on biggie. + if (engine->isEphemeral()) + return; + + // The initial data timestamp has to be set to take stable checkpoints. + engine->setInitialDataTimestamp(Timestamp(1, 1)); + std::string ns = "a.b"; + std::unique_ptr<RecordStore> rs; + { + MyOperationContext opCtx(engine); + ASSERT_OK(engine->createRecordStore(&opCtx, ns, ns, CollectionOptions())); + rs = engine->getRecordStore(&opCtx, ns, ns, CollectionOptions()); + ASSERT(rs); + } + + { + // Set the stable timestamp to (2, 2). + ASSERT(!engine->getLastStableRecoveryTimestamp()); + engine->setStableTimestamp(Timestamp(2, 2), false); + ASSERT(!engine->getLastStableRecoveryTimestamp()); + + // Force a checkpoint to be taken. This should advance the last stable timestamp. + MyOperationContext opCtx(engine); + engine->flushAllFiles(&opCtx, false); + ASSERT_EQ(engine->getLastStableRecoveryTimestamp(), Timestamp(2, 2)); + } + + RecordId rid; + { + // For a non-prepared transaction, the commit timestamp can be equal to the stable + // timestamp. + MyOperationContext opCtx(engine); + WriteUnitOfWork uow(&opCtx); + auto swRid = rs->insertRecord(&opCtx, "abc", 4, Timestamp(2, 2)); + ASSERT_OK(swRid); + rid = swRid.getValue(); + uow.commit(); + } + + { + // Rollback to the last stable timestamp. + MyOperationContext opCtx(engine); + StatusWith<Timestamp> swTimestamp = engine->recoverToStableTimestamp(&opCtx); + ASSERT_EQ(swTimestamp.getValue(), Timestamp(2, 2)); + + // Transaction with timestamps equal to lastStable will not be rolled back. + opCtx.recoveryUnit()->abandonSnapshot(); + RecordData data; + ASSERT_TRUE(rs->findRecord(&opCtx, rid, &data)); + ASSERT_EQUALS(1, rs->numRecords(&opCtx)); + } +} + TEST_F(DurableCatalogImplTest, Coll1) { KVEngine* engine = helper->getEngine(); |