diff options
Diffstat (limited to 'src/mongo/db/session.cpp')
-rw-r--r-- | src/mongo/db/session.cpp | 36 |
1 files changed, 34 insertions, 2 deletions
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 7c66713b13c..8acfb4cb2d0 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -102,6 +102,8 @@ MONGO_FP_DECLARE(onPrimaryTransactionalWrite); } // namespace +const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); + Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { @@ -131,11 +133,21 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { CommittedStatementTimestampMap activeTxnCommittedStatements; + bool hasIncompleteHistory = false; if (lastWrittenTxnRecord) { auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { const auto entry = it.next(opCtx); invariant(entry.getStatementId()); + + if (*entry.getStatementId() == kIncompleteHistoryStmtId) { + // Only the dead end sentinel can have this id for oplog write history. + invariant(entry.getObject2()); + invariant(entry.getObject2()->woCompare(kDeadEndSentinel) == 0); + hasIncompleteHistory = true; + continue; + } + const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); if (!insertRes.second) { @@ -152,6 +164,8 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { ul.lock(); + _hasIncompleteHistory = hasIncompleteHistory; + // Protect against concurrent refreshes or invalidations if (!_isValid && _numInvalidations == numInvalidations) { _isValid = true; @@ -232,6 +246,7 @@ void Session::invalidate() { _activeTxnNumber = kUninitializedTxnNumber; _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; } repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const { @@ -250,7 +265,17 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte StmtId stmtId) const { const auto stmtTimestamp = [&] { stdx::lock_guard<stdx::mutex> lg(_mutex); - return _checkStatementExecuted(lg, txnNumber, stmtId); + auto result = _checkStatementExecuted(lg, txnNumber, stmtId); + + if (!result) { + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON() + << ", txnNum: " + << txnNumber, + !_hasIncompleteHistory); + } + + return result; }(); if (!stmtTimestamp) @@ -289,6 +314,7 @@ void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) { _activeTxnNumber = txnNumber; _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; } void Session::_checkValid(WithLock) const { @@ -316,8 +342,9 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, _checkIsActiveTransaction(wl, txnNumber); const auto it = _activeTxnCommittedStatements.find(stmtId); - if (it == _activeTxnCommittedStatements.end()) + if (it == _activeTxnCommittedStatements.end()) { return boost::none; + } invariant(_lastWrittenSessionRecord); invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber); @@ -396,6 +423,11 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, if (newTxnNumber == _activeTxnNumber) { for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } + const auto insertRes = _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); if (!insertRes.second) { |