summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/op_observer_impl.cpp31
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp13
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp88
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp29
-rw-r--r--src/mongo/db/session.cpp23
-rw-r--r--src/mongo/db/session.h6
-rw-r--r--src/mongo/db/session_test.cpp31
-rw-r--r--src/mongo/db/session_txn_record.idl5
-rw-r--r--src/mongo/db/transaction_reaper.cpp10
10 files changed, 149 insertions, 89 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 5621f9da8a2..6da9b7f9599 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -77,13 +77,17 @@ void onWriteOpCompleted(OperationContext* opCtx,
const NamespaceString& nss,
Session* session,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate) {
if (lastStmtIdWriteOpTime.isNull())
return;
if (session) {
- session->onWriteOpCompletedOnPrimary(
- opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+ session->onWriteOpCompletedOnPrimary(opCtx,
+ *opCtx->getTxnNumber(),
+ std::move(stmtIdsWritten),
+ lastStmtIdWriteOpTime,
+ lastStmtIdWriteDate);
}
}
@@ -304,8 +308,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const auto opTimeList = repl::logInsertOps(
- opCtx, nss, uuid, session, begin, end, fromMigrate, getWallClockTimeForOpLog(opCtx));
+ const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
+
+ const auto opTimeList =
+ repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
@@ -319,7 +325,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
}
- auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
+ const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
@@ -339,7 +345,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
return stmt.stmtId;
});
- onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime);
+ onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
@@ -389,8 +395,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
- onWriteOpCompleted(
- opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(opCtx,
+ args.nss,
+ session,
+ std::vector<StmtId>{args.stmtId},
+ opTime.writeOpTime,
+ opTime.wallClockTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -434,7 +444,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(
+ opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 20b7d2d9ce0..3c3cc06a72f 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -191,7 +191,9 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
OplogEntry makeInsertDocumentOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToInsert) {
- return OplogEntry(opTime, 1LL, OpTypeEnum::kInsert, nss, documentToInsert);
+ auto oplogEntry = OplogEntry(opTime, 1LL, OpTypeEnum::kInsert, nss, documentToInsert);
+ oplogEntry.setWallClockTime(Date_t::now());
+ return oplogEntry;
}
/**
@@ -200,7 +202,9 @@ OplogEntry makeInsertDocumentOplogEntry(OpTime opTime,
OplogEntry makeDeleteDocumentOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToDelete) {
- return OplogEntry(opTime, 1LL, OpTypeEnum::kDelete, nss, documentToDelete);
+ auto oplogEntry = OplogEntry(opTime, 1LL, OpTypeEnum::kDelete, nss, documentToDelete);
+ oplogEntry.setWallClockTime(Date_t::now());
+ return oplogEntry;
}
/**
@@ -210,7 +214,10 @@ OplogEntry makeUpdateDocumentOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToUpdate,
const BSONObj& updatedDocument) {
- return OplogEntry(opTime, 1LL, OpTypeEnum::kUpdate, nss, updatedDocument, documentToUpdate);
+ auto oplogEntry =
+ OplogEntry(opTime, 1LL, OpTypeEnum::kUpdate, nss, updatedDocument, documentToUpdate);
+ oplogEntry.setWallClockTime(Date_t::now());
+ return oplogEntry;
}
/**
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 25295eb133d..4cde3fec5c5 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -666,6 +666,8 @@ void fillWriterVectorsAndLastestSessionRecords(
record.setSessionId(lsid);
record.setTxnNum(*sessionInfo.getTxnNumber());
record.setLastWriteOpTime(op.getOpTime());
+ invariant(op.getWallClockTime());
+ record.setLastWriteDate(*op.getWallClockTime());
auto it = latestSessionRecords->find(lsid);
if (it == latestSessionRecords->end()) {
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 461d53e40b2..1df0fd145eb 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -267,51 +267,49 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
- writeConflictRetry(opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- // Need to take global lock here so repl::logOp will not unlock it and
- // trigger the invariant that disallows unlocking global lock while
- // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make
- // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the
- // transaction table db lock to ensure the same lock ordering with normal
- // replicated updates to the table.
- Lock::DBLock lk(opCtx,
- NamespaceString::kSessionTransactionsTableNamespace.db(),
- MODE_IX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- *oplogEntry.getWallClockTime(),
- sessionInfo,
- stmtId,
- oplogLink);
-
- auto oplogOpTime = result.oplogTime;
- uassert(40633,
- str::stream()
- << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogOpTime.isNull());
-
- // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
- // image, because the next oplog will contain the real operation.
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
- opCtx, result.txnNum, {stmtId}, oplogOpTime);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and trigger the
+ // invariant that disallows unlocking global lock while inside a WUOW. Grab a DBLock
+ // here instead of plain GlobalLock to make sure the MMAPV1 flush lock will be
+ // lock/unlocked correctly. Take the transaction table db lock to ensure the same lock
+ // ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ *oplogEntry.getWallClockTime(),
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogOpTime = result.oplogTime;
+ uassert(40633,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogOpTime.isNull());
+
+ // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
+ // image, because the next oplog will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(
+ opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
+ }
+
+ wunit.commit();
+ });
return result;
}
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 2bd48ce08dc..acf543d7d05 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -61,18 +61,21 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry1.setStatementId(0);
+ entry1.setWallClockTime(Date_t::now());
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime());
entry2.setStatementId(1);
+ entry2.setWallClockTime(Date_t::now());
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord;
sessionRecord.setSessionId(makeLogicalSessionIdForTest());
sessionRecord.setTxnNum(1);
sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -109,16 +112,19 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry1a.setStatementId(0);
+ entry1a.setWallClockTime(Date_t::now());
repl::OplogEntry entry1b(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
entry1b.setStatementId(1);
+ entry1b.setWallClockTime(Date_t::now());
entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime());
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
sessionRecord1.setLastWriteOpTime(entry1b.getOpTime());
+ sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -128,16 +134,19 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2a.setStatementId(3);
+ entry2a.setWallClockTime(Date_t::now());
repl::OplogEntry entry2b(
repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime());
entry2b.setStatementId(4);
+ entry2b.setWallClockTime(Date_t::now());
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
sessionRecord2.setTxnNum(1);
sessionRecord2.setLastWriteOpTime(entry2b.getOpTime());
+ sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
@@ -198,6 +207,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry1.setStatementId(0);
+ entry1.setWallClockTime(Date_t::now());
insertOplogEntry(entry1);
repl::OplogEntry entry2(
@@ -205,12 +215,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2.setPreImageOpTime(entry1.getOpTime());
entry2.setStatementId(1);
+ entry2.setWallClockTime(Date_t::now());
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry3.setStatementId(2);
+ entry3.setWallClockTime(Date_t::now());
insertOplogEntry(entry3);
repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2),
@@ -222,12 +234,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime());
entry4.setPostImageOpTime(entry3.getOpTime());
entry4.setStatementId(3);
+ entry4.setWallClockTime(Date_t::now());
insertOplogEntry(entry4);
SessionTxnRecord sessionRecord;
sessionRecord.setSessionId(makeLogicalSessionIdForTest());
sessionRecord.setTxnNum(1);
sessionRecord.setLastWriteOpTime(entry4.getOpTime());
+ sessionRecord.setLastWriteDate(*entry4.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -257,12 +271,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry1.setStatementId(0);
+ entry1.setWallClockTime(Date_t::now());
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
sessionRecord1.setLastWriteOpTime(entry1.getOpTime());
+ sessionRecord1.setLastWriteDate(*entry1.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -276,12 +292,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2.setStatementId(1);
+ entry2.setWallClockTime(Date_t::now());
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
sessionRecord2.setTxnNum(1);
sessionRecord2.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord2.setLastWriteDate(*entry2.getWallClockTime());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
@@ -307,12 +325,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry1.setStatementId(0);
+ entry1.setWallClockTime(Date_t::now());
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
sessionRecord1.setSessionId(makeLogicalSessionIdForTest());
sessionRecord1.setTxnNum(1);
sessionRecord1.setLastWriteOpTime(entry1.getOpTime());
+ sessionRecord1.setLastWriteDate(*entry1.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -322,12 +342,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2.setStatementId(1);
+ entry2.setWallClockTime(Date_t::now());
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry3.setStatementId(2);
+ entry3.setWallClockTime(Date_t::now());
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(kNs);
@@ -392,6 +414,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry.setStatementId(0);
+ entry.setWallClockTime(Date_t::now());
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -412,6 +435,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry.setStatementId(1);
+ entry.setWallClockTime(Date_t::now());
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -453,6 +477,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
entry.setStatementId(0);
+ entry.setWallClockTime(Date_t::now());
insertOplogEntry(entry);
const auto sessionId = makeLogicalSessionIdForTest();
@@ -461,6 +486,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
sessionRecord.setSessionId(sessionId);
sessionRecord.setTxnNum(31);
sessionRecord.setLastWriteOpTime(entry.getOpTime());
+ sessionRecord.setLastWriteDate(*entry.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -507,6 +533,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
entry.setStatementId(0);
+ entry.setWallClockTime(Date_t::now());
insertOplogEntry(entry);
const auto sessionId = makeLogicalSessionIdForTest();
@@ -515,6 +542,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
sessionRecord.setSessionId(sessionId);
sessionRecord.setTxnNum(31);
sessionRecord.setLastWriteOpTime(entry.getOpTime());
+ sessionRecord.setLastWriteDate(*entry.getWallClockTime());
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
@@ -538,5 +566,4 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
}
} // namespace
-
} // namespace mongo
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 8acfb4cb2d0..d4ecfc469bc 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -191,7 +191,8 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock<stdx::mutex> ul(_mutex);
@@ -205,7 +206,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
}
}
- const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime);
+ const auto updateRequest =
+ _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
ul.unlock();
@@ -223,10 +225,10 @@ void Session::updateSessionRecordOnSecondary(OperationContext* opCtx,
writeConflictRetry(
opCtx, "Update session txn", NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setUpsert(true);
updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
<< sessionTxnRecord.getSessionId().toBSON()));
updateRequest.setUpdates(sessionTxnRecord.toBSON());
+ updateRequest.setUpsert(true);
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
@@ -354,21 +356,30 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
UpdateRequest Session::_makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteOpTime) const {
+ const repl::OpTime& newLastWriteOpTime,
+ Date_t newLastWriteDate) const {
UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
if (_lastWrittenSessionRecord) {
- updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
+ updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
+ << _sessionId.toBSON()
+ << SessionTxnRecord::kTxnNumFieldName
+ << _lastWrittenSessionRecord->getTxnNum()
+ << SessionTxnRecord::kLastWriteOpTimeFieldName
+ << _lastWrittenSessionRecord->getLastWriteOpTime()));
updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
<< newTxnNumber
<< SessionTxnRecord::kLastWriteOpTimeFieldName
- << newLastWriteOpTime)));
+ << newLastWriteOpTime
+ << SessionTxnRecord::kLastWriteDateFieldName
+ << newLastWriteDate)));
} else {
const auto updateBSON = [&] {
SessionTxnRecord newTxnRecord;
newTxnRecord.setSessionId(_sessionId);
newTxnRecord.setTxnNum(newTxnNumber);
newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
+ newTxnRecord.setLastWriteDate(newLastWriteDate);
return newTxnRecord.toBSON();
}();
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index f627ca8468d..c1b6c95b711 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -98,7 +98,8 @@ public:
void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime);
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate);
/**
* Called after a replication batch has been applied on a secondary node. Keeps the session
@@ -152,7 +153,8 @@ private:
UpdateRequest _makeUpdateRequest(WithLock,
TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteTs) const;
+ const repl::OpTime& newLastWriteTs,
+ Date_t newLastWriteDate) const;
void _registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 4c40d91197c..27d71660f98 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -128,7 +128,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -160,7 +160,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -218,7 +218,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
AssertionException);
}
@@ -234,7 +234,7 @@ TEST_F(SessionTest, CheckStatementExecuted) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
wuow.commit();
return opTime;
@@ -291,7 +291,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
wuow.commit();
}
@@ -299,9 +299,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
}
@@ -319,9 +320,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
session.invalidate();
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
@@ -336,7 +338,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
session.invalidate();
@@ -376,7 +378,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
osi,
1,
{});
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime);
wuow.commit();
return opTime;
@@ -403,7 +405,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
kIncompleteHistoryStmtId,
link);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime);
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime);
wuow.commit();
}
diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl
index fa81319225b..c9924a32a32 100644
--- a/src/mongo/db/session_txn_record.idl
+++ b/src/mongo/db/session_txn_record.idl
@@ -60,4 +60,7 @@ structs:
type: optime
description: "The optime timestamp component of the last write on this
transaction."
-
+ lastWriteDate:
+ type: date
+ description: "Wall clock time of the last write which happened on on this
+ transaction."
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index d8203bc8e3f..f9187cb9e82 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -75,8 +75,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kLastWriteTimestampFieldName =
- SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName;
+const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
/**
* Makes the query we'll use to scan the transactions table.
@@ -85,11 +84,8 @@ const auto kLastWriteTimestampFieldName =
* to pull records likely to be on the same chunks (because they sort near each other).
*/
Query makeQuery(Date_t now) {
- const Timestamp possiblyExpired(
- duration_cast<Seconds>(
- (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()),
- 0);
- Query query(BSON(kLastWriteTimestampFieldName << LT << possiblyExpired));
+ const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes));
+ Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
query.sort(kSortById);
return query;
}