summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-25 11:42:44 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-25 12:28:32 -0400
commit8e69370f5f9b30f3097970c4743ae993c21ed0b3 (patch)
tree5aa73c7441465db40d1a77995b5e471a53d514d8 /src/mongo/db/s
parent23e886eebf9794190d198da98cd96e4127bb3dc8 (diff)
downloadmongo-8e69370f5f9b30f3097970c4743ae993c21ed0b3.tar.gz
SERVER-31678 Thread through the operation wall-clock time for oplog entries
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp5
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp56
2 files changed, 56 insertions, 5 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index cdf0f0a778d..461d53e40b2 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -242,6 +242,10 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
result.txnNum = sessionInfo.getTxnNumber().value();
const auto stmtId = *oplogEntry.getStatementId();
+ // Session oplog entries must always contain wall clock time, because we will not be
+ // transferring anything from a previous version of the server
+ invariant(oplogEntry.getWallClockTime());
+
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
@@ -285,6 +289,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
object,
&object2,
true,
+ *oplogEntry.getWallClockTime(),
sessionInfo,
stmtId,
oplogLink);
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 45ef7d0350c..99ee8eae5d8 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -298,14 +298,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
+ oplog3.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2, oplog3});
@@ -347,16 +350,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
sessionInfo.setTxnNumber(txnNum++);
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
sessionInfo.setTxnNumber(txnNum++);
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
sessionInfo.setTxnNumber(txnNum);
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
+ oplog3.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2, oplog3});
@@ -388,17 +394,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
+ oplog3.setWallClockTime(Date_t::now());
// Return in 2 batches
-
returnOplog({oplog1, oplog2});
returnOplog({oplog3});
@@ -444,14 +452,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo1);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo2);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo2);
oplog3.setStatementId(5);
+ oplog3.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2, oplog3});
@@ -518,6 +529,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
origInnerOplog1.toBSON());
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(1080, 2), 1),
0,
@@ -528,6 +540,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
origInnerOplog2.toBSON());
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2});
@@ -565,6 +578,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
+ preImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -575,6 +589,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
@@ -652,6 +667,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
postImageOplog.setOperationSessionInfo(sessionInfo);
postImageOplog.setStatementId(45);
+ postImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -662,6 +678,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({postImageOplog, updateOplog});
@@ -739,6 +756,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
+ preImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -749,6 +767,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog});
@@ -841,10 +860,12 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(oldSessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(oldSessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2});
@@ -883,6 +904,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(oldSessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
returnOplog({oplog1});
@@ -902,6 +924,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(oldSessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
returnOplog({oplog2});
@@ -982,6 +1005,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
oplog.setStatementId(23);
+ oplog.setWallClockTime(Date_t::now());
returnOplog({oplog});
@@ -1004,6 +1028,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
oplog.setStatementId(23);
+ oplog.setWallClockTime(Date_t::now());
returnOplog({oplog});
@@ -1027,6 +1052,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
OplogEntry oplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
+ oplog.setWallClockTime(Date_t::now());
returnOplog({oplog});
@@ -1053,6 +1079,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
returnOplog({oplog1});
@@ -1068,6 +1095,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
+ oplog2.setWallClockTime(Date_t::now());
returnOplog({oplog2});
@@ -1108,17 +1136,19 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
- OplogEntry preImageOplog(
+ OplogEntry preImageOplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
- preImageOplog.setOperationSessionInfo(sessionInfo);
- preImageOplog.setStatementId(45);
+ preImageOplog1.setOperationSessionInfo(sessionInfo);
+ preImageOplog1.setStatementId(45);
+ preImageOplog1.setWallClockTime(Date_t::now());
OplogEntry preImageOplog2(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog2.setOperationSessionInfo(sessionInfo);
preImageOplog2.setStatementId(45);
+ preImageOplog2.setWallClockTime(Date_t::now());
- returnOplog({preImageOplog, preImageOplog2});
+ returnOplog({preImageOplog1, preImageOplog2});
sessionMigration.join();
@@ -1145,6 +1175,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
+ preImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -1156,6 +1187,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
@@ -1185,6 +1217,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
+ preImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -1196,6 +1229,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo
sessionInfo.setTxnNumber(56);
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
@@ -1226,6 +1260,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
+ preImageOplog.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -1236,6 +1271,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
returnOplog({preImageOplog, updateOplog});
@@ -1265,6 +1301,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -1275,6 +1312,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
@@ -1305,6 +1343,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
@@ -1315,6 +1354,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
+ updateOplog.setWallClockTime(Date_t::now());
updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
@@ -1344,14 +1384,17 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(30);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(45);
+ oplog3.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2, oplog3});
@@ -1395,15 +1438,18 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
+ oplog1.setWallClockTime(Date_t::now());
OplogEntry oplog2(
OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, {}, Session::kDeadEndSentinel);
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(kIncompleteHistoryStmtId);
+ oplog2.setWallClockTime(Date_t::now());
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
+ oplog3.setWallClockTime(Date_t::now());
returnOplog({oplog1, oplog2, oplog3});
// migration always fetches at least twice to transition from committing to done.