summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-09-28 11:37:11 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2018-10-09 09:39:08 -0400
commit3dc5e5979bc9a4a914b0a479e88e70078bd02552 (patch)
treecf79737a7d466d238dd2769960ddc7690befb86b /src
parentf2254e26b9c7bdd94e4e6613ef5a20302ecc855e (diff)
downloadmongo-3dc5e5979bc9a4a914b0a479e88e70078bd02552.tar.gz
SERVER-37223 Invariant sharded txns with snapshot read concern select atClusterTime before creating a participant
Diffstat (limited to 'src')
-rw-r--r--src/mongo/s/transaction_router.cpp32
-rw-r--r--src/mongo/s/transaction_router.h19
-rw-r--r--src/mongo/s/transaction_router_test.cpp148
-rw-r--r--src/mongo/s/write_ops/SConscript1
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp7
5 files changed, 106 insertions, 101 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 013f212148d..30a6c0054f1 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -281,6 +281,25 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const
return txnPart.attachTxnFieldsIfNeeded(cmdObj, true);
}
+void TransactionRouter::_verifyReadConcern() {
+ invariant(!_readConcernArgs.isEmpty());
+
+ if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ invariant(_atClusterTime);
+ invariant(_atClusterTime != LogicalTime::kUninitialized);
+ }
+}
+
+void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) {
+ if (_readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) {
+ return;
+ }
+
+ auto participantAtClusterTime = participant.getSharedOptions().atClusterTime;
+ invariant(participantAtClusterTime);
+ invariant(participantAtClusterTime == _atClusterTime);
+}
+
boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipant(
const ShardId& shard) {
auto iter = _participants.find(shard.toString());
@@ -288,9 +307,9 @@ boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipa
return boost::none;
}
- // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to
- // shards that have been successfully contacted we should be able to add an invariant here
- // to ensure the atClusterTime on the participant matches that on the transaction router.
+ _verifyReadConcern();
+ _verifyParticipantAtClusterTime(iter->second);
+
return iter->second;
}
@@ -302,12 +321,7 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar
_coordinatorId = shard.toString();
}
- // The transaction must have been started with a readConcern.
- invariant(!_readConcernArgs.isEmpty());
-
- // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to shards
- // that have been successfully contacted we should be able to add an invariant here to ensure
- // that an atClusterTime has been chosen if the read concern level is snapshot.
+ _verifyReadConcern();
auto resultPair = _participants.try_emplace(
shard.toString(),
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 3020db842c3..5c869d972d6 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -92,6 +92,13 @@ public:
*/
StmtId getStmtIdCreatedAt() const;
+ /**
+ * Returns the shared transaction options this participant was created with.
+ */
+ const auto& getSharedOptions() const {
+ return _sharedOptions;
+ }
+
private:
const bool _isCoordinator{false};
@@ -245,6 +252,18 @@ private:
*/
Participant& _createParticipant(const ShardId& shard);
+ /**
+ * Asserts the transaction has a valid read concern and, if the read concern level is snapshot,
+ * has selected a non-null atClusterTime.
+ */
+ void _verifyReadConcern();
+
+ /**
+ * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime
+ * matches the transaction's.
+ */
+ void _verifyParticipantAtClusterTime(const Participant& participant);
+
const LogicalSessionId _sessionId;
TxnNumber _txnNumber{kUninitializedTxnNumber};
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index cb211627533..dd201c37ab3 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -80,12 +80,15 @@ TEST_F(TransactionRouterTest, StartTxnShouldBeAttachedOnlyOnFirstStatementToPart
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
- << "snapshot")
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -124,7 +127,7 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -142,7 +145,6 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) {
<< "txnNumber"
<< txnNum);
-
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
BSON("insert"
@@ -182,83 +184,7 @@ TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcern) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
-
- BSONObj expectedNewObj = BSON("insert"
- << "test"
- << "readConcern"
- << BSON("level"
- << "snapshot")
- << "startTransaction"
- << true
- << "coordinator"
- << true
- << "autocommit"
- << false
- << "txnNumber"
- << txnNum);
-
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd);
- }
-
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("update"
- << "test"));
- ASSERT_BSONOBJ_EQ(BSON("update"
- << "test"
- << "coordinator"
- << true
- << "autocommit"
- << false
- << "txnNumber"
- << txnNum),
- newCmd);
- }
-
- expectedNewObj = BSON("insert"
- << "test"
- << "readConcern"
- << BSON("level"
- << "snapshot")
- << "startTransaction"
- << true
- << "autocommit"
- << false
- << "txnNumber"
- << txnNum);
-
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd);
- }
-
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
- BSON("update"
- << "test"));
- ASSERT_BSONOBJ_EQ(BSON("update"
- << "test"
- << "autocommit"
- << false
- << "txnNumber"
- << txnNum),
- newCmd);
- }
-}
-
-TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcernWithAtClusterTime) {
- TxnNumber txnNum{3};
-
- TransactionRouter txnRouter({});
- txnRouter.checkOut();
- txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -339,7 +265,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
@@ -365,12 +291,15 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) {
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
- << "snapshot")
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -394,6 +323,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -416,6 +346,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) {
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -434,6 +365,7 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -441,7 +373,9 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) {
<< txnNum
<< "readConcern"
<< BSON("level"
- << "snapshot")
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -463,6 +397,7 @@ DEATH_TEST_F(TransactionRouterTest, CrashesIfCmdHasDifferentTxnNumber, "invarian
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard1,
BSON("insert"
@@ -477,7 +412,7 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
-
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
@@ -490,7 +425,9 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) {
<< "test"
<< "readConcern"
<< BSON("level"
- << "snapshot")
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -509,6 +446,7 @@ TEST_F(TransactionRouterTest, CannotSpecifyReadConcernAfterFirstStatement) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false /* startTransaction */),
@@ -523,12 +461,15 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
- << "snapshot")
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -545,23 +486,23 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) {
}
TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelButHasAfterClusterTime) {
+ LogicalTime kAfterClusterTime(Timestamp(10, 1));
repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), boost::none);
+ repl::ReadConcernArgs(kAfterClusterTime, boost::none);
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
<< "snapshot"
- // TODO SERVER-36237: afterClusterTime should be replaced
- // by an atClusterTime at least as large.
- << "afterClusterTime"
- << Timestamp(10, 1))
+ << "atClusterTime"
+ << kAfterClusterTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -656,6 +597,7 @@ TEST_F(TransactionRouterTest, CannotCommitWithoutParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
ASSERT_THROWS(txnRouter.commitTransaction(operationContext()), AssertionException);
}
@@ -694,6 +636,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); });
@@ -725,6 +668,7 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -772,7 +716,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
-
txnRouter.setAtClusterTimeToLatestTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
@@ -816,6 +759,7 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
txnRouter.setAtClusterTimeToLatestTime(operationContext());
@@ -851,6 +795,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// Successfully start a transaction on two shards, selecting one as the coordinator.
@@ -865,6 +810,8 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
txnRouter.onSnapshotError();
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
+
ASSERT_FALSE(txnRouter.getCoordinatorId());
{
@@ -895,10 +842,13 @@ TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// Should not throw.
txnRouter.onSnapshotError();
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
+
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
ASSERT_THROWS_CODE(
@@ -916,6 +866,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) {
TxnNumber txnNum{3};
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second
// command.
@@ -944,6 +895,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) {
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard3, {});
txnRouter.attachTxnFieldsIfNeeded(shard2, {});
@@ -964,6 +916,7 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// Start a transaction on two shards, selecting one as the coordinator, but simulate a
// re-targeting error from at least one of them.
@@ -1004,6 +957,7 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// First statement successfully targets one shard, selecing it as the coordinator.
@@ -1035,6 +989,7 @@ TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
txnRouter.setAtClusterTimeToLatestTime(operationContext());
@@ -1077,6 +1032,7 @@ TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard1, {});
@@ -1115,6 +1071,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
ASSERT_THROWS_CODE(
txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction);
@@ -1132,6 +1089,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
@@ -1164,6 +1122,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -1203,6 +1162,7 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
// One shard is targeted by the first statement.
auto firstShardCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, {});
@@ -1252,6 +1212,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
// Should not throw.
txnRouter->implicitlyAbortTransaction(opCtx);
@@ -1269,6 +1230,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future =
@@ -1301,6 +1263,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -1346,6 +1309,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future =
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index 3de864d1f8e..43d38aecc7e 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -68,6 +68,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/logical_clock',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
'cluster_write_op',
]
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 0dee3157e48..eb8a27b5e33 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -576,6 +577,7 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
class BatchWriteExecTransactionTest : public BatchWriteExecTest {
public:
const TxnNumber kTxnNumber = 5;
+ const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(3, 1));
void setUp() override {
BatchWriteExecTest::setUp();
@@ -587,9 +589,14 @@ public:
_scopedSession.emplace(operationContext());
+ auto logicalClock = stdx::make_unique<LogicalClock>(getServiceContext());
+ logicalClock->setClusterTimeFromTrustedSource(kInMemoryLogicalTime);
+ LogicalClock::set(getServiceContext(), std::move(logicalClock));
+
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter->checkOut();
txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true);
+ txnRouter->setAtClusterTimeToLatestTime(operationContext());
}
void tearDown() override {