author     A. Jesse Jiryu Davis <jesse@mongodb.com>    2020-09-29 14:25:00 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-09-29 18:50:31 +0000
commit     09c14216c0f4adae7d12c27dc034ffbf4e9b7001
tree       0ea8d1fccba2faf95cfc6459e276ee0dd65c501d
parent     be850a07b04d1af6cae9f38800fa618a333b55c3
SERVER-48518 Fix rollback via refetch anomaly
-rw-r--r--  jstests/replsets/libs/rollback_test.js                     |  41
-rw-r--r--  jstests/replsets/libs/rollback_test_deluxe.js              |  34
-rw-r--r--  jstests/replsets/rollback_via_refetch_anomaly.js           | 193
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                          |  14
-rw-r--r--  src/mongo/db/repl/rs_rollback.h                            |   3
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                            |  14
-rw-r--r--  src/mongo/db/storage/kv/kv_engine.h                        |   7
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.cpp              |   5
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.h                |   3
-rw-r--r--  src/mongo/db/storage/mmap_v1/mmap_v1_engine.h              |   4
-rw-r--r--  src/mongo/db/storage/storage_engine.h                      |   5
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp  |   5
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h    |   2
13 files changed, 328 insertions(+), 2 deletions(-)
diff --git a/jstests/replsets/libs/rollback_test.js b/jstests/replsets/libs/rollback_test.js
index 957e4fe2adf..4e307651917 100644
--- a/jstests/replsets/libs/rollback_test.js
+++ b/jstests/replsets/libs/rollback_test.js
@@ -325,6 +325,36 @@ function RollbackTest(name = "RollbackTest", replSet) {
};
/**
+ * Insert on primary until its lastApplied >= the rollback node's. Useful for testing rollback
+ * via refetch, which completes rollback recovery when new lastApplied >= old top of oplog.
+ */
+ const _awaitPrimaryAppliedSurpassesRollbackApplied = function() {
+ log(`Waiting for lastApplied on sync source ${curPrimary.host} to surpass lastApplied` +
+ ` on rollback node ${curSecondary.host}`);
+
+ function lastApplied(node) {
+ const reply = assert.commandWorked(node.adminCommand({replSetGetStatus: 1}));
+ return reply.optimes.appliedOpTime.ts;
+ }
+
+ const rollbackApplied = lastApplied(curSecondary);
+ assert.soon(() => {
+ const primaryApplied = lastApplied(curPrimary);
+ jsTestLog(
+ `lastApplied on sync source ${curPrimary.host}:` +
+ ` ${tojson(primaryApplied)}, lastApplied on rollback node ${curSecondary.host}:` +
+ ` ${tojson(rollbackApplied)}`);
+
+ if (timestampCmp(primaryApplied, rollbackApplied) >= 0) {
+ return true;
+ }
+
+ let crudColl = curPrimary.getDB("test")["awaitPrimaryAppliedSurpassesRollbackApplied"];
+ assert.commandWorked(crudColl.insert({}));
+ }, "primary's lastApplied never surpassed rollback node's");
+ };
+
+ /**
* Transition to the second stage of rollback testing, where we isolate the old primary and
* elect the old secondary as the new primary. Then, operations can be performed on the new
primary so that the optimes diverge and previous operations on the old primary will be
@@ -383,6 +413,13 @@ function RollbackTest(name = "RollbackTest", replSet) {
lastRBID = assert.commandWorked(curSecondary.adminCommand("replSetGetRBID")).rbid;
+ const isMajorityReadConcernEnabledOnRollbackNode =
+ assert.commandWorked(curSecondary.adminCommand({serverStatus: 1}))
+ .storageEngine.supportsCommittedReads;
+ if (!isMajorityReadConcernEnabledOnRollbackNode) {
+ _awaitPrimaryAppliedSurpassesRollbackApplied();
+ }
+
return curPrimary;
};
@@ -418,6 +455,10 @@ function RollbackTest(name = "RollbackTest", replSet) {
return curSecondary;
};
+ this.getTieBreaker = function() {
+ return tiebreakerNode;
+ };
+
this.restartNode = function(nodeId, signal, startOptions, allowedExitCode) {
assert(signal === SIGKILL || signal === SIGTERM, `Received unknown signal: ${signal}`);
assert.gte(nodeId, 0, "Invalid argument to RollbackTest.restartNode()");
diff --git a/jstests/replsets/libs/rollback_test_deluxe.js b/jstests/replsets/libs/rollback_test_deluxe.js
index 3c87ebb3a21..224b74f2254 100644
--- a/jstests/replsets/libs/rollback_test_deluxe.js
+++ b/jstests/replsets/libs/rollback_test_deluxe.js
@@ -459,6 +459,36 @@ function RollbackTestDeluxe(name = "FiveNodeDoubleRollbackTest", replSet) {
};
/**
+ * Insert on primary until its lastApplied >= the rollback node's. Useful for testing rollback
+ * via refetch, which completes rollback recovery when new lastApplied >= old top of oplog.
+ */
+ const _awaitPrimaryAppliedSurpassesRollbackApplied = function() {
+ log(`Waiting for lastApplied on sync source ${curPrimary.host} to surpass lastApplied` +
+ ` on rollback node ${rollbackSecondary.host}`);
+
+ function lastApplied(node) {
+ const reply = assert.commandWorked(node.adminCommand({replSetGetStatus: 1}));
+ return reply.optimes.appliedOpTime.ts;
+ }
+
+ const rollbackApplied = lastApplied(rollbackSecondary);
+ assert.soon(() => {
+ const primaryApplied = lastApplied(curPrimary);
+ jsTestLog(`lastApplied on sync source ${curPrimary.host}:` +
+ ` ${tojson(primaryApplied)}, lastApplied on rollback node ${
+ rollbackSecondary.host}:` +
+ ` ${tojson(rollbackApplied)}`);
+
+ if (timestampCmp(primaryApplied, rollbackApplied) >= 0) {
+ return true;
+ }
+
+ let crudColl = curPrimary.getDB("test")["awaitPrimaryAppliedSurpassesRollbackApplied"];
+ assert.commandWorked(crudColl.insert({}));
+ }, "primary's lastApplied never surpassed rollback node's");
+ };
+
+ /**
* Transition to the second stage of rollback testing, where we isolate the old primary and the
* rollback secondary from the rest of the replica set. The arbiters are reconnected to the
* secondary on standby to elect it as the new primary.
@@ -514,6 +544,10 @@ function RollbackTestDeluxe(name = "FiveNodeDoubleRollbackTest", replSet) {
lastStandbySecondaryRBID =
assert.commandWorked(standbySecondary.adminCommand("replSetGetRBID")).rbid;
+ if (jsTest.options().enableMajorityReadConcern === false) {
+ _awaitPrimaryAppliedSurpassesRollbackApplied();
+ }
+
return curPrimary;
};
diff --git a/jstests/replsets/rollback_via_refetch_anomaly.js b/jstests/replsets/rollback_via_refetch_anomaly.js
new file mode 100644
index 00000000000..ca68cc1c08a
--- /dev/null
+++ b/jstests/replsets/rollback_via_refetch_anomaly.js
@@ -0,0 +1,193 @@
+/**
+ * Rolled-back documents must not be visible when querying a recovered secondary.
+ *
+ * This test attempts to reproduce SERVER-48518. In the following description, 'A' is the arbiter,
+ * 'P1' and 'P2' are primaries in terms 1 and 2.
+ *
+ * - Begin in RollbackTest.kSteadyStateOps with primary P1, all nodes connected:
+ *
+ * A
+ * / \
+ * P1 - P2
+ * primary secondary
+ *
+ * - Insert _id 0 into P1 at timestamp 1.
+ * - Transition to kRollbackOps by disconnecting P1 from P2:
+ *
+ * A
+ * /
+ * P1 P2
+ * primary secondary
+ *
+ * - Insert _id 1 into P1 at timestamp 2.
+ *
+ * TS 1 TS 2
+ * insert 0 insert 1
+ * P1 --------------->
+ * P2 --->
+ *
+ * - Isolate P1 from A, connect P2 to A:
+ *
+ * A
+ * \
+ * P1 P2
+ * primary new primary
+ *
+ * (Same as RollbackTest.transitionToSyncSourceOperationsBeforeRollback(), except do *not* trigger a
+ * stepdown on P1.)
+ *
+ * - Step up P2, which writes a no-op to its oplog at timestamp 3.
+ *
+ * TS 1 TS 2
+ * insert 0 insert 1
+ *
+ * P1 --------------->
+ * P2 ----*
+ * \
+ * *-------------------------->
+ * no-op
+ * TS 3
+ *
+ * - Delete _id 0 and 1 from P1 at timestamp 4.
+ *
+ * TS 1 TS 2 TS 4
+ * insert 0 insert 1 delete 0 and 1
+ *
+ * P1 --------------------------------------------------------------->
+ * P2 ----*
+ * \
+ * *-------------------------->
+ * no-op
+ * TS 3
+ *
+ * - Reconnect P1 to P2 so it rolls back.
+ *
+ * A
+ * \
+ * P1 - P2
+ * rollback new primary
+ *
+ * Rollback via refetch undoes the delete of _id 0 by reinserting _id 0 in P1 with an
+ * untimestamped write. (It can't undo the delete of _id 1, since P2 doesn't have _id 1.)
+ *
+ * Before we fixed SERVER-48518, P1 served queries at lastApplied = top of P2's oplog = TS 3,
+ * which includes _id 0, _id 1, and _id 0 again (it was reinserted with an untimestamped write).
+ * To fix SERVER-48518, P1 won't transition from ROLLBACK until its lastApplied >= max(P1's oplog
+ * top, P2's oplog top) = TS 4.
+ *
+ * - Write to P2 so it advances >= timestamp 4 and satisfies P1's conditions to finish rollback.
+ * - Wait for P1 to finish rollback and transition to SECONDARY.
+ * - Query P1 and check that rolled-back records aren't visible.
+ *
+ * To end the test, RollbackTest.transitionToSteadyStateOperations won't work, since we've
+ * diverged from the state it expects, so we end the test manually: reconnect P1 to A and stop
+ * the replica set.
+ *
+ * A
+ * / \
+ * P1 - P2
+ * secondary new primary
+ *
+ * @tags: [
+ * requires_wiredtiger
+ * ]
+ */
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/write_concern_util.js");
+ load("jstests/replsets/libs/rollback_test.js");
+
+ const rst = new ReplSetTest({
+ nodes: 3,
+ useBridge: true,
+ nodeOptions: {
+ enableMajorityReadConcern: "false",
+ setParameter: {logComponentVerbosity: tojsononeline({replication: 2})}
+ }
+ });
+
+ rst.startSet();
+ const config = rst.getReplSetConfig();
+ config.members[2].arbiterOnly = true;
+ config.settings = {chainingAllowed: false};
+ rst.initiateWithHighElectionTimeout(config);
+
+ const rollbackTest = new RollbackTest(jsTestName(), rst);
+ const P1 = rollbackTest.getPrimary();
+ const P2 = rollbackTest.getSecondary();
+ const A = rst.getArbiter();
+
+ jsTestLog(`Node P1: ${P1.host}, P2: ${P2.host}, A: ${A.host}`);
+
+ let testDB = P1.getDB(jsTestName());
+ const testColl = testDB.getCollection("test");
+
+ let reply = assert.commandWorked(
+     testDB.runCommand({insert: "test", documents: [{_id: 0}], writeConcern: {w: "majority"}}));
+ jsTestLog(`Inserted _id 0 into P1 at ${reply.operationTime}`);
+
+ rollbackTest.transitionToRollbackOperations();
+ reply = assert.commandWorked(
+     testDB.runCommand({insert: "test", documents: [{_id: 1}], writeConcern: {w: 1}}));
+ jsTestLog(`Inserted _id 1 into P1 at ${reply.operationTime}`);
+
+ jsTestLog("Isolating P1 from arbiter");
+ P1.disconnect([A]);
+
+ jsTestLog("Reconnecting P2 to the arbiter");
+ P2.reconnect([A]);
+
+ jsTestLog("Step up P2");
+ assert.soonNoExcept(() => {
+ const res = P2.adminCommand({replSetStepUp: 1});
+ return res.ok;
+ }, "Failed to step up P2", ReplSetTest.kDefaultTimeoutMS);
+ checkLog.contains(P2, "transition to primary complete; database writes are now permitted");
+ jsTestLog("P2 stepped up");
+
+ // Write to P1 to ensure TS 4 (P1's delete timestamp) > TS 3 (P2's step-up timestamp).
+ assert.soon(() => {
+ testDB.runCommand({insert: "otherCollection", documents: [{}]});
+ function lastApplied(node) {
+ const reply = assert.commandWorked(node.adminCommand({replSetGetStatus: 1}));
+ return reply.optimes.appliedOpTime.ts;
+ }
+ const P1applied = lastApplied(P1);
+ const P2applied = lastApplied(P2);
+ jsTestLog(`P1 lastApplied ${P1applied}, P2 lastApplied ${P2applied}`);
+ return timestampCmp(P1applied, P2applied) > 0;
+ }, "P1's lastApplied never surpassed P2's");
+
+ reply = assert.commandWorked(testDB.runCommand({delete: "test", deletes: [{q: {}, limit: 0}]}));
+ jsTestLog(`Deleted from P1 at ${reply.operationTime}.` +
+ ` Reconnecting P1 to P2, so P1 rolls back.`);
+
+ P1.reconnect([P2]);
+ checkLog.contains(P1, "Rollback using the 'rollbackViaRefetch' method");
+ checkLog.contains(P1, "Finding the Common Point");
+ checkLog.contains(
+ P1,
+ "We cannot transition to SECONDARY state because our 'lastApplied' optime is less than the" +
+ " initial data timestamp and enableMajorityReadConcern = false");
+
+ reply = assert.commandWorked(
+ P2.getDB(jsTestName()).runCommand({insert: "anotherCollection", documents: [{}]}));
+ jsTestLog(`Inserted into P2 at ${tojson(reply.operationTime)}`);
+
+ jsTestLog("Wait for P1 to enter SECONDARY");
+ waitForState(P1, ReplSetTest.State.SECONDARY);
+
+ // Both counts should be 1. If SERVER-48518 isn't fixed then itCount() = 3: _ids 0, 1, and 0
+ // again!
+ jsTestLog("Check collection count");
+ let itCount = testColl.find().itcount();
+ let fastCount = testColl.count();
+ assert.eq(
+ itCount,
+ fastCount,
+ `count: ${fastCount}, itCount: ${itCount}, data: ${tojson(testColl.find().toArray())}`);
+
+ jsTestLog("Check succeeded. Ending test.");
+ P1.reconnect([A]);
+ rst.stopSet();
+}());
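The server-side hunks below implement the condition this test exercises: before truncating its oplog, the rolling-back node pins the initial data timestamp to max(its own top of oplog, the sync source's top of oplog), and it refuses to leave ROLLBACK until its lastApplied has reached that point. The following standalone C++ sketch illustrates that invariant with the timestamps from the scenario above; the Timestamp type and function names here are simplified stand-ins, not the server's actual classes.

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <iostream>

    // Simplified stand-in for the server's Timestamp: ordered by (secs, inc).
    struct Timestamp {
        uint32_t secs;
        uint32_t inc;
    };

    bool operator<(Timestamp a, Timestamp b) {
        return a.secs != b.secs ? a.secs < b.secs : a.inc < b.inc;
    }
    bool operator>=(Timestamp a, Timestamp b) {
        return !(a < b);
    }

    // Step 1 (rs_rollback.cpp below): before truncating the oplog, pin the initial
    // data timestamp to the max of the local and sync-source oplog tops.
    Timestamp computeInitialDataTimestamp(Timestamp localTop, Timestamp sourceTop) {
        return std::max(localTop, sourceTop);
    }

    // Step 2 (sync_tail.cpp below): stay in ROLLBACK until lastApplied catches up.
    bool canTransitionToSecondary(Timestamp lastApplied, Timestamp initialDataTimestamp) {
        return lastApplied >= initialDataTimestamp;
    }

    int main() {
        // Timestamps from the test scenario: common point at TS 1, P2's step-up
        // no-op at TS 3, P1's delete at TS 4.
        const Timestamp ts1{1, 0}, ts3{3, 0}, ts4{4, 0};

        const Timestamp initialData = computeInitialDataTimestamp(/*localTop=*/ts4, /*sourceTop=*/ts3);
        assert(initialData >= ts4);

        // Right after rolling back to the common point, P1 may not go SECONDARY...
        assert(!canTransitionToSecondary(/*lastApplied=*/ts1, initialData));
        // ...but once it has replicated P2's writes up to or past TS 4, it may.
        assert(canTransitionToSecondary(/*lastApplied=*/Timestamp{4, 1}, initialData));

        std::cout << "rollback completion gate behaves as described\n";
        return 0;
    }

(The real change spreads these two steps across rs_rollback.cpp and sync_tail.cpp, using the storage engine's initialDataTimestamp as the shared state between them.)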
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index e95fcaf4044..812eb5ef81e 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -892,6 +892,7 @@ Status _syncRollback(OperationContext* opCtx,
FixUpInfo how;
log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog;
+ how.localTopOfOplog = replCoord->getMyLastAppliedOpTime();
how.rbid = rollbackSource.getRollbackId();
uassert(
40506, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID);
@@ -1432,6 +1433,19 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Rollback deleted " << deletes << " documents and updated " << updates
<< " documents.";
+ // Rolling back via refetch, we set initialDataTimestamp to max(local oplog top, source's oplog
+ // top), then roll back. Data is inconsistent until lastApplied >= initialDataTimestamp.
+ auto syncSourceTopOfOplog =
+ OpTime::parseFromOplogEntry(rollbackSource.getLastOperation()).getValue().getTimestamp();
+
+ log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source "
+ "top of oplog. Local top of oplog: "
+ << fixUpInfo.localTopOfOplog.getTimestamp()
+ << ", sync source top of oplog: " << syncSourceTopOfOplog;
+
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+ std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));
+
log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
<< fixUpInfo.commonPointOurDiskloc << "), non-inclusive";
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 3d8274421d0..f6c3ba4a729 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -286,6 +286,9 @@ struct FixUpInfo {
// after rollback the in-memory transaction table is cleared.
bool refetchTransactionDocs = false;
+ // The local node's top of oplog prior to entering rollback.
+ OpTime localTopOfOplog;
+
OpTime commonPoint;
RecordId commonPointOurDiskloc;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 7a9e4b46d57..c0d5d5285f2 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -812,6 +812,20 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx,
return;
}
+ // Rolling back with eMRC false, we set initialDataTimestamp to max(local oplog top, source's
+ // oplog top), then roll back via refetch. Data is inconsistent until lastApplied >=
+ // initialDataTimestamp.
+ auto initialTs = opCtx->getServiceContext()->getStorageEngine()->getInitialDataTimestamp();
+ if (lastApplied.getTimestamp() < initialTs) {
+ invariant(!serverGlobalParams.enableMajorityReadConcern);
+ LOG(2) << "We cannot transition to SECONDARY state because our 'lastApplied' optime is "
+ "less than the initial data timestamp and enableMajorityReadConcern = false. "
+ "minValid optime: "
+ << minValid << ", lastApplied optime: " << lastApplied
+ << ", initialDataTimestamp: " << initialTs;
+ return;
+ }
+
// Execute the transition to SECONDARY.
auto status = replCoord->setFollowerMode(MemberState::RS_SECONDARY);
if (!status.isOK()) {
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index ae3bdd04d64..848fe379c8c 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -296,6 +296,13 @@ public:
virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) {}
/**
+ * See `StorageEngine::getInitialDataTimestamp`
+ */
+ virtual Timestamp getInitialDataTimestamp() const {
+ return Timestamp();
+ }
+
+ /**
* See `StorageEngine::setOldestTimestamp`
*/
virtual void setOldestTimestamp(Timestamp oldestTimestamp, bool force) {}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 1917e3b4807..e3dd6d5a319 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -634,10 +634,13 @@ void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp) {
}
void KVStorageEngine::setInitialDataTimestamp(Timestamp initialDataTimestamp) {
- _initialDataTimestamp = initialDataTimestamp;
_engine->setInitialDataTimestamp(initialDataTimestamp);
}
+Timestamp KVStorageEngine::getInitialDataTimestamp() const {
+ return _engine->getInitialDataTimestamp();
+}
+
void KVStorageEngine::setOldestTimestamp(Timestamp oldestTimestamp) {
const bool force = true;
_engine->setOldestTimestamp(oldestTimestamp, force);
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index e2054dd8d87..5ac73546bdf 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -124,6 +124,8 @@ public:
virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override;
+ virtual Timestamp getInitialDataTimestamp() const override;
+
virtual void setOldestTimestamp(Timestamp oldestTimestamp) override;
virtual bool supportsRecoverToStableTimestamp() const override;
@@ -224,7 +226,6 @@ private:
const bool _supportsDocLocking;
const bool _supportsDBLocking;
const bool _supportsCappedCollections;
- Timestamp _initialDataTimestamp = Timestamp::kAllowUnstableCheckpointsSentinel;
std::unique_ptr<RecordStore> _catalogRecordStore;
std::unique_ptr<KVCatalog> _catalog;
diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
index 4055c3f133d..ea50f5c6022 100644
--- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
+++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
@@ -106,6 +106,10 @@ public:
void setJournalListener(JournalListener* jl) final;
+ Timestamp getInitialDataTimestamp() const override {
+ return Timestamp();
+ }
+
Timestamp getAllCommittedTimestamp() const override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 63ce23e4781..e63437f97b1 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -391,6 +391,11 @@ public:
virtual void setInitialDataTimestamp(Timestamp timestamp) {}
/**
+ * Returns the initial data timestamp.
+ */
+ virtual Timestamp getInitialDataTimestamp() const = 0;
+
+ /**
* Sets the oldest timestamp for which the storage engine must maintain snapshot history
* through. Additionally, all future writes must be newer or equal to this value.
*/
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index e80512e63fc..0e1fb9b1b43 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -1550,11 +1550,16 @@ void WiredTigerKVEngine::setOldestTimestamp(Timestamp oldestTimestamp, bool forc
}
void WiredTigerKVEngine::setInitialDataTimestamp(Timestamp initialDataTimestamp) {
+ _initialDataTimestamp.store(initialDataTimestamp.asULL());
if (_checkpointThread) {
_checkpointThread->setInitialDataTimestamp(initialDataTimestamp);
}
}
+Timestamp WiredTigerKVEngine::getInitialDataTimestamp() const {
+ return Timestamp(_initialDataTimestamp.load());
+}
+
bool WiredTigerKVEngine::supportsRecoverToStableTimestamp() const {
if (_ephemeral || !_keepDataHistory) {
return false;
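Taken together, the storage-layer hunks follow one pattern: the timestamp set through setInitialDataTimestamp() now has a single owner (the KV engine; WiredTiger keeps it in an atomic), and each layer above merely forwards the new getter down to it, which is why KVStorageEngine's duplicate _initialDataTimestamp member could be removed. A rough self-contained C++ sketch of that delegation chain follows; the class shapes and member names are simplified for illustration and are not the server's actual headers.

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <memory>

    // Simplified stand-in for the server's Timestamp.
    struct Timestamp {
        uint64_t value = 0;
    };

    // Lowest layer: the KV engine owns the timestamp. WiredTiger stores it in an
    // atomic word, so the getter needs no locking.
    class KVEngine {
    public:
        virtual ~KVEngine() = default;
        virtual void setInitialDataTimestamp(Timestamp ts) {
            _initialDataTimestamp.store(ts.value);
        }
        virtual Timestamp getInitialDataTimestamp() const {
            return Timestamp{_initialDataTimestamp.load()};
        }

    private:
        std::atomic<uint64_t> _initialDataTimestamp{0};
    };

    // Facade layer: keeps no copy of its own; both the setter and the new getter
    // forward to the KV engine, so there is a single source of truth.
    class KVStorageEngine {
    public:
        explicit KVStorageEngine(std::unique_ptr<KVEngine> engine) : _engine(std::move(engine)) {}

        void setInitialDataTimestamp(Timestamp ts) {
            _engine->setInitialDataTimestamp(ts);
        }
        Timestamp getInitialDataTimestamp() const {
            return _engine->getInitialDataTimestamp();
        }

    private:
        std::unique_ptr<KVEngine> _engine;
    };

    int main() {
        KVStorageEngine storage(std::make_unique<KVEngine>());
        storage.setInitialDataTimestamp(Timestamp{4});
        std::cout << "initialDataTimestamp: " << storage.getInitialDataTimestamp().value << "\n";
        return 0;
    }

Engines without timestamp support (e.g. the MMAPv1 change above) can simply return a null Timestamp from the getter, which is also what the default KVEngine::getInitialDataTimestamp() in kv_engine.h does.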
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index ffd7a4f8fd7..ad0b18a5c1f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -189,6 +189,8 @@ public:
virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override;
+ Timestamp getInitialDataTimestamp() const override;
+
/**
* This method will set the oldest timestamp and commit timestamp to the input value. Callers
* must be serialized along with `setStableTimestamp`. If force=false, this function does not