diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-25 11:42:44 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-25 12:28:32 -0400 |
commit | 8e69370f5f9b30f3097970c4743ae993c21ed0b3 (patch) | |
tree | 5aa73c7441465db40d1a77995b5e471a53d514d8 /src/mongo/db/s | |
parent | 23e886eebf9794190d198da98cd96e4127bb3dc8 (diff) | |
download | mongo-8e69370f5f9b30f3097970c4743ae993c21ed0b3.tar.gz |
SERVER-31678 Thread through the operation wall-clock time for oplog entries
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 56 |
2 files changed, 56 insertions, 5 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index cdf0f0a778d..461d53e40b2 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -242,6 +242,10 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, result.txnNum = sessionInfo.getTxnNumber().value(); const auto stmtId = *oplogEntry.getStatementId(); + // Session oplog entries must always contain wall clock time, because we will not be + // transferring anything from a previous version of the server + invariant(oplogEntry.getWallClockTime()); + auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); @@ -285,6 +289,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, object, &object2, true, + *oplogEntry.getWallClockTime(), sessionInfo, stmtId, oplogLink); diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 45ef7d0350c..99ee8eae5d8 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -298,14 +298,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); oplog3.setOperationSessionInfo(sessionInfo); oplog3.setStatementId(5); + oplog3.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2, oplog3}); @@ -347,16 +350,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn sessionInfo.setTxnNumber(txnNum++); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); sessionInfo.setTxnNumber(txnNum++); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); sessionInfo.setTxnNumber(txnNum); oplog3.setOperationSessionInfo(sessionInfo); oplog3.setStatementId(5); + oplog3.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2, oplog3}); @@ -388,17 +394,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); oplog3.setOperationSessionInfo(sessionInfo); oplog3.setStatementId(5); + oplog3.setWallClockTime(Date_t::now()); // Return in 2 batches - returnOplog({oplog1, oplog2}); returnOplog({oplog3}); @@ -444,14 +452,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo1); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo2); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); oplog3.setOperationSessionInfo(sessionInfo2); oplog3.setStatementId(5); + oplog3.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2, oplog3}); @@ -518,6 +529,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) origInnerOplog1.toBSON()); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(1080, 2), 1), 0, @@ -528,6 +540,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) origInnerOplog2.toBSON()); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2}); @@ -565,6 +578,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog.setOperationSessionInfo(sessionInfo); preImageOplog.setStatementId(45); + preImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -575,6 +589,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); @@ -652,6 +667,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); postImageOplog.setOperationSessionInfo(sessionInfo); postImageOplog.setStatementId(45); + postImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -662,6 +678,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({postImageOplog, updateOplog}); @@ -739,6 +756,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog.setOperationSessionInfo(sessionInfo); preImageOplog.setStatementId(45); + preImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -749,6 +767,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog}); @@ -841,10 +860,12 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(oldSessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(oldSessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2}); @@ -883,6 +904,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(oldSessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); returnOplog({oplog1}); @@ -902,6 +924,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(oldSessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); returnOplog({oplog2}); @@ -982,6 +1005,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog.setOperationSessionInfo(sessionInfo); oplog.setStatementId(23); + oplog.setWallClockTime(Date_t::now()); returnOplog({oplog}); @@ -1004,6 +1028,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog.setOperationSessionInfo(sessionInfo); oplog.setStatementId(23); + oplog.setWallClockTime(Date_t::now()); returnOplog({oplog}); @@ -1027,6 +1052,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith OplogEntry oplog( OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog.setOperationSessionInfo(sessionInfo); + oplog.setWallClockTime(Date_t::now()); returnOplog({oplog}); @@ -1053,6 +1079,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); returnOplog({oplog1}); @@ -1068,6 +1095,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(45); + oplog2.setWallClockTime(Date_t::now()); returnOplog({oplog2}); @@ -1108,17 +1136,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage sessionInfo.setSessionId(sessionId); sessionInfo.setTxnNumber(2); - OplogEntry preImageOplog( + OplogEntry preImageOplog1( OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); - preImageOplog.setOperationSessionInfo(sessionInfo); - preImageOplog.setStatementId(45); + preImageOplog1.setOperationSessionInfo(sessionInfo); + preImageOplog1.setStatementId(45); + preImageOplog1.setWallClockTime(Date_t::now()); OplogEntry preImageOplog2( OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog2.setOperationSessionInfo(sessionInfo); preImageOplog2.setStatementId(45); + preImageOplog2.setWallClockTime(Date_t::now()); - returnOplog({preImageOplog, preImageOplog2}); + returnOplog({preImageOplog1, preImageOplog2}); sessionMigration.join(); @@ -1145,6 +1175,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog.setOperationSessionInfo(sessionInfo); preImageOplog.setStatementId(45); + preImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -1156,6 +1187,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, sessionInfo.setSessionId(makeLogicalSessionIdForTest()); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); @@ -1185,6 +1217,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog.setOperationSessionInfo(sessionInfo); preImageOplog.setStatementId(45); + preImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -1196,6 +1229,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo sessionInfo.setTxnNumber(56); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({preImageOplog, updateOplog}); @@ -1226,6 +1260,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); preImageOplog.setOperationSessionInfo(sessionInfo); preImageOplog.setStatementId(45); + preImageOplog.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -1236,6 +1271,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); returnOplog({preImageOplog, updateOplog}); @@ -1265,6 +1301,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -1275,6 +1312,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({oplog1, updateOplog}); @@ -1305,6 +1343,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), 0, @@ -1315,6 +1354,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, BSON("$set" << BSON("x" << 101))); updateOplog.setOperationSessionInfo(sessionInfo); updateOplog.setStatementId(45); + updateOplog.setWallClockTime(Date_t::now()); updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1)); returnOplog({oplog1, updateOplog}); @@ -1344,14 +1384,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(30); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog3.setOperationSessionInfo(sessionInfo); oplog3.setStatementId(45); + oplog3.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2, oplog3}); @@ -1395,15 +1438,18 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); oplog1.setOperationSessionInfo(sessionInfo); oplog1.setStatementId(23); + oplog1.setWallClockTime(Date_t::now()); OplogEntry oplog2( OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, {}, Session::kDeadEndSentinel); oplog2.setOperationSessionInfo(sessionInfo); oplog2.setStatementId(kIncompleteHistoryStmtId); + oplog2.setWallClockTime(Date_t::now()); OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); oplog3.setOperationSessionInfo(sessionInfo); oplog3.setStatementId(5); + oplog3.setWallClockTime(Date_t::now()); returnOplog({oplog1, oplog2, oplog3}); // migration always fetches at least twice to transition from committing to done. |