diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-28 11:37:11 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-10-09 09:39:08 -0400 |
commit | 3dc5e5979bc9a4a914b0a479e88e70078bd02552 (patch) | |
tree | cf79737a7d466d238dd2769960ddc7690befb86b /src | |
parent | f2254e26b9c7bdd94e4e6613ef5a20302ecc855e (diff) | |
download | mongo-3dc5e5979bc9a4a914b0a479e88e70078bd02552.tar.gz |
SERVER-37223 Invariant sharded txns with snapshot read concern select atClusterTime before creating a participant
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 32 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.h | 19 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 148 | ||||
-rw-r--r-- | src/mongo/s/write_ops/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 7 |
5 files changed, 106 insertions, 101 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 013f212148d..30a6c0054f1 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -281,6 +281,25 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const return txnPart.attachTxnFieldsIfNeeded(cmdObj, true); } +void TransactionRouter::_verifyReadConcern() { + invariant(!_readConcernArgs.isEmpty()); + + if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + invariant(_atClusterTime); + invariant(_atClusterTime != LogicalTime::kUninitialized); + } +} + +void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) { + if (_readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) { + return; + } + + auto participantAtClusterTime = participant.getSharedOptions().atClusterTime; + invariant(participantAtClusterTime); + invariant(participantAtClusterTime == _atClusterTime); +} + boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipant( const ShardId& shard) { auto iter = _participants.find(shard.toString()); @@ -288,9 +307,9 @@ boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipa return boost::none; } - // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to - // shards that have been successfully contacted we should be able to add an invariant here - // to ensure the atClusterTime on the participant matches that on the transaction router. + _verifyReadConcern(); + _verifyParticipantAtClusterTime(iter->second); + return iter->second; } @@ -302,12 +321,7 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar _coordinatorId = shard.toString(); } - // The transaction must have been started with a readConcern. - invariant(!_readConcernArgs.isEmpty()); - - // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to shards - // that have been successfully contacted we should be able to add an invariant here to ensure - // that an atClusterTime has been chosen if the read concern level is snapshot. + _verifyReadConcern(); auto resultPair = _participants.try_emplace( shard.toString(), diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 3020db842c3..5c869d972d6 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -92,6 +92,13 @@ public: */ StmtId getStmtIdCreatedAt() const; + /** + * Returns the shared transaction options this participant was created with. + */ + const auto& getSharedOptions() const { + return _sharedOptions; + } + private: const bool _isCoordinator{false}; @@ -245,6 +252,18 @@ private: */ Participant& _createParticipant(const ShardId& shard); + /** + * Asserts the transaction has a valid read concern and, if the read concern level is snapshot, + * has selected a non-null atClusterTime. + */ + void _verifyReadConcern(); + + /** + * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime + * matches the transaction's. + */ + void _verifyParticipantAtClusterTime(const Participant& participant); + const LogicalSessionId _sessionId; TxnNumber _txnNumber{kUninitializedTxnNumber}; diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index cb211627533..dd201c37ab3 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -80,12 +80,15 @@ TEST_F(TransactionRouterTest, StartTxnShouldBeAttachedOnlyOnFirstStatementToPart TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" << BSON("level" - << "snapshot") + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -124,7 +127,7 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -142,7 +145,6 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) { << "txnNumber" << txnNum); - { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, BSON("insert" @@ -182,83 +184,7 @@ TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcern) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - - BSONObj expectedNewObj = BSON("insert" - << "test" - << "readConcern" - << BSON("level" - << "snapshot") - << "startTransaction" - << true - << "coordinator" - << true - << "autocommit" - << false - << "txnNumber" - << txnNum); - - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd); - } - - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("update" - << "test")); - ASSERT_BSONOBJ_EQ(BSON("update" - << "test" - << "coordinator" - << true - << "autocommit" - << false - << "txnNumber" - << txnNum), - newCmd); - } - - expectedNewObj = BSON("insert" - << "test" - << "readConcern" - << BSON("level" - << "snapshot") - << "startTransaction" - << true - << "autocommit" - << false - << "txnNumber" - << txnNum); - - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd); - } - - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, - BSON("update" - << "test")); - ASSERT_BSONOBJ_EQ(BSON("update" - << "test" - << "autocommit" - << false - << "txnNumber" - << txnNum), - newCmd); - } -} - -TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcernWithAtClusterTime) { - TxnNumber txnNum{3}; - - TransactionRouter txnRouter({}); - txnRouter.checkOut(); - txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -339,7 +265,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.computeAtClusterTimeForOneShard(operationContext(), shard1); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, @@ -365,12 +291,15 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) { TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" << BSON("level" - << "snapshot") + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -394,6 +323,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -416,6 +346,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) { TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -434,6 +365,7 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -441,7 +373,9 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) { << txnNum << "readConcern" << BSON("level" - << "snapshot") + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -463,6 +397,7 @@ DEATH_TEST_F(TransactionRouterTest, CrashesIfCmdHasDifferentTxnNumber, "invarian TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard1, BSON("insert" @@ -477,7 +412,7 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - + txnRouter.setAtClusterTimeToLatestTime(operationContext()); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, @@ -490,7 +425,9 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) { << "test" << "readConcern" << BSON("level" - << "snapshot") + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -509,6 +446,7 @@ TEST_F(TransactionRouterTest, CannotSpecifyReadConcernAfterFirstStatement) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn(operationContext(), txnNum, false /* startTransaction */), @@ -523,12 +461,15 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" << BSON("level" - << "snapshot") + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -545,23 +486,23 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) { } TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelButHasAfterClusterTime) { + LogicalTime kAfterClusterTime(Timestamp(10, 1)); repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), boost::none); + repl::ReadConcernArgs(kAfterClusterTime, boost::none); TxnNumber txnNum{3}; TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" << BSON("level" << "snapshot" - // TODO SERVER-36237: afterClusterTime should be replaced - // by an atClusterTime at least as large. - << "afterClusterTime" - << Timestamp(10, 1)) + << "atClusterTime" + << kAfterClusterTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -656,6 +597,7 @@ TEST_F(TransactionRouterTest, CannotCommitWithoutParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); ASSERT_THROWS(txnRouter.commitTransaction(operationContext()), AssertionException); } @@ -694,6 +636,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); }); @@ -725,6 +668,7 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -772,7 +716,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); BSONObj expectedReadConcern = BSON("level" @@ -816,6 +759,7 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); txnRouter.setAtClusterTimeToLatestTime(operationContext()); @@ -851,6 +795,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // Successfully start a transaction on two shards, selecting one as the coordinator. @@ -865,6 +810,8 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { txnRouter.onSnapshotError(); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); + ASSERT_FALSE(txnRouter.getCoordinatorId()); { @@ -895,10 +842,13 @@ TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // Should not throw. txnRouter.onSnapshotError(); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); ASSERT_THROWS_CODE( @@ -916,6 +866,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) { TxnNumber txnNum{3}; txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second // command. @@ -944,6 +895,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) { repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard3, {}); txnRouter.attachTxnFieldsIfNeeded(shard2, {}); @@ -964,6 +916,7 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // Start a transaction on two shards, selecting one as the coordinator, but simulate a // re-targeting error from at least one of them. @@ -1004,6 +957,7 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // First statement successfully targets one shard, selecing it as the coordinator. @@ -1035,6 +989,7 @@ TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); txnRouter.setAtClusterTimeToLatestTime(operationContext()); @@ -1077,6 +1032,7 @@ TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard1, {}); @@ -1115,6 +1071,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); ASSERT_THROWS_CODE( txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction); @@ -1132,6 +1089,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); @@ -1164,6 +1122,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -1203,6 +1162,7 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.setAtClusterTimeToLatestTime(operationContext()); // One shard is targeted by the first statement. auto firstShardCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, {}); @@ -1252,6 +1212,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); // Should not throw. txnRouter->implicitlyAbortTransaction(opCtx); @@ -1269,6 +1230,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = @@ -1301,6 +1263,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -1346,6 +1309,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index 3de864d1f8e..43d38aecc7e 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -68,6 +68,7 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/logical_clock', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', 'cluster_write_op', ] diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 0dee3157e48..eb8a27b5e33 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -31,6 +31,7 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_id.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" @@ -576,6 +577,7 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) { class BatchWriteExecTransactionTest : public BatchWriteExecTest { public: const TxnNumber kTxnNumber = 5; + const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(3, 1)); void setUp() override { BatchWriteExecTest::setUp(); @@ -587,9 +589,14 @@ public: _scopedSession.emplace(operationContext()); + auto logicalClock = stdx::make_unique<LogicalClock>(getServiceContext()); + logicalClock->setClusterTimeFromTrustedSource(kInMemoryLogicalTime); + LogicalClock::set(getServiceContext(), std::move(logicalClock)); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter->checkOut(); txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true); + txnRouter->setAtClusterTimeToLatestTime(operationContext()); } void tearDown() override { |