author      Matthew Russotto <matthew.russotto@10gen.com>  2018-12-06 15:53:36 -0500
committer   Matthew Russotto <matthew.russotto@10gen.com>  2018-12-06 15:53:43 -0500
commit      7cc3c41cf8060966545c844e4d83c7d0ff3994e8 (patch)
tree        51e480d76a1b9625e62202936ff3114e85105142
parent      28e38550525a1e338ea2882da3ddce518771627f (diff)
download    mongo-7cc3c41cf8060966545c844e4d83c7d0ff3994e8.tar.gz
SERVER-34620 Make speculative read atClusterTime not wait for the given cluster time to be majority-committed
-rw-r--r--  jstests/noPassthrough/readConcern_atClusterTime_noop_write.js          32
-rw-r--r--  jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js  34
-rw-r--r--  src/mongo/db/op_observer_impl_test.cpp                                  2
-rw-r--r--  src/mongo/db/read_concern_mongod.cpp                                    26
-rw-r--r--  src/mongo/db/repl/do_txn_test.cpp                                        6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                      16
-rw-r--r--  src/mongo/db/repl/storage_interface.h                                    5
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                             6
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.h                               3
-rw-r--r--  src/mongo/db/repl/storage_interface_mock.h                               3
-rw-r--r--  src/mongo/db/transaction_participant.cpp                                59
-rw-r--r--  src/mongo/db/transaction_participant.h                                  18
-rw-r--r--  src/mongo/dbtests/storage_timestamp_tests.cpp                            5
13 files changed, 139 insertions, 76 deletions
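
The gist of the change, as a hedged mongo-shell sketch rather than the author's test code (fixture names such as `rst` and the collection `test.coll` are illustrative, and it assumes the `stopServerReplication`/`restartServerReplication` helpers from jstests/libs/write_concern_util.js): a snapshot read at a cluster time that is not yet majority-committed no longer blocks; the wait for majority moves to the transaction commit's write concern.

    load("jstests/libs/write_concern_util.js");  // stopServerReplication / restartServerReplication

    const rst = new ReplSetTest({nodes: 3});
    rst.startSet();
    rst.initiate();
    const primaryDB = rst.getPrimary().getDB("test");
    assert.commandWorked(primaryDB.runCommand({insert: "coll", documents: [{_id: "before"}]}));

    // Lag both secondaries so the next write cannot become majority-committed.
    rst.getSecondaries().forEach(node => stopServerReplication(node));
    let res = assert.commandWorked(
        primaryDB.runCommand({insert: "coll", documents: [{_id: "after"}]}));
    const clusterTimeAfter = res.operationTime;

    // The read at 'clusterTimeAfter' is speculative: it no longer waits for that time to be
    // majority-committed, so the find succeeds even though replication is stopped.
    const session = primaryDB.getMongo().startSession({causalConsistency: false});
    session.startTransaction({
        readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter},
        writeConcern: {w: "majority", wtimeout: 1000}
    });
    assert.commandWorked(session.getDatabase("test").runCommand({find: "coll"}));

    // The majority wait has moved to commit time, so the commit times out instead.
    assert.commandFailedWithCode(session.commitTransaction_forTesting(),
                                 ErrorCodes.WriteConcernFailed);

    rst.getSecondaries().forEach(node => restartServerReplication(node));
    rst.stopSet();
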
diff --git a/jstests/noPassthrough/readConcern_atClusterTime_noop_write.js b/jstests/noPassthrough/readConcern_atClusterTime_noop_write.js
index 2f0a3ae28ae..b74832e1672 100644
--- a/jstests/noPassthrough/readConcern_atClusterTime_noop_write.js
+++ b/jstests/noPassthrough/readConcern_atClusterTime_noop_write.js
@@ -1,8 +1,10 @@
-// Test that 'atClusterTime' triggers a noop write to advance the majority commit point if
-// necessary.
+// Test that 'atClusterTime' triggers a noop write to advance the lastApplied optime if
+// necessary. This covers the case where a read is done at a cluster time that is only present
+// as an actual opTime on another shard.
// @tags: [requires_sharding, uses_transactions]
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
// Skip this test if running with --nojournal and WiredTiger.
if (jsTest.options().noJournal &&
@@ -45,9 +47,9 @@
testDB1.coll1.find().itcount();
// Attempt a snapshot read at 'clusterTime' on shard 1. Test that it performs a noop write to
- // advance its majority commit point. The snapshot read itself may fail if the noop write
- // advances the node's majority commit point past 'clusterTime' and it releases that snapshot.
- // Test reading from the primary.
+ // advance its lastApplied optime past 'clusterTime'. The snapshot read itself may fail if the
+ // noop write advances the node's majority commit point past 'clusterTime' and it releases that
+ // snapshot. Test reading from the primary.
const shard1Session =
st.rs1.getPrimary().getDB("test1").getMongo().startSession({causalConsistency: false});
shard1Session.startTransaction({readConcern: {level: "snapshot", atClusterTime: clusterTime}});
@@ -59,9 +61,8 @@
} else {
shard1Session.commitTransaction();
}
- const shard1PrimaryMajOpTime =
- st.rs1.getReadConcernMajorityOpTimeOrThrow(st.rs1.getPrimary()).ts;
- assert.gte(shard1PrimaryMajOpTime, clusterTime);
+ const shard1PrimaryOpTime = getLastOpTime(st.rs1.getPrimary()).ts;
+ assert.gte(shard1PrimaryOpTime, clusterTime);
// Perform a write on shard 1 and get its op time.
res = assert.commandWorked(testDB1.runCommand({insert: "coll1", documents: [{_id: 0}]}));
@@ -73,12 +74,14 @@
testDB0.coll0.find().readPref('secondary').itcount();
// Attempt a snapshot read at 'clusterTime' on shard 0. Test that it performs a noop write to
- // advance its majority commit point. The snapshot read itself may fail if the noop write
- // advances the node's majority commit point past 'clusterTime' and it releases that snapshot.
- // Test reading from the secondary.
+ // advance its lastApplied optime past 'clusterTime'. The snapshot read itself may fail if the
+ // noop write advances the node's majority commit point past 'clusterTime' and it releases that
+ // snapshot. Test reading from the secondary.
const shard0Session =
st.rs0.getSecondary().getDB("test0").getMongo().startSession({causalConsistency: false});
- shard0Session.startTransaction({readConcern: {level: "snapshot", atClusterTime: clusterTime}});
+ shard0Session.startTransaction({
+ readConcern: {level: "snapshot", atClusterTime: clusterTime},
+ });
res = shard0Session.getDatabase("test0").runCommand({find: "coll0"});
if (res.ok === 0) {
assert.commandFailedWithCode(res, ErrorCodes.SnapshotTooOld);
@@ -87,9 +90,8 @@
} else {
shard0Session.commitTransaction();
}
- const shard0SecondaryMajOpTime =
- st.rs0.getReadConcernMajorityOpTimeOrThrow(st.rs0.getSecondary()).ts;
- assert.gte(shard0SecondaryMajOpTime, clusterTime);
+ const shard0SecondaryOpTime = getLastOpTime(st.rs0.getSecondary()).ts;
+ assert.gte(shard0SecondaryOpTime, clusterTime);
st.stop();
}());
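
The rewritten assertions above compare the requested cluster time against the node's lastApplied optime (via `getLastOpTime` from rslib.js) instead of its majority commit point. A hedged sketch of the distinction, assuming a running ReplSetTest named `rst`:

    load("jstests/replsets/rslib.js");  // for getLastOpTime()

    const primary = rst.getPrimary();

    // lastApplied: the newest oplog entry this node has written or applied locally.
    const lastApplied = getLastOpTime(primary).ts;

    // Majority commit point: the newest optime known to be replicated to a majority of nodes.
    const majorityCommitted = rst.getReadConcernMajorityOpTimeOrThrow(primary).ts;

    // lastApplied can run ahead of the majority commit point (for example while secondaries
    // lag), which is why the noop write only has to advance lastApplied for the assertions
    // above to hold.
    assert.gte(lastApplied, majorityCommitted);
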
diff --git a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
index f5930d27894..0b62ccf4bb4 100644
--- a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
+++ b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
@@ -58,27 +58,35 @@
assert.eq(res.cursor.firstBatch.length, 1, printjson(res));
assert.eq(res.cursor.firstBatch[0]._id, "before", printjson(res));
- // A read on the primary at the new cluster time should time out waiting for the cluster time to
- // be majority committed.
- primarySession.startTransaction(
- {readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter}});
- assert.commandFailedWithCode(primaryDB.runCommand({find: collName, maxTimeMS: 1000}),
- ErrorCodes.MaxTimeMSExpired);
- primarySession.abortTransaction_forTesting();
+ // A read on the primary at the new cluster time should succeed because transactions implement
+ // speculative behavior, but the attempt to commit the transaction should time out waiting for
+ // the transaction to be majority committed.
+ primarySession.startTransaction({
+ readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter},
+ writeConcern: {w: "majority", wtimeout: 1000}
+ });
+ res = assert.commandWorked(primaryDB.runCommand({find: collName}));
+ assert.eq(res.cursor.firstBatch.length, 2, printjson(res));
+ assert.commandFailedWithCode(primarySession.commitTransaction_forTesting(),
+ ErrorCodes.WriteConcernFailed);
+ // A read on the primary at the new cluster time succeeds.
+ primarySession.startTransaction({
+ readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter},
+ writeConcern: {w: "majority"}
+ });
+ res = assert.commandWorked(primaryDB.runCommand({find: collName}));
+ assert.eq(res.cursor.firstBatch.length, 2, printjson(res));
// Restart replication on one of the secondaries.
restartServerReplication(secondaryConn1);
-
- // A read on the primary at the new cluster time now succeeds.
- primarySession.startTransaction(
- {readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter}});
- res = assert.commandWorked(primaryDB.runCommand({find: collName}));
+ // This time the transaction should commit.
primarySession.commitTransaction();
- assert.eq(res.cursor.firstBatch.length, 2, printjson(res));
// A read on the lagged secondary at its view of the majority cluster time should not include
// the write.
const clusterTimeSecondaryBefore = rst.getReadConcernMajorityOpTimeOrThrow(secondaryConn0).ts;
+ // It is necessary to gossip the cluster time to the secondary to avoid an error.
+ secondarySession.advanceClusterTime(primarySession.getClusterTime());
secondarySession.startTransaction(
{readConcern: {level: "snapshot", atClusterTime: clusterTimeSecondaryBefore}});
res = assert.commandWorked(secondaryDB0.runCommand({find: collName}));
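
The added `advanceClusterTime` call exists because a cluster time learned on one connection is not automatically known to a fresh session on another node, and an atClusterTime read using an unknown time can be rejected. A hedged sketch of the hand-off pattern in isolation, assuming a running ReplSetTest `rst` and a `test.coll` collection:

    const primary = rst.getPrimary();
    const secondary = rst.getSecondary();

    // Learn a cluster time on the primary connection.
    const res = assert.commandWorked(
        primary.getDB("test").runCommand({insert: "coll", documents: [{x: 1}]}));
    const clusterTime = res.operationTime;

    // A fresh session on the secondary has not gossiped that time yet, so hand it over
    // explicitly before using it as an atClusterTime read point.
    const secondarySession =
        secondary.getDB("test").getMongo().startSession({causalConsistency: false});
    secondarySession.advanceClusterTime(res.$clusterTime);

    secondarySession.startTransaction(
        {readConcern: {level: "snapshot", atClusterTime: clusterTime}});
    assert.commandWorked(secondarySession.getDatabase("test").runCommand({find: "coll"}));
    secondarySession.commitTransaction();
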
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 6c014f43739..7110ff5aeb8 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h"
@@ -66,6 +67,7 @@ public:
auto service = getServiceContext();
auto opCtx = cc().makeOperationContext();
+ repl::StorageInterface::set(service, stdx::make_unique<repl::StorageInterfaceMock>());
// Set up ReplicationCoordinator and create oplog.
repl::ReplicationCoordinator::set(
diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp
index 3a2aaad568a..3f00c9890bf 100644
--- a/src/mongo/db/read_concern_mongod.cpp
+++ b/src/mongo/db/read_concern_mongod.cpp
@@ -212,15 +212,6 @@ MONGO_REGISTER_SHIM(waitForReadConcern)
repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
invariant(replCoord);
- // Currently speculative read concern is used only for transactions (equivalently, when the read
- // concern level is 'snapshot'). However, speculative read concern is not yet supported with
- // atClusterTime.
- //
- // TODO SERVER-34620: Re-enable speculative behavior when "atClusterTime" is specified.
- const bool speculative =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
- !readConcernArgs.getArgsAtClusterTime();
-
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern) {
if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
// For standalone nodes, Linearizable Read is not supported.
@@ -295,19 +286,14 @@ MONGO_REGISTER_SHIM(waitForReadConcern)
}
if (atClusterTime) {
- opCtx->recoveryUnit()->setIgnorePrepared(false);
-
- // TODO(SERVER-34620): We should be using Session::setSpeculativeTransactionReadOpTime when
- // doing speculative execution with atClusterTime.
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
atClusterTime->asTimestamp());
- return Status::OK();
- }
-
- if ((readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) &&
- !speculative &&
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) {
+ } else if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern &&
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) {
+ // This block is not used for kSnapshotReadConcern because snapshots are always speculative;
+ // we wait for majority when the transaction commits.
+ // It is not used for atClusterTime because waitUntilOpTimeForRead handles waiting for
+ // the majority snapshot in that case.
const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2;
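
With the branch above narrowed to plain 'majority' reads, the user-visible split is that non-transactional majority reads still wait for the majority snapshot while snapshot transactions do not. A hedged sketch of that contrast, reusing the lagged-secondary setup from the snapshot-selection test (`primaryDB`, `collName`, and `clusterTimeAfter` assumed to be set up the same way):

    // A non-transactional read that must wait for the majority snapshot still blocks while the
    // secondaries are lagged...
    assert.commandFailedWithCode(primaryDB.runCommand({
        find: collName,
        readConcern: {level: "majority", afterClusterTime: clusterTimeAfter},
        maxTimeMS: 1000
    }),
                                 ErrorCodes.MaxTimeMSExpired);

    // ...whereas a snapshot transaction at the same cluster time reads speculatively and
    // returns immediately; only its commit waits for majority.
    const session = primaryDB.getMongo().startSession({causalConsistency: false});
    session.startTransaction({readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter}});
    assert.commandWorked(session.getDatabase(primaryDB.getName()).runCommand({find: collName}));
    session.abortTransaction_forTesting();
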
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index abede66d1f6..922a7f615fa 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -152,6 +153,11 @@ void DoTxnTest::setUp() {
// collections.
_storage = stdx::make_unique<StorageInterfaceImpl>();
+ // We also need to give replication a StorageInterface for checking out the transaction.
+ // The test storage engine doesn't support the necessary call (getPointInTimeReadTimestamp()),
+ // so we use a mock.
+ repl::StorageInterface::set(service, stdx::make_unique<StorageInterfaceMock>());
+
// Set up the transaction and session.
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
_opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c1021b897d8..91cc9f2020e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1382,7 +1382,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
// only then do we know that it will fill in all "holes" before that time. If we do it
// earlier, we may return when the requested optime has been reached, but other writes
// at optimes before that time are not yet visible.
- _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+ //
+ // We wait only on primaries, because on secondaries, other mechanisms assure that the
+ // last applied optime is always hole-free, and waiting for all earlier writes to be visible
+ // can deadlock against secondary command application.
+ _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx, /* primaryOnly =*/true);
}
return Status::OK();
@@ -1403,14 +1407,10 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext
auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm);
invariant(!readConcern.getArgsOpTime());
- // TODO SERVER-34620: Re-enable speculative behavior when "atClusterTime" is specified.
- const bool speculative = readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern &&
- !readConcern.getArgsAtClusterTime();
-
+ // We don't set isMajorityCommittedRead for kSnapshotReadConcern because snapshots are always
+ // speculative; we wait for majority when the transaction commits.
const bool isMajorityCommittedRead =
- (readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern ||
- readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern) &&
- !speculative;
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
return _waitUntilOpTime(opCtx, isMajorityCommittedRead, targetOpTime, deadline);
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 5ef2d38f22f..3e3b47cd098 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -390,8 +390,11 @@ public:
* Waits for oplog writes to be visible in the oplog.
* This function is used to ensure tests do not fail due to initial sync receiving an empty
* batch.
+ *
+ * primaryOnly: If this node is not primary, do nothing.
*/
- virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) = 0;
+ virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx,
+ bool primaryOnly = false) = 0;
/**
* Returns the all committed timestamp. All transactions with timestamps earlier than the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 92a78753ad4..a6ba88502bc 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1124,8 +1124,12 @@ Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) {
return Status::OK();
}
-void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) {
+void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx,
+ bool primaryOnly) {
Lock::GlobalLock lk(opCtx, MODE_IS);
+ if (primaryOnly &&
+ !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, "admin"))
+ return;
Collection* oplog;
{
// We don't want to be holding the collection lock while blocking, to avoid deadlocks.
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index fce61bfa454..a722b053d48 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -182,7 +182,8 @@ public:
*/
Status isAdminDbValid(OperationContext* opCtx) override;
- void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) override;
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx,
+ bool primaryOnly) override;
void oplogDiskLocRegister(OperationContext* opCtx,
const Timestamp& ts,
bool orderedCommit) override;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 3779ecfb509..5189a4303ff 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -323,7 +323,8 @@ public:
return isAdminDbValidFn(opCtx);
};
- void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) override {
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx,
+ bool primaryOnly) override {
return;
}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 90dfba54f8d..4c5f549ae95 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -467,6 +467,20 @@ void TransactionParticipant::_setSpeculativeTransactionOpTime(
_transactionMetricsObserver.onChooseReadTimestamp(readTimestamp);
}
+void TransactionParticipant::_setSpeculativeTransactionReadTimestamp(WithLock,
+ OperationContext* opCtx,
+ Timestamp timestamp) {
+ // Read concern code should have already set the timestamp on the recovery unit.
+ invariant(timestamp == opCtx->recoveryUnit()->getPointInTimeReadTimestamp());
+
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ opCtx->recoveryUnit()->preallocateSnapshot();
+ _speculativeTransactionReadOpTime = {timestamp, replCoord->getTerm()};
+ stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
+ _transactionMetricsObserver.onChooseReadTimestamp(timestamp);
+}
+
TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) {
// Stash the transaction on the OperationContext on the stack. At the end of this function it
// will be unstashed onto the OperationContext.
@@ -706,21 +720,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
return;
}
- // Set speculative execution.
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- const bool speculative =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
- !readConcernArgs.getArgsAtClusterTime();
- // Only set speculative on primary.
- if (opCtx->writesAreReplicated() && speculative) {
- _setSpeculativeTransactionOpTime(lg,
- opCtx,
- readConcernArgs.getOriginalLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern
- ? SpeculativeTransactionOpTime::kAllCommitted
- : SpeculativeTransactionOpTime::kLastApplied);
- }
-
// All locks of transactions must be acquired inside the global WUOW so that we can
// yield and restore all locks on state transition. Otherwise, we'd have to remember
// which locks are managed by WUOW.
@@ -752,7 +751,35 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// exclusive lock here because we might be doing writes in this transaction, and it is currently
// not deadlock-safe to upgrade IS to IX.
Lock::GlobalLock(opCtx, MODE_IX);
- opCtx->recoveryUnit()->preallocateSnapshot();
+
+ {
+ // Set speculative execution. This must be done after the global lock is acquired, because
+ // we need to check that we are primary.
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ // TODO(SERVER-38203): We cannot wait for write concern on secondaries, so we do not set the
+ // speculative optime on secondaries either. This means that reads done in transactions on
+ // secondaries will not wait for the read snapshot to become majority-committed.
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ if (replCoord->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
+ if (readConcernArgs.getArgsAtClusterTime()) {
+ _setSpeculativeTransactionReadTimestamp(
+ lg, opCtx, readConcernArgs.getArgsAtClusterTime()->asTimestamp());
+ } else {
+ _setSpeculativeTransactionOpTime(
+ lg,
+ opCtx,
+ readConcernArgs.getOriginalLevel() ==
+ repl::ReadConcernLevel::kSnapshotReadConcern
+ ? SpeculativeTransactionOpTime::kAllCommitted
+ : SpeculativeTransactionOpTime::kLastApplied);
+ }
+ } else {
+ opCtx->recoveryUnit()->preallocateSnapshot();
+ }
+ }
// The Client lock must not be held when executing this failpoint as it will block currentOp
// execution.
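
`_setSpeculativeTransactionReadTimestamp` pins the transaction's storage snapshot to the requested cluster time rather than to the all-committed or last-applied point. Its user-visible effect, as a hedged shell sketch with illustrative names (a replica-set primary connection obtained from an assumed `rst` fixture): a write performed after the chosen cluster time stays invisible inside the transaction even though it is already committed by the time the find runs.

    const db = rst.getPrimary().getDB("test");
    assert.commandWorked(db.runCommand({insert: "coll", documents: [{_id: 0}]}));

    // Capture the cluster time of the second write, then perform a third write.
    const res1 = assert.commandWorked(db.runCommand({insert: "coll", documents: [{_id: 1}]}));
    const atTime = res1.operationTime;
    assert.commandWorked(db.runCommand({insert: "coll", documents: [{_id: 2}]}));

    // The transaction's snapshot is pinned at 'atTime', so it sees _id 0 and 1 but not _id 2.
    const session = db.getMongo().startSession({causalConsistency: false});
    session.startTransaction({readConcern: {level: "snapshot", atClusterTime: atTime}});
    const res = assert.commandWorked(session.getDatabase("test").runCommand({find: "coll"}));
    assert.eq(res.cursor.firstBatch.length, 2, printjson(res));
    session.commitTransaction();
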
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index e2047a44a79..281181904f3 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -58,6 +58,16 @@ class OperationContext;
extern AtomicInt32 transactionLifetimeLimitSeconds;
+/**
+ * Read timestamp to be used for a speculative transaction. For transactions with read
+ * concern level specified as 'snapshot', we will use 'kAllCommitted' which ensures a snapshot
+ * with no 'holes'; that is, it is a state of the system that could be reconstructed from
+ * the oplog. For transactions with read concern level specified as 'local' or 'majority',
+ * we will use 'kLastApplied' which gives us the most recent snapshot. This snapshot may
+ * reflect oplog 'holes' from writes earlier than the last applied write which have not yet
+ * completed. Using 'kLastApplied' ensures that transactions with mode 'local' are always able to
+ * read writes from earlier transactions with mode 'local' on the same connection.
+ */
enum class SpeculativeTransactionOpTime {
kLastApplied,
kAllCommitted,
@@ -640,6 +650,14 @@ private:
OperationContext* opCtx,
SpeculativeTransactionOpTime opTimeChoice);
+
+ // Like _setSpeculativeTransactionOpTime, but caller chooses timestamp of snapshot explicitly.
+ // It is up to the caller to ensure that Timestamp is greater than or equal to the all-committed
+ // optime before calling this method (e.g. by calling ReplCoordinator::waitForOpTimeForRead).
+ void _setSpeculativeTransactionReadTimestamp(WithLock,
+ OperationContext* opCtx,
+ Timestamp timestamp);
+
// Finishes committing the multi-document transaction after the storage-transaction has been
// committed, the oplog entry has been inserted into the oplog, and the transactions table has
// been updated.
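
The last sentence of the new comment states a concrete guarantee; a hedged shell sketch of it (assuming `db` is the "test" database on a replica-set primary): a 'local' transaction that commits is visible to the next 'local' transaction on the same session, because both read at lastApplied rather than at the all-committed point.

    // The collection should already exist before the transaction starts.
    assert.commandWorked(db.runCommand({insert: "coll", documents: [{_id: "setup"}]}));

    const session = db.getMongo().startSession({causalConsistency: false});
    const sessionDB = session.getDatabase("test");

    // First transaction writes a document at read concern 'local'.
    session.startTransaction({readConcern: {level: "local"}});
    assert.commandWorked(sessionDB.runCommand({insert: "coll", documents: [{_id: "fromTxn1"}]}));
    session.commitTransaction();

    // A second 'local' transaction on the same connection reads at kLastApplied, so the document
    // written by the first transaction is guaranteed to be visible even if earlier oplog holes
    // keep it out of the all-committed snapshot for a moment.
    session.startTransaction({readConcern: {level: "local"}});
    const res = assert.commandWorked(
        sessionDB.runCommand({find: "coll", filter: {_id: "fromTxn1"}}));
    assert.eq(res.cursor.firstBatch.length, 1, printjson(res));
    session.commitTransaction();
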
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 576732c5aea..f013904957d 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -75,6 +75,7 @@
#include "mongo/db/session.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/kv/kv_storage_engine.h"
+#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/stdx/future.h"
@@ -2488,6 +2489,10 @@ public:
}
presentTs = _clock->getClusterTime().asTimestamp();
+ // This test does not run a real ReplicationCoordinator, so must advance the snapshot
+ // manager manually.
+ auto storageEngine = cc().getServiceContext()->getStorageEngine();
+ storageEngine->getSnapshotManager()->setLocalSnapshot(presentTs);
const auto beforeTxnTime = _clock->reserveTicks(1);
beforeTxnTs = beforeTxnTime.asTimestamp();
commitEntryTs = beforeTxnTime.addTicks(1).asTimestamp();