diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-17 16:28:09 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-25 09:10:33 -0400 |
commit | 0935d7067068b3cb62a802a8696dd39c8d7e1944 (patch) | |
tree | 099dbd1d303769de9f5eaa942e4e96434c83d8eb /src | |
parent | 080514c9ee36564d8342cd832f3c5f3a199a9845 (diff) | |
download | mongo-0935d7067068b3cb62a802a8696dd39c8d7e1944.tar.gz |
SERVER-31281 Use separate wallclock time to track the last use of a transaction
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/session.h | 6 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.idl | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 10 |
8 files changed, 114 insertions, 92 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 5621f9da8a2..6da9b7f9599 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -77,13 +77,17 @@ void onWriteOpCompleted(OperationContext* opCtx, const NamespaceString& nss, Session* session, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate) { if (lastStmtIdWriteOpTime.isNull()) return; if (session) { - session->onWriteOpCompletedOnPrimary( - opCtx, *opCtx->getTxnNumber(), std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + session->onWriteOpCompletedOnPrimary(opCtx, + *opCtx->getTxnNumber(), + std::move(stmtIdsWritten), + lastStmtIdWriteOpTime, + lastStmtIdWriteDate); } } @@ -304,8 +308,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const auto opTimeList = repl::logInsertOps( - opCtx, nss, uuid, session, begin, end, fromMigrate, getWallClockTimeForOpLog(opCtx)); + const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); + + const auto opTimeList = + repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate); auto css = CollectionShardingState::get(opCtx, nss.ns()); @@ -319,7 +325,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } } - auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back(); + const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back(); if (nss.coll() == "system.js") { Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -339,7 +345,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, return stmt.stmtId; }); - onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime); + onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); } void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { @@ -389,8 +395,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - onWriteOpCompleted( - opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, + args.nss, + session, + std::vector<StmtId>{args.stmtId}, + opTime.writeOpTime, + opTime.wallClockTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -434,7 +444,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); + onWriteOpCompleted( + opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 25295eb133d..4cde3fec5c5 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -666,6 +666,8 @@ void fillWriterVectorsAndLastestSessionRecords( record.setSessionId(lsid); record.setTxnNum(*sessionInfo.getTxnNumber()); record.setLastWriteOpTime(op.getOpTime()); + invariant(op.getWallClockTime()); + record.setLastWriteDate(*op.getWallClockTime()); auto it = latestSessionRecords->find(lsid); if (it == latestSessionRecords->end()) { diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 461d53e40b2..1df0fd145eb 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -267,51 +267,49 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum); - writeConflictRetry(opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - // Need to take global lock here so repl::logOp will not unlock it and - // trigger the invariant that disallows unlocking global lock while - // inside a WUOW. Grab a DBLock here instead of plain GlobalLock to make - // sure the MMAPV1 flush lock will be lock/unlocked correctly. Take the - // transaction table db lock to ensure the same lock ordering with normal - // replicated updates to the table. - Lock::DBLock lk(opCtx, - NamespaceString::kSessionTransactionsTableNamespace.db(), - MODE_IX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - *oplogEntry.getWallClockTime(), - sessionInfo, - stmtId, - oplogLink); - - auto oplogOpTime = result.oplogTime; - uassert(40633, - str::stream() - << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogOpTime.isNull()); - - // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post - // image, because the next oplog will contain the real operation. - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary( - opCtx, result.txnNum, {stmtId}, oplogOpTime); - } - - wunit.commit(); - }); + writeConflictRetry( + opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and trigger the + // invariant that disallows unlocking global lock while inside a WUOW. Grab a DBLock + // here instead of plain GlobalLock to make sure the MMAPV1 flush lock will be + // lock/unlocked correctly. Take the transaction table db lock to ensure the same lock + // ordering with normal replicated updates to the table. + Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + *oplogEntry.getWallClockTime(), + sessionInfo, + stmtId, + oplogLink); + + auto oplogOpTime = result.oplogTime; + uassert(40633, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogOpTime.isNull()); + + // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post + // image, because the next oplog will contain the real operation. + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); + } + + wunit.commit(); + }); return result; } diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 8acfb4cb2d0..bc586137081 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -191,7 +191,8 @@ void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) { void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock<stdx::mutex> ul(_mutex); @@ -205,7 +206,8 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, } } - const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime); + const auto updateRequest = + _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); ul.unlock(); @@ -354,26 +356,32 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteOpTime) const { + const repl::OpTime& newLastWriteOpTime, + Date_t newLastWriteDate) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); if (_lastWrittenSessionRecord) { - updateRequest.setQuery(_lastWrittenSessionRecord->toBSON()); + updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName + << _sessionId.toBSON() + << SessionTxnRecord::kTxnNumFieldName + << _lastWrittenSessionRecord->getTxnNum() + << SessionTxnRecord::kLastWriteOpTimeFieldName + << _lastWrittenSessionRecord->getLastWriteOpTime())); updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName << newTxnNumber << SessionTxnRecord::kLastWriteOpTimeFieldName - << newLastWriteOpTime))); + << newLastWriteOpTime + << SessionTxnRecord::kLastWriteDateFieldName + << newLastWriteDate))); } else { - const auto updateBSON = [&] { - SessionTxnRecord newTxnRecord; - newTxnRecord.setSessionId(_sessionId); - newTxnRecord.setTxnNum(newTxnNumber); - newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); - return newTxnRecord.toBSON(); - }(); - - updateRequest.setQuery(updateBSON); - updateRequest.setUpdates(updateBSON); + updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName + << _sessionId.toBSON() + << SessionTxnRecord::kTxnNumFieldName + << newTxnNumber + << SessionTxnRecord::kLastWriteOpTimeFieldName + << newLastWriteOpTime)); + updateRequest.setUpdates( + BSON("$set" << BSON(SessionTxnRecord::kLastWriteDateFieldName << newLastWriteDate))); updateRequest.setUpsert(true); } diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index f627ca8468d..c1b6c95b711 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -98,7 +98,8 @@ public: void onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime); + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate); /** * Called after a replication batch has been applied on a secondary node. Keeps the session @@ -152,7 +153,8 @@ private: UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteTs) const; + const repl::OpTime& newLastWriteTs, + Date_t newLastWriteDate) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index a7b1690b369..aacc609394d 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -128,7 +128,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -160,7 +160,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -218,7 +218,7 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), AssertionException); } @@ -234,7 +234,7 @@ TEST_F(SessionTest, CheckStatementExecuted) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; @@ -291,7 +291,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); } @@ -299,9 +299,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } } @@ -319,9 +320,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { session.invalidate(); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { @@ -336,7 +338,7 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); session.invalidate(); diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl index fa81319225b..c9924a32a32 100644 --- a/src/mongo/db/session_txn_record.idl +++ b/src/mongo/db/session_txn_record.idl @@ -60,4 +60,7 @@ structs: type: optime description: "The optime timestamp component of the last write on this transaction." - + lastWriteDate: + type: date + description: "Wall clock time of the last write which happened on on this + transaction." diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index d8203bc8e3f..f9187cb9e82 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -75,8 +75,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes, const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); -const auto kLastWriteTimestampFieldName = - SessionTxnRecord::kLastWriteOpTimeFieldName + "." + repl::OpTime::kTimestampFieldName; +const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; /** * Makes the query we'll use to scan the transactions table. @@ -85,11 +84,8 @@ const auto kLastWriteTimestampFieldName = * to pull records likely to be on the same chunks (because they sort near each other). */ Query makeQuery(Date_t now) { - const Timestamp possiblyExpired( - duration_cast<Seconds>( - (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()), - 0); - Query query(BSON(kLastWriteTimestampFieldName << LT << possiblyExpired)); + const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes)); + Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); query.sort(kSortById); return query; } |