/**
 *    Copyright (C) 2017 MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage

#include "mongo/platform/basic.h"

#include "mongo/db/session.h"

#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {
namespace {

void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
                                TxnNumber txnNumber,
                                StmtId stmtId,
                                const repl::OpTime& firstOpTime,
                                const repl::OpTime& secondOpTime) {
    severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
             << txnNumber << " ] was committed once with opTime " << firstOpTime
             << " and a second time with opTime " << secondOpTime
             << ". This indicates possible data corruption or a server bug and the process will be "
                "terminated.";
    fassertFailed(40526);
}

struct ActiveTransactionHistory {
    boost::optional<SessionTxnRecord> lastTxnRecord;
    Session::CommittedStatementTimestampMap committedStatements;
    bool transactionCommitted{false};
    bool hasIncompleteHistory{false};
};

ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
                                                       const LogicalSessionId& lsid) {
    ActiveTransactionHistory result;

    result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
        DBDirectClient client(opCtx);
        auto result =
            client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
                           {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
        if (result.isEmpty()) {
            return boost::none;
        }

        return SessionTxnRecord::parse(
            IDLParserErrorContext("parse latest txn record for session"), result);
    }();

    if (!result.lastTxnRecord) {
        return result;
    }

    auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
    while (it.hasNext()) {
        try {
            const auto entry = it.next(opCtx);
            invariant(entry.getStatementId());

            if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
                // Only the dead end sentinel can have this id for oplog write history.
                invariant(entry.getObject2());
                invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
                result.hasIncompleteHistory = true;
                continue;
            }

            const auto insertRes =
                result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
            if (!insertRes.second) {
                const auto& existingOpTime = insertRes.first->second;
                fassertOnRepeatedExecution(lsid,
                                           result.lastTxnRecord->getTxnNum(),
                                           *entry.getStatementId(),
                                           existingOpTime,
                                           entry.getOpTime());
            }

            // An applyOps oplog entry marks the commit of a transaction.
            if (entry.isCommand() &&
                entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
                result.transactionCommitted = true;
            }
        } catch (const DBException& ex) {
            if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
                result.hasIncompleteHistory = true;
                break;
            }

            throw;
        }
    }

    return result;
}
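
// Illustrative sketch (not part of the original file): fetchActiveTransactionHistory() above
// walks the per-session oplog chain backwards, starting from the lastWriteOpTime stored in the
// session's config.transactions record. Each entry yielded by TransactionHistoryIterator carries
// a statement id; the entries are assumed here to be linked to their predecessors via their
// prevWriteOpTime field, roughly like this:
//
//   lastWriteOpTime -> { stmtId: 2, prevOpTime: A, ... }
//   A               -> { stmtId: 1, prevOpTime: B, ... }
//   B               -> { stmtId: 0, prevOpTime: null, ... }
//
// If the chain has been truncated (for example, when a migration did not copy the full history),
// the walk either ends at the dead end sentinel { $incompleteOplogHistory: 1 } with statement id
// kIncompleteHistoryStmtId, or throws IncompleteTransactionHistory; in both cases
// hasIncompleteHistory is set instead of recording further statements.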

void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
    // Current code only supports replacement update.
    dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));

    AutoGetCollection autoColl(
        opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);

    uassert(40527,
            str::stream() << "Unable to persist transaction state because the session transaction "
                             "collection is missing. This indicates that the "
                          << NamespaceString::kSessionTransactionsTableNamespace.ns()
                          << " collection has been manually deleted.",
            autoColl.getCollection());

    WriteUnitOfWork wuow(opCtx);

    auto collection = autoColl.getCollection();
    auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);

    uassert(40672,
            str::stream() << "Failed to fetch _id index for "
                          << NamespaceString::kSessionTransactionsTableNamespace.ns(),
            idIndex);

    auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);

    // Since we are looking up a key inside the _id index, create a key object consisting of only
    // the _id field.
    auto idToFetch = updateRequest.getQuery().firstElement();
    auto toUpdateIdDoc = idToFetch.wrap();
    dassert(idToFetch.fieldNameStringData() == "_id"_sd);
    auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
    auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();

    if (recordId.isNull()) {
        // Upsert case.
        auto status = collection->insertDocument(
            opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false);

        if (status == ErrorCodes::DuplicateKey) {
            throw WriteConflictException();
        }

        uassertStatusOK(status);
        wuow.commit();
        return;
    }

    auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
    auto originalDoc = originalRecordData.toBson();

    invariant(collection->getDefaultCollator() == nullptr);
    boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr));

    auto matcher =
        fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
    if (!matcher->matchesBSON(originalDoc)) {
        // The document no longer matches what we expect, so throw a WCE to make the caller
        // re-examine.
        throw WriteConflictException();
    }

    OplogUpdateEntryArgs args;
    args.nss = NamespaceString::kSessionTransactionsTableNamespace;
    args.uuid = collection->uuid();
    args.update = updateRequest.getUpdates();
    args.criteria = toUpdateIdDoc;
    args.fromMigrate = false;

    collection->updateDocument(opCtx,
                               recordId,
                               Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
                               updateRequest.getUpdates(),
                               false,  // indexesAffected = false because _id is the only index
                               nullptr,
                               &args);

    wuow.commit();
}

// Failpoint which allows different failure actions to happen after each write. Supports the
// parameters below, which can be combined with each other (unless explicitly disallowed):
//
// closeConnection (bool, default = true): Closes the connection on which the write was executed.
// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
//      code will be thrown, which will cause the write to not commit; if not specified, the write
//      will be allowed to commit.
MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
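
// Illustrative example (not part of the original file): in a test, the failpoint above can be
// activated through the generic configureFailPoint admin command. The data fields mirror the
// parameters documented in the comment above; the error code shown is an arbitrary choice for
// the example.
//
//   db.adminCommand({configureFailPoint: "onPrimaryTransactionalWrite",
//                    mode: "alwaysOn",
//                    data: {closeConnection: false, failBeforeCommitExceptionCode: 11601}});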

}  // namespace

const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));

Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}

void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
    if (opCtx->getClient()->isInDirectClient()) {
        return;
    }

    invariant(!opCtx->lockState()->isLocked());
    invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
              repl::ReadConcernLevel::kLocalReadConcern);

    stdx::unique_lock<stdx::mutex> ul(_mutex);

    while (!_isValid) {
        const int numInvalidations = _numInvalidations;

        ul.unlock();

        auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId);

        ul.lock();

        // Protect against concurrent refreshes or invalidations.
        if (!_isValid && _numInvalidations == numInvalidations) {
            _isValid = true;
            _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);

            if (_lastWrittenSessionRecord) {
                if (!_lastRefreshState) {
                    _lastRefreshState.emplace();
                }

                _lastRefreshState->refreshCount++;

                _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
                _lastRefreshState->txnNumber = _activeTxnNumber;
                _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
                _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
                _lastRefreshState->isCommitted = activeTxnHistory.transactionCommitted;
            }

            break;
        }
    }
}

void Session::beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber) {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    _beginOrContinueTxn(lg, txnNumber);
}

void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
                                          TxnNumber txnNumber,
                                          std::vector<StmtId> stmtIdsWritten,
                                          const repl::OpTime& lastStmtIdWriteOpTime,
                                          Date_t lastStmtIdWriteDate,
                                          boost::optional<DurableTxnStateEnum> txnState) {
    invariant(opCtx->lockState()->inAWriteUnitOfWork());

    stdx::unique_lock<stdx::mutex> ul(_mutex);

    // Sanity check that we don't double-execute statements.
    for (const auto stmtId : stmtIdsWritten) {
        const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
        if (stmtOpTime) {
            fassertOnRepeatedExecution(
                _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
        }
    }

    const auto updateRequest =
        _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);

    ul.unlock();

    repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);

    updateSessionEntry(opCtx, updateRequest);
    _registerUpdateCacheOnCommit(
        opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
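
// Illustrative sketch (not part of the original file): the caller-side sequence for a retryable
// write on the primary, pieced together from the invariants and APIs above. The local helpers
// named below (performWrite, parseOplogEntryForReply) are hypothetical.
//
//   session->beginOrContinueTxn(opCtx, txnNumber);
//   if (auto entry = session->checkStatementExecuted(opCtx, txnNumber, stmtId)) {
//       return parseOplogEntryForReply(*entry);  // statement already executed; replay the reply
//   }
//   WriteUnitOfWork wuow(opCtx);
//   auto writeOpTime = performWrite(opCtx);      // performs the write and logs the oplog entry
//   session->onWriteOpCompletedOnPrimary(
//       opCtx, txnNumber, {stmtId}, writeOpTime, Date_t::now(), boost::none);
//   wuow.commit();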

bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) {
    beginOrContinueTxn(opCtx, txnNumber);

    try {
        if (checkStatementExecuted(opCtx, txnNumber, stmtId)) {
            return false;
        }
    } catch (const DBException& ex) {
        // If the transaction chain was truncated on the recipient shard, then we are most likely
        // copying from a session that hasn't been touched on the recipient shard for a very long
        // time but could be recent on the donor. We continue copying regardless to get the entire
        // transaction from the donor.
        if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
            throw;
        }
        if (stmtId == kIncompleteHistoryStmtId) {
            return false;
        }
    }

    return true;
}

void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
                                          TxnNumber txnNumber,
                                          std::vector<StmtId> stmtIdsWritten,
                                          const repl::OpTime& lastStmtIdWriteOpTime,
                                          Date_t oplogLastStmtIdWriteDate) {
    invariant(opCtx->lockState()->inAWriteUnitOfWork());

    stdx::unique_lock<stdx::mutex> ul(_mutex);

    _checkValid(ul);
    _checkIsActiveTransaction(ul, txnNumber);

    // If the transaction has a populated lastWriteDate, we will use that as the most up-to-date
    // value. Using the lastWriteDate from the oplog being migrated may move the lastWriteDate
    // back. However, in the case that the transaction doesn't have the lastWriteDate populated,
    // the oplog's value serves as a best-case fallback.
    const auto txnLastStmtIdWriteDate = _getLastWriteDate(ul, txnNumber);
    const auto updatedLastStmtIdWriteDate = txnLastStmtIdWriteDate == Date_t::min()
        ? oplogLastStmtIdWriteDate
        : txnLastStmtIdWriteDate;

    // We do not migrate transaction oplog entries.
    auto txnState = boost::none;
    const auto updateRequest = _makeUpdateRequest(
        ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate, txnState);

    ul.unlock();

    repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);

    updateSessionEntry(opCtx, updateRequest);
    _registerUpdateCacheOnCommit(
        opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}

void Session::invalidate() {
    stdx::lock_guard<stdx::mutex> lg(_mutex);

    if (_isTxnNumberLocked) {
        invariant(_txnNumberLockConflictStatus);
        uasserted(50908,
                  str::stream() << "cannot invalidate session because txnNumber is locked: "
                                << *_txnNumberLockConflictStatus);
    }

    _isValid = false;
    _numInvalidations++;

    _lastWrittenSessionRecord.reset();

    _activeTxnNumber = kUninitializedTxnNumber;
    _activeTxnCommittedStatements.clear();
    _hasIncompleteHistory = false;
}

repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    _checkValid(lg);
    _checkIsActiveTransaction(lg, txnNumber);

    if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
        return {};

    return _lastWrittenSessionRecord->getLastWriteOpTime();
}

boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
                                                                  TxnNumber txnNumber,
                                                                  StmtId stmtId) const {
    const auto stmtTimestamp = [&] {
        stdx::lock_guard<stdx::mutex> lg(_mutex);
        return _checkStatementExecuted(lg, txnNumber, stmtId);
    }();

    if (!stmtTimestamp)
        return boost::none;

    TransactionHistoryIterator txnIter(*stmtTimestamp);
    while (txnIter.hasNext()) {
        const auto entry = txnIter.next(opCtx);
        invariant(entry.getStatementId());
        if (*entry.getStatementId() == stmtId)
            return entry;
    }

    MONGO_UNREACHABLE;
}

bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    return bool(_checkStatementExecuted(lg, txnNumber, stmtId));
}

void Session::_beginOrContinueTxn(WithLock wl, TxnNumber txnNumber) {
    // Check whether the session information needs to be refreshed from disk.
    _checkValid(wl);

    // Check if the given transaction number is valid for this session. The transaction number
    // must be >= the active transaction number.
    _checkTxnValid(wl, txnNumber);

    //
    // Continue an active transaction.
    //
    if (txnNumber == _activeTxnNumber) {
        return;
    }

    invariant(txnNumber > _activeTxnNumber);
    _setActiveTxn(wl, txnNumber);
}

void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
    uassert(ErrorCodes::TransactionTooOld,
            str::stream() << "Cannot start transaction " << txnNumber << " on session "
                          << getSessionId()
                          << " because a newer transaction "
                          << _activeTxnNumber
                          << " has already started.",
            txnNumber >= _activeTxnNumber);
}

void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) {
    if (_isTxnNumberLocked) {
        invariant(_txnNumberLockConflictStatus);
        uassertStatusOK(*_txnNumberLockConflictStatus);
    }

    _activeTxnNumber = txnNumber;
    _activeTxnCommittedStatements.clear();
    _hasIncompleteHistory = false;
}

void Session::_checkValid(WithLock) const {
    uassert(ErrorCodes::ConflictingOperationInProgress,
            str::stream() << "Session " << getSessionId()
                          << " was concurrently modified and the operation must be retried.",
            _isValid);
}

void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
    uassert(ErrorCodes::ConflictingOperationInProgress,
            str::stream() << "Cannot perform operations on transaction " << txnNumber
                          << " on session "
                          << getSessionId()
                          << " because a different transaction "
                          << _activeTxnNumber
                          << " is now active.",
            txnNumber == _activeTxnNumber);
}

boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
                                                               TxnNumber txnNumber,
                                                               StmtId stmtId) const {
    _checkValid(wl);
    _checkIsActiveTransaction(wl, txnNumber);

    const auto it = _activeTxnCommittedStatements.find(stmtId);
    if (it == _activeTxnCommittedStatements.end()) {
        uassert(ErrorCodes::IncompleteTransactionHistory,
                str::stream() << "Incomplete history detected for transaction " << txnNumber
                              << " on session "
                              << _sessionId.toBSON(),
                !_hasIncompleteHistory);

        return boost::none;
    }

    invariant(_lastWrittenSessionRecord);
    invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);

    return it->second;
}

Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
    _checkValid(wl);
    _checkIsActiveTransaction(wl, txnNumber);

    if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
        return {};

    return _lastWrittenSessionRecord->getLastWriteDate();
}

UpdateRequest Session::_makeUpdateRequest(WithLock,
                                          TxnNumber newTxnNumber,
                                          const repl::OpTime& newLastWriteOpTime,
                                          Date_t newLastWriteDate,
                                          boost::optional<DurableTxnStateEnum> newState) const {
    UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);

    const auto updateBSON = [&] {
        SessionTxnRecord newTxnRecord;
        newTxnRecord.setSessionId(_sessionId);
        newTxnRecord.setTxnNum(newTxnNumber);
        newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
        newTxnRecord.setLastWriteDate(newLastWriteDate);
        newTxnRecord.setState(newState);
        return newTxnRecord.toBSON();
    }();
    updateRequest.setUpdates(updateBSON);
    updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON()));
    updateRequest.setUpsert(true);

    return updateRequest;
}
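
// Illustrative sketch (not part of the original file): the replacement document built by
// _makeUpdateRequest() above is a full SessionTxnRecord, so the resulting config.transactions
// entry looks roughly like the following (field names come from SessionTxnRecord; the exact
// serialized layout of the lsid and of the state enum is an assumption here):
//
//   { _id: <lsid>, txnNum: 12, lastWriteOpTime: { ts: Timestamp(...), t: 3 },
//     lastWriteDate: ISODate("..."), state: "committed" }
//
// The query side of the request matches on the same lsid and upsert is set, so updateSessionEntry
// either replaces the existing record in place or inserts a new one.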

void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
                                           TxnNumber newTxnNumber,
                                           std::vector<StmtId> stmtIdsWritten,
                                           const repl::OpTime& lastStmtIdWriteOpTime) {
    opCtx->recoveryUnit()->onCommit(
        [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
            boost::optional<Timestamp>) {
            RetryableWritesStats::get(getGlobalServiceContext())
                ->incrementTransactionsCollectionWriteCount();

            stdx::lock_guard<stdx::mutex> lg(_mutex);

            if (!_isValid)
                return;

            // The cache of the last written record must always be advanced after a write so that
            // subsequent writes have the correct point to start from.
            if (!_lastWrittenSessionRecord) {
                _lastWrittenSessionRecord.emplace();

                _lastWrittenSessionRecord->setSessionId(_sessionId);
                _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
                _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
            } else {
                if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
                    _lastWrittenSessionRecord->setTxnNum(newTxnNumber);

                if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
                    _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
            }

            if (newTxnNumber > _activeTxnNumber) {
                // This call is necessary in order to advance the txn number and reset the cached
                // state in the case where just before the storage transaction commits, the cache
                // entry gets invalidated and immediately refreshed while there were no writes for
                // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
                // and we will fail to update the cache even though the write was successful.
                _beginOrContinueTxn(lg, newTxnNumber);
            }

            if (newTxnNumber == _activeTxnNumber) {
                for (const auto stmtId : stmtIdsWritten) {
                    if (stmtId == kIncompleteHistoryStmtId) {
                        _hasIncompleteHistory = true;
                        continue;
                    }

                    const auto insertRes =
                        _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
                    if (!insertRes.second) {
                        const auto& existingOpTime = insertRes.first->second;
                        fassertOnRepeatedExecution(_sessionId,
                                                   newTxnNumber,
                                                   stmtId,
                                                   existingOpTime,
                                                   lastStmtIdWriteOpTime);
                    }
                }
            }
        });

    MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
        const auto& data = customArgs.getData();

        const auto closeConnectionElem = data["closeConnection"];
        if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
            opCtx->getClient()->session()->end();
        }

        const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
        if (!failBeforeCommitExceptionElem.eoo()) {
            const auto failureCode =
                ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
            uasserted(failureCode,
                      str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber
                                    << " due to failpoint. The write must not be reflected.");
        }
    }
}

boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate(
    const repl::OplogEntry& entry) {
    auto sessionInfo = entry.getOperationSessionInfo();
    if (!sessionInfo.getTxnNumber()) {
        return boost::none;
    }

    invariant(sessionInfo.getSessionId());
    invariant(entry.getWallClockTime());

    const auto updateBSON = [&] {
        SessionTxnRecord newTxnRecord;
        newTxnRecord.setSessionId(*sessionInfo.getSessionId());
        newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
        newTxnRecord.setLastWriteOpTime(entry.getOpTime());
        newTxnRecord.setLastWriteDate(*entry.getWallClockTime());

        if (entry.isCommand() &&
            entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
            newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
                                                        : DurableTxnStateEnum::kCommitted);
        }
        return newTxnRecord.toBSON();
    }();

    return repl::OplogEntry(
        entry.getOpTime(),
        0,  // hash
        repl::OpTypeEnum::kUpdate,
        NamespaceString::kSessionTransactionsTableNamespace,
        boost::none,  // uuid
        false,        // fromMigrate
        repl::OplogEntry::kOplogVersion,
        updateBSON,
        BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
        {},    // sessionInfo
        true,  // upsert
        *entry.getWallClockTime(),
        boost::none,  // statementId
        boost::none,  // prevWriteOpTime
        boost::none,  // preImageOpTime
        boost::none   // postImageOpTime
        );
}

boost::optional<Session::RefreshState> Session::getLastRefreshState() const {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    return _lastRefreshState;
}

void Session::lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError) {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    uassert(50907,
            str::stream() << "cannot lock txnNumber to " << lockThisNumber
                          << " because current txnNumber is "
                          << _activeTxnNumber,
            _activeTxnNumber == lockThisNumber);

    // TODO: remove this if we need to support recursive locking.
    invariant(!_isTxnNumberLocked);

    _isTxnNumberLocked = true;
    _txnNumberLockConflictStatus = conflictError;
}

void Session::unlockTxnNumber() {
    stdx::lock_guard<stdx::mutex> lg(_mutex);
    _isTxnNumberLocked = false;
    _txnNumberLockConflictStatus = boost::none;
}

}  // namespace mongo