/** * Copyright (C) 2017 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/session.h" #include #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/retryable_writes_stats.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/stdx/memory.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { void fassertOnRepeatedExecution(const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, const repl::OpTime& firstOpTime, const repl::OpTime& secondOpTime) { severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":" << txnNumber << " ] was committed once with opTime " << firstOpTime << " and a second time with opTime " << secondOpTime << ". This indicates possible data corruption or server bug and the process will be " "terminated."; fassertFailed(40526); } struct ActiveTransactionHistory { boost::optional lastTxnRecord; Session::CommittedStatementTimestampMap committedStatements; bool hasIncompleteHistory{false}; }; ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, const LogicalSessionId& lsid) { ActiveTransactionHistory result; result.lastTxnRecord = [&]() -> boost::optional { DBDirectClient client(opCtx); auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); if (result.isEmpty()) { return boost::none; } return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"), result); }(); if (!result.lastTxnRecord) { return result; } auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { try { const auto entry = it.next(opCtx); invariant(entry.getStatementId()); if (*entry.getStatementId() == kIncompleteHistoryStmtId) { // Only the dead end sentinel can have this id for oplog write history invariant(entry.getObject2()); invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0); result.hasIncompleteHistory = true; continue; } const auto insertRes = result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); if (!insertRes.second) { const auto& existingOpTime = insertRes.first->second; fassertOnRepeatedExecution(lsid, result.lastTxnRecord->getTxnNum(), *entry.getStatementId(), existingOpTime, entry.getOpTime()); } } catch (const DBException& ex) { if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { result.hasIncompleteHistory = true; break; } throw; } } return result; } void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) { // Current code only supports replacement update. dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates())); AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); uassert(40527, str::stream() << "Unable to persist transaction state because the session transaction " "collection is missing. This indicates that the " << NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection has been manually deleted.", autoColl.getCollection()); WriteUnitOfWork wuow(opCtx); auto collection = autoColl.getCollection(); auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx); uassert(40672, str::stream() << "Failed to fetch _id index for " << NamespaceString::kSessionTransactionsTableNamespace.ns(), idIndex); auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex); // Since we are looking up a key inside the _id index, create a key object consisting of only // the _id field. auto idToFetch = updateRequest.getQuery().firstElement(); auto toUpdateIdDoc = idToFetch.wrap(); dassert(idToFetch.fieldNameStringData() == "_id"_sd); auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc); auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId(); if (recordId.isNull()) { // Upsert case. auto status = collection->insertDocument( opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, true, false); if (status == ErrorCodes::DuplicateKey) { throw WriteConflictException(); } uassertStatusOK(status); wuow.commit(); return; } auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId); auto originalDoc = originalRecordData.toBson(); invariant(collection->getDefaultCollator() == nullptr); boost::intrusive_ptr expCtx(new ExpressionContext(opCtx, nullptr)); auto matcher = fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx))); if (!matcher->matchesBSON(originalDoc)) { // Document no longer match what we expect so throw WCE to make the caller re-examine. throw WriteConflictException(); } OplogUpdateEntryArgs args; args.nss = NamespaceString::kSessionTransactionsTableNamespace; args.uuid = collection->uuid(); args.update = updateRequest.getUpdates(); args.criteria = toUpdateIdDoc; args.fromMigrate = false; collection->updateDocument(opCtx, recordId, Snapshotted(startingSnapshotId, originalDoc), updateRequest.getUpdates(), true, // enforceQuota false, // indexesAffected = false because _id is the only index nullptr, &args); wuow.commit(); } /** * Returns a new oplog entry if the given entry has transaction state embedded within in. * The new oplog entry will contain the operation needed to replicate the transaction * table. * Returns boost::none if the given oplog doesn't have any transaction state or does not * support update to the transaction table. */ boost::optional createMatchingTransactionTableUpdate( const repl::OplogEntry& entry) { auto sessionInfo = entry.getOperationSessionInfo(); if (!sessionInfo.getTxnNumber()) { return boost::none; } // Do not write session table entries for applyOps, as multi-document transactions // and retryable writes do not work together. // TODO(SERVER-33501): Make multi-docunment transactions work with retryable writes. if (entry.isCommand() && entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { return boost::none; } invariant(sessionInfo.getSessionId()); invariant(entry.getWallClockTime()); const auto updateBSON = [&] { SessionTxnRecord newTxnRecord; newTxnRecord.setSessionId(*sessionInfo.getSessionId()); newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); newTxnRecord.setLastWriteOpTime(entry.getOpTime()); newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); return newTxnRecord.toBSON(); }(); return repl::OplogEntry( entry.getOpTime(), 0, // hash repl::OpTypeEnum::kUpdate, NamespaceString::kSessionTransactionsTableNamespace, boost::none, // uuid false, // fromMigrate repl::OplogEntry::kOplogVersion, updateBSON, BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), {}, // sessionInfo true, // upsert *entry.getWallClockTime(), boost::none, // statementId boost::none, // prevWriteOpTime boost::none, // preImangeOpTime boost::none // postImageOpTime ); } // Failpoint which allows different failure actions to happen after each write. Supports the // parameters below, which can be combined with each other (unless explicitly disallowed): // // closeConnection (bool, default = true): Closes the connection on which the write was executed. // failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception // code will be thrown, which will cause the write to not commit; if not specified, the write // will be allowed to commit. MONGO_FP_DECLARE(onPrimaryTransactionalWrite); } // namespace const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { invariant(!opCtx->lockState()->isLocked()); invariant(repl::ReadConcernArgs::get(opCtx).getLevel() == repl::ReadConcernLevel::kLocalReadConcern); stdx::unique_lock ul(_mutex); while (!_isValid) { const int numInvalidations = _numInvalidations; ul.unlock(); auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId); ul.lock(); // Protect against concurrent refreshes or invalidations if (!_isValid && _numInvalidations == numInvalidations) { _isValid = true; _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); if (_lastWrittenSessionRecord) { _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; } break; } } } void Session::beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, boost::optional autocommit) { invariant(!opCtx->lockState()->isLocked()); stdx::lock_guard lg(_mutex); _beginOrContinueTxn(lg, txnNumber, autocommit); } void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber) { invariant(!opCtx->lockState()->isLocked()); stdx::lock_guard lg(_mutex); _beginOrContinueTxnOnMigration(lg, txnNumber); } void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock ul(_mutex); // Multi-document transactions currently do not write to the transaction table. // TODO(SERVER-32323): Update transaction table appropriately when a transaction commits. if (!_autocommit) return; // Sanity check that we don't double-execute statements for (const auto stmtId : stmtIdsWritten) { const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); if (stmtOpTime) { fassertOnRepeatedExecution( _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); } } const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); ul.unlock(); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); _registerUpdateCacheOnCommit( opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) { beginOrContinueTxnOnMigration(opCtx, txnNumber); try { if (checkStatementExecuted(opCtx, txnNumber, stmtId)) { return false; } } catch (const DBException& ex) { // If the transaction chain was truncated on the recipient shard, then we // are most likely copying from a session that hasn't been touched on the // recipient shard for a very long time but could be recent on the donor. // We continue copying regardless to get the entire transaction from the donor. if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { throw; } if (stmtId == kIncompleteHistoryStmtId) { return false; } } return true; } void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock ul(_mutex); _checkValid(ul); _checkIsActiveTransaction(ul, txnNumber); const auto updateRequest = _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); ul.unlock(); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); _registerUpdateCacheOnCommit( opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } void Session::invalidate() { stdx::lock_guard lg(_mutex); _isValid = false; _numInvalidations++; _lastWrittenSessionRecord.reset(); _activeTxnNumber = kUninitializedTxnNumber; _activeTxnCommittedStatements.clear(); _hasIncompleteHistory = false; } repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const { stdx::lock_guard lg(_mutex); _checkValid(lg); _checkIsActiveTransaction(lg, txnNumber); if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) return {}; return _lastWrittenSessionRecord->getLastWriteOpTime(); } boost::optional Session::checkStatementExecuted(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) const { const auto stmtTimestamp = [&] { stdx::lock_guard lg(_mutex); return _checkStatementExecuted(lg, txnNumber, stmtId); }(); if (!stmtTimestamp) return boost::none; TransactionHistoryIterator txnIter(*stmtTimestamp); while (txnIter.hasNext()) { const auto entry = txnIter.next(opCtx); invariant(entry.getStatementId()); if (*entry.getStatementId() == stmtId) return entry; } MONGO_UNREACHABLE; } bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const { stdx::lock_guard lg(_mutex); return bool(_checkStatementExecuted(lg, txnNumber, stmtId)); } void Session::_beginOrContinueTxn(WithLock wl, TxnNumber txnNumber, boost::optional autocommit) { _checkValid(wl); _checkTxnValid(wl, txnNumber); if (txnNumber == _activeTxnNumber) { // Continuing an existing transaction. uassert(ErrorCodes::IllegalOperation, "Specifying 'autocommit' is only allowed at the beginning of a transaction", autocommit == boost::none); return; } // Start a new transaction with an autocommit field _setActiveTxn(wl, txnNumber); _autocommit = (autocommit != boost::none) ? *autocommit : true; // autocommit defaults to true _txnState = _autocommit ? MultiDocumentTransactionState::kNone : MultiDocumentTransactionState::kInProgress; invariant(_transactionOperations.empty()); } void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const { uassert(ErrorCodes::TransactionTooOld, str::stream() << "Cannot start transaction " << txnNumber << " on session " << getSessionId() << " because a newer transaction " << _activeTxnNumber << " has already started.", txnNumber >= _activeTxnNumber); // TODO(SERVER-33432): Auto-abort an old transaction when a new one starts instead of asserting. uassert(40691, str::stream() << "Cannot start transaction " << txnNumber << " on session " << getSessionId() << " because a multi-document transaction " << _activeTxnNumber << " is in progress.", txnNumber == _activeTxnNumber || (_transactionOperations.empty() && _txnState != MultiDocumentTransactionState::kCommitting)); } Session::TxnResources::TxnResources(OperationContext* opCtx) { opCtx->getWriteUnitOfWork()->release(); opCtx->setWriteUnitOfWork(nullptr); _locker = opCtx->swapLockState(stdx::make_unique()); _locker->releaseTicket(); _recoveryUnit = std::unique_ptr(opCtx->releaseRecoveryUnit()); opCtx->setRecoveryUnit(opCtx->getServiceContext()->getGlobalStorageEngine()->newRecoveryUnit(), OperationContext::kNotInUnitOfWork); _readConcernArgs = repl::ReadConcernArgs::get(opCtx); } Session::TxnResources::~TxnResources() { if (!_released) { _recoveryUnit->abortUnitOfWork(); _locker->endWriteUnitOfWork(); } } void Session::TxnResources::release(OperationContext* opCtx) { // Perform operations that can fail the release before marking the TxnResources as released. auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", readConcernArgs.isEmpty()); _locker->reacquireTicket(opCtx); invariant(!_released); _released = true; // We intentionally do not capture the return value of swapLockState(), which is just an empty // locker. At the end of the operation, if the transaction is not complete, we will stash the // operation context's locker and replace it with a new empty locker. invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); opCtx->swapLockState(std::move(_locker)); opCtx->setRecoveryUnit(_recoveryUnit.release(), OperationContext::RecoveryUnitState::kNotInUnitOfWork); opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx)); // 'readConcernArgs' is a mutable reference to the ReadConcernArgs decoration on opCtx. readConcernArgs = _readConcernArgs; } void Session::stashTransactionResources(OperationContext* opCtx) { // We must lock the Client to change the Locker on the OperationContext and the Session mutex to // access Session state. We must lock the Client before the Session mutex, since the Client // effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go // away, and then lock the Session owned by that client. We rely on the fact that we are not // using the DefaultLockerImpl to avoid deadlock. invariant(!isMMAPV1()); stdx::lock_guard lk(*opCtx->getClient()); stdx::lock_guard lg(_mutex); invariant(opCtx->hasStashedCursor() || !_autocommit); if (*opCtx->getTxnNumber() != _activeTxnNumber) { // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. // However, when a chunk is migrated, session and transaction information is copied from the // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort // the current transaction. Note that it would indicate a user bug to have a newer // transaction on one shard while an older transaction is still active on another shard. uasserted(ErrorCodes::TransactionAborted, str::stream() << "Transaction aborted. Active txnNumber is now " << _activeTxnNumber); } invariant(!_txnResourceStash); _txnResourceStash = boost::in_place(opCtx); } void Session::unstashTransactionResources(OperationContext* opCtx) { // If the storage engine is mmapv1, it is not safe to lock both the Client and the Session // mutex. This is fine because mmapv1 does not support transactions. if (isMMAPV1()) { return; } // We must lock the Client to change the Locker on the OperationContext and the Session mutex to // access Session state. We must lock the Client before the Session mutex, since the Client // effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go // away, and then lock the Session owned by that client. stdx::lock_guard lk(*opCtx->getClient()); stdx::lock_guard lg(_mutex); if (opCtx->getTxnNumber() < _activeTxnNumber) { // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. // However, when a chunk is migrated, session and transaction information is copied from the // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort // the current transaction. Note that it would indicate a user bug to have a newer // transaction on one shard while an older transaction is still active on another shard. _releaseStashedTransactionResources(lg, opCtx); uasserted(ErrorCodes::TransactionAborted, str::stream() << "Transaction aborted. Active txnNumber is now " << _activeTxnNumber); return; } if (_txnResourceStash) { _txnResourceStash->release(opCtx); _txnResourceStash = boost::none; } else { auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || _txnState == MultiDocumentTransactionState::kInProgress) { opCtx->setWriteUnitOfWork(std::make_unique(opCtx)); } } } void Session::abortIfSnapshotRead(OperationContext* opCtx, TxnNumber txnNumber) { stdx::lock_guard lg(_mutex); if (_activeTxnNumber == txnNumber && _autocommit) { _releaseStashedTransactionResources(lg, opCtx); } } void Session::_releaseStashedTransactionResources(WithLock wl, OperationContext* opCtx) { if (opCtx->getWriteUnitOfWork()) { opCtx->setWriteUnitOfWork(nullptr); } _txnResourceStash = boost::none; } void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { _checkValid(wl); _checkTxnValid(wl, txnNumber); // Check for continuing an existing transaction if (txnNumber == _activeTxnNumber) return; _setActiveTxn(wl, txnNumber); } void Session::_setActiveTxn(WithLock, TxnNumber txnNumber) { _activeTxnNumber = txnNumber; _activeTxnCommittedStatements.clear(); _hasIncompleteHistory = false; } void Session::addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation) { stdx::lock_guard lk(_mutex); invariant(_txnState == MultiDocumentTransactionState::kInProgress); invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); if (_transactionOperations.empty()) { auto txnNumberCompleting = _activeTxnNumber; opCtx->recoveryUnit()->onRollback([this, txnNumberCompleting] { stdx::lock_guard lk(_mutex); invariant(_activeTxnNumber == txnNumberCompleting); invariant(_txnState != MultiDocumentTransactionState::kCommitted); _transactionOperations.clear(); _txnState = MultiDocumentTransactionState::kAborted; }); opCtx->recoveryUnit()->onCommit([this, txnNumberCompleting] { stdx::lock_guard lk(_mutex); invariant(_activeTxnNumber == txnNumberCompleting); invariant(_txnState == MultiDocumentTransactionState::kCommitting || _txnState == MultiDocumentTransactionState::kCommitted); _txnState = MultiDocumentTransactionState::kCommitted; }); } _transactionOperations.push_back(operation); } std::vector Session::endTransactionAndRetrieveOperations() { stdx::lock_guard lk(_mutex); invariant(!_autocommit); invariant(_txnState == MultiDocumentTransactionState::kInProgress); // If _transactionOperations is empty, we will not see a commit because the write unit // of work is empty. _txnState = _transactionOperations.empty() ? MultiDocumentTransactionState::kCommitted : MultiDocumentTransactionState::kCommitting; return std::move(_transactionOperations); } void Session::_checkValid(WithLock) const { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Session " << getSessionId() << " was concurrently modified and the operation must be retried.", _isValid); } void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Cannot perform retryability check for transaction " << txnNumber << " on session " << getSessionId() << " because a different transaction " << _activeTxnNumber << " is now active.", txnNumber == _activeTxnNumber); } boost::optional Session::_checkStatementExecuted(WithLock wl, TxnNumber txnNumber, StmtId stmtId) const { _checkValid(wl); _checkIsActiveTransaction(wl, txnNumber); const auto it = _activeTxnCommittedStatements.find(stmtId); if (it == _activeTxnCommittedStatements.end()) { uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "Incomplete history detected for transaction " << txnNumber << " on session " << _sessionId.toBSON(), !_hasIncompleteHistory); return boost::none; } invariant(_lastWrittenSessionRecord); invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber); return it->second; } UpdateRequest Session::_makeUpdateRequest(WithLock, TxnNumber newTxnNumber, const repl::OpTime& newLastWriteOpTime, Date_t newLastWriteDate) const { UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); const auto updateBSON = [&] { SessionTxnRecord newTxnRecord; newTxnRecord.setSessionId(_sessionId); newTxnRecord.setTxnNum(newTxnNumber); newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); newTxnRecord.setLastWriteDate(newLastWriteDate); return newTxnRecord.toBSON(); }(); updateRequest.setUpdates(updateBSON); updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON())); updateRequest.setUpsert(true); return updateRequest; } void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { opCtx->recoveryUnit()->onCommit( [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] { RetryableWritesStats::get(getGlobalServiceContext()) ->incrementTransactionsCollectionWriteCount(); stdx::lock_guard lg(_mutex); if (!_isValid) return; // The cache of the last written record must always be advanced after a write so that // subsequent writes have the correct point to start from. if (!_lastWrittenSessionRecord) { _lastWrittenSessionRecord.emplace(); _lastWrittenSessionRecord->setSessionId(_sessionId); _lastWrittenSessionRecord->setTxnNum(newTxnNumber); _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); } else { if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) _lastWrittenSessionRecord->setTxnNum(newTxnNumber); if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); } if (newTxnNumber > _activeTxnNumber) { // This call is necessary in order to advance the txn number and reset the cached // state in the case where just before the storage transaction commits, the cache // entry gets invalidated and immediately refreshed while there were no writes for // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber // and we will fail to update the cache even though the write was successful. _beginOrContinueTxn(lg, newTxnNumber, boost::none); } if (newTxnNumber == _activeTxnNumber) { for (const auto stmtId : stmtIdsWritten) { if (stmtId == kIncompleteHistoryStmtId) { _hasIncompleteHistory = true; continue; } const auto insertRes = _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); if (!insertRes.second) { const auto& existingOpTime = insertRes.first->second; fassertOnRepeatedExecution(_sessionId, newTxnNumber, stmtId, existingOpTime, lastStmtIdWriteOpTime); } } } }); MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { const auto& data = customArgs.getData(); const auto closeConnectionElem = data["closeConnection"]; if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) { opCtx->getClient()->session()->end(); } const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"]; if (!failBeforeCommitExceptionElem.eoo()) { const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); uasserted(failureCode, str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber << " due to failpoint. The write must not be reflected."); } } } std::vector Session::addOpsForReplicatingTxnTable( const std::vector& ops) { std::vector newOps; for (auto&& op : ops) { newOps.push_back(op); if (auto updateTxnTableOp = createMatchingTransactionTableUpdate(op)) { newOps.push_back(*updateTxnTableOp); } } return newOps; } } // namespace mongo