diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/session.h | 6 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.idl | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 10 |
10 files changed, 149 insertions, 89 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 5621f9da8a2..6da9b7f9599 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -77,13 +77,17 @@ void onWriteOpCompleted(OperationContext* opCtx, const NamespaceString& nss, Session* session, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate) { if (lastStmtIdWriteOpTime.isNull()) return; if (session) { - session->onWriteOpCompletedOnPrimary( - opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + session->onWriteOpCompletedOnPrimary(opCtx, + *opCtx->getTxnNumber(), + std::move(stmtIdsWritten), + lastStmtIdWriteOpTime, + lastStmtIdWriteDate); } } @@ -304,8 +308,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const auto opTimeList = repl::logInsertOps( - opCtx, nss, uuid, session, begin, end, fromMigrate, getWallClockTimeForOpLog(opCtx)); + const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); + + const auto opTimeList = + repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate); auto css = CollectionShardingState::get(opCtx, nss.ns()); @@ -319,7 +325,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } } - auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back(); + const auto lastOpTime = opTimeList.empty() ? 
repl::OpTime() : opTimeList.back(); if (nss.coll() == "system.js") { Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -339,7 +345,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, return stmt.stmtId; }); - onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime); + onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); } void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { @@ -389,8 +395,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - onWriteOpCompleted( - opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, + args.nss, + session, + std::vector<StmtId>{args.stmtId}, + opTime.writeOpTime, + opTime.wallClockTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -434,7 +444,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); + onWriteOpCompleted( + opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 20b7d2d9ce0..3c3cc06a72f 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -191,7 +191,9 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, OplogEntry makeInsertDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToInsert) { - return OplogEntry(opTime, 1LL, OpTypeEnum::kInsert, nss, documentToInsert); + auto oplogEntry = OplogEntry(opTime, 1LL, 
OpTypeEnum::kInsert, nss, documentToInsert); + oplogEntry.setWallClockTime(Date_t::now()); + return oplogEntry; } /** @@ -200,7 +202,9 @@ OplogEntry makeInsertDocumentOplogEntry(OpTime opTime, OplogEntry makeDeleteDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToDelete) { - return OplogEntry(opTime, 1LL, OpTypeEnum::kDelete, nss, documentToDelete); + auto oplogEntry = OplogEntry(opTime, 1LL, OpTypeEnum::kDelete, nss, documentToDelete); + oplogEntry.setWallClockTime(Date_t::now()); + return oplogEntry; } /** @@ -210,7 +214,10 @@ OplogEntry makeUpdateDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToUpdate, const BSONObj& updatedDocument) { - return OplogEntry(opTime, 1LL, OpTypeEnum::kUpdate, nss, updatedDocument, documentToUpdate); + auto oplogEntry = + OplogEntry(opTime, 1LL, OpTypeEnum::kUpdate, nss, updatedDocument, documentToUpdate); + oplogEntry.setWallClockTime(Date_t::now()); + return oplogEntry; } /** diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 25295eb133d..4cde3fec5c5 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -666,6 +666,8 @@ void fillWriterVectorsAndLastestSessionRecords( record.setSessionId(lsid); record.setTxnNum(*sessionInfo.getTxnNumber()); record.setLastWriteOpTime(op.getOpTime()); + invariant(op.getWallClockTime()); + record.setLastWriteDate(*op.getWallClockTime()); auto it = latestSessionRecords->find(lsid); if (it == latestSessionRecords->end()) { diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 461d53e40b2..1df0fd145eb 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -267,51 +267,49 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); 
oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum); - writeConflictRetry(opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - // Need to take global lock here so repl::logOp will not unlock it and - // trigger the invariant that disallows unlocking global lock while - // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make - // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the - // transaction table db lock to ensure the same lock ordering with normal - // replicated updates to the table. - Lock::DBLock lk(opCtx, - NamespaceString::kSessionTransactionsTableNamespace.db(), - MODE_IX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - *oplogEntry.getWallClockTime(), - sessionInfo, - stmtId, - oplogLink); - - auto oplogOpTime = result.oplogTime; - uassert(40633, - str::stream() - << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogOpTime.isNull()); - - // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post - // image, because the next oplog will contain the real operation. - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary( - opCtx, result.txnNum, {stmtId}, oplogOpTime); - } - - wunit.commit(); - }); + writeConflictRetry( + opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and trigger the + // invariant that disallows unlocking global lock while inside a WUOW. Grab a DBLock + // here instead of plain GlobalLock to make sure the MMAPV1 flush lock will be + // lock/unlocked correctly. Take the transaction table db lock to ensure the same lock + // ordering with normal replicated updates to the table. 
+ Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + *oplogEntry.getWallClockTime(), + sessionInfo, + stmtId, + oplogLink); + + auto oplogOpTime = result.oplogTime; + uassert(40633, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogOpTime.isNull()); + + // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post + // image, because the next oplog will contain the real operation. + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); + } + + wunit.commit(); + }); return result; } diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 2bd48ce08dc..acf543d7d05 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -61,18 +61,21 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry1.setStatementId(0); + entry1.setWallClockTime(Date_t::now()); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime()); entry2.setStatementId(1); + entry2.setWallClockTime(Date_t::now()); insertOplogEntry(entry2); SessionTxnRecord sessionRecord; sessionRecord.setSessionId(makeLogicalSessionIdForTest()); sessionRecord.setTxnNum(1); sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + 
sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -109,16 +112,19 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry1a.setStatementId(0); + entry1a.setWallClockTime(Date_t::now()); repl::OplogEntry entry1b( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); entry1b.setStatementId(1); + entry1b.setWallClockTime(Date_t::now()); entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime()); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); sessionRecord1.setLastWriteOpTime(entry1b.getOpTime()); + sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -128,16 +134,19 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2a.setStatementId(3); + entry2a.setWallClockTime(Date_t::now()); repl::OplogEntry entry2b( repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime()); entry2b.setStatementId(4); + entry2b.setWallClockTime(Date_t::now()); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); sessionRecord2.setTxnNum(1); sessionRecord2.setLastWriteOpTime(entry2b.getOpTime()); + sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), 
sessionRecord2.toBSON()); @@ -198,6 +207,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry1.setStatementId(0); + entry1.setWallClockTime(Date_t::now()); insertOplogEntry(entry1); repl::OplogEntry entry2( @@ -205,12 +215,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2.setPreImageOpTime(entry1.getOpTime()); entry2.setStatementId(1); + entry2.setWallClockTime(Date_t::now()); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry3.setStatementId(2); + entry3.setWallClockTime(Date_t::now()); insertOplogEntry(entry3); repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2), @@ -222,12 +234,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime()); entry4.setPostImageOpTime(entry3.getOpTime()); entry4.setStatementId(3); + entry4.setWallClockTime(Date_t::now()); insertOplogEntry(entry4); SessionTxnRecord sessionRecord; sessionRecord.setSessionId(makeLogicalSessionIdForTest()); sessionRecord.setTxnNum(1); sessionRecord.setLastWriteOpTime(entry4.getOpTime()); + sessionRecord.setLastWriteDate(*entry4.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -257,12 +271,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); 
entry1.setStatementId(0); + entry1.setWallClockTime(Date_t::now()); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); sessionRecord1.setLastWriteOpTime(entry1.getOpTime()); + sessionRecord1.setLastWriteDate(*entry1.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -276,12 +292,14 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { BSON("x" << 30)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2.setStatementId(1); + entry2.setWallClockTime(Date_t::now()); insertOplogEntry(entry2); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); sessionRecord2.setTxnNum(1); sessionRecord2.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord2.setLastWriteDate(*entry2.getWallClockTime()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); @@ -307,12 +325,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry1.setStatementId(0); + entry1.setWallClockTime(Date_t::now()); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; sessionRecord1.setSessionId(makeLogicalSessionIdForTest()); sessionRecord1.setTxnNum(1); sessionRecord1.setLastWriteOpTime(entry1.getOpTime()); + sessionRecord1.setLastWriteDate(*entry1.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -322,12 +342,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); 
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2.setStatementId(1); + entry2.setWallClockTime(Date_t::now()); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry3.setStatementId(2); + entry3.setWallClockTime(Date_t::now()); insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(kNs); @@ -392,6 +414,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry.setStatementId(0); + entry.setWallClockTime(Date_t::now()); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -412,6 +435,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry.setStatementId(1); + entry.setWallClockTime(Date_t::now()); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -453,6 +477,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); entry.setStatementId(0); + entry.setWallClockTime(Date_t::now()); insertOplogEntry(entry); const auto sessionId = makeLogicalSessionIdForTest(); @@ -461,6 +486,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis sessionRecord.setSessionId(sessionId); sessionRecord.setTxnNum(31); sessionRecord.setLastWriteOpTime(entry.getOpTime()); + sessionRecord.setLastWriteDate(*entry.getWallClockTime()); DBDirectClient client(opCtx()); 
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -507,6 +533,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); entry.setStatementId(0); + entry.setWallClockTime(Date_t::now()); insertOplogEntry(entry); const auto sessionId = makeLogicalSessionIdForTest(); @@ -515,6 +542,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { sessionRecord.setSessionId(sessionId); sessionRecord.setTxnNum(31); sessionRecord.setLastWriteOpTime(entry.getOpTime()); + sessionRecord.setLastWriteDate(*entry.getWallClockTime()); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); @@ -538,5 +566,4 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { } } // namespace - } // namespace mongo diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 8acfb4cb2d0..d4ecfc469bc 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -191,7 +191,8 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) { void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock<stdx::mutex> ul(_mutex); @@ -205,7 +206,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, } } - const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime); + const auto updateRequest = + _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); ul.unlock(); @@ -223,10 +225,10 @@ void 
Session::updateSessionRecordOnSecondary(OperationContext* opCtx, writeConflictRetry( opCtx, "Update session txn", NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - updateRequest.setUpsert(true); updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionTxnRecord.getSessionId().toBSON())); updateRequest.setUpdates(sessionTxnRecord.toBSON()); + updateRequest.setUpsert(true); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); @@ -354,21 +356,30 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteOpTime) const { + const repl::OpTime& newLastWriteOpTime, + Date_t newLastWriteDate) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); if (_lastWrittenSessionRecord) { - updateRequest.setQuery(_lastWrittenSessionRecord->toBSON()); + updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName + << _sessionId.toBSON() + << SessionTxnRecord::kTxnNumFieldName + << _lastWrittenSessionRecord->getTxnNum() + << SessionTxnRecord::kLastWriteOpTimeFieldName + << _lastWrittenSessionRecord->getLastWriteOpTime())); updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName << newTxnNumber << SessionTxnRecord::kLastWriteOpTimeFieldName - << newLastWriteOpTime))); + << newLastWriteOpTime + << SessionTxnRecord::kLastWriteDateFieldName + << newLastWriteDate))); } else { const auto updateBSON = [&] { SessionTxnRecord newTxnRecord; newTxnRecord.setSessionId(_sessionId); newTxnRecord.setTxnNum(newTxnNumber); newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); + newTxnRecord.setLastWriteDate(newLastWriteDate); return newTxnRecord.toBSON(); }(); diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index f627ca8468d..c1b6c95b711 100644 --- a/src/mongo/db/session.h +++ 
b/src/mongo/db/session.h @@ -98,7 +98,8 @@ public: void onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime); + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate); /** * Called after a replication batch has been applied on a secondary node. Keeps the session @@ -152,7 +153,8 @@ private: UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteTs) const; + const repl::OpTime& newLastWriteTs, + Date_t newLastWriteDate) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index 4c40d91197c..27d71660f98 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -128,7 +128,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -160,7 +160,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -218,7 +218,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - 
ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), AssertionException); } @@ -234,7 +234,7 @@ TEST_F(SessionTest, CheckStatementExecuted) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -291,7 +291,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); } @@ -299,9 +299,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } } @@ -319,9 +320,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { session.invalidate(); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, 
Date_t::now()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { @@ -336,7 +338,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); session.invalidate(); @@ -376,7 +378,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { osi, 1, {}); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime); wuow.commit(); return opTime; @@ -403,7 +405,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { kIncompleteHistoryStmtId, link); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime); wuow.commit(); } diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl index fa81319225b..c9924a32a32 100644 --- a/src/mongo/db/session_txn_record.idl +++ b/src/mongo/db/session_txn_record.idl @@ -60,4 +60,7 @@ structs: type: optime description: "The optime timestamp component of the last write on this transaction." - + lastWriteDate: + type: date + description: "Wall clock time of the last write which happened on this + transaction."
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index d8203bc8e3f..f9187cb9e82 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -75,8 +75,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes, const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); -const auto kLastWriteTimestampFieldName = - SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName; +const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; /** * Makes the query we'll use to scan the transactions table. @@ -85,11 +84,8 @@ const auto kLastWriteTimestampFieldName = * to pull records likely to be on the same chunks (because they sort near each other). */ Query makeQuery(Date_t now) { - const Timestamp possiblyExpired( - duration_cast<Seconds>( - (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()), - 0); - Query query(BSON(kLastWriteTimestampFieldName << LT << possiblyExpired)); + const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes)); + Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); query.sort(kSortById); return query; } |