/** * Copyright (C) 2017 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/session.h" #include #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { boost::optional loadSessionRecord(OperationContext* opCtx, const LogicalSessionId& sessionId) { DBDirectClient client(opCtx); Query sessionQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())); auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionQuery); if (result.isEmpty()) { return boost::none; } IDLParserErrorContext ctx("parse latest txn record for session"); return SessionTxnRecord::parse(ctx, result); } /** * Update the txnNum of the session record. Will create a new entry if the record with * corresponding sessionId does not exist. */ void updateSessionRecordTxnNum(OperationContext* opCtx, const LogicalSessionId& sessionId, const TxnNumber& txnNum) { repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); Timestamp zeroTs; AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); uassert(40526, str::stream() << "Unable to persist transaction state because the session transaction " "collection is missing. This indicates that the " << NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection has been manually deleted.", autoColl.getCollection() != nullptr); UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())); updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName << txnNum << SessionTxnRecord::kLastWriteOpTimeTsFieldName << zeroTs))); updateRequest.setUpsert(true); auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); uassert(40527, str::stream() << "Failed to update transaction progress for session " << sessionId, updateResult.numDocsModified >= 1 || !updateResult.upserted.isEmpty()); } } // namespace Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); if (!_txnRecord) { _txnRecord = loadSessionRecord(opCtx, _sessionId); // Previous read failed to retrieve the txn record, which means it does not exist yet, // so create a new entry. if (!_txnRecord) { updateSessionRecordTxnNum(opCtx, _sessionId, txnNumber); _txnRecord.emplace(); _txnRecord->setSessionId(_sessionId); _txnRecord->setTxnNum(txnNumber); _txnRecord->setLastWriteOpTimeTs(Timestamp()); return; } } uassert(40528, str::stream() << "cannot start transaction with id " << txnNumber << " on session " << _sessionId << " because transaction with id " << _txnRecord->getTxnNum() << " already started", _txnRecord->getTxnNum() <= txnNumber); if (txnNumber > _txnRecord->getTxnNum()) { updateSessionRecordTxnNum(opCtx, _sessionId, txnNumber); _txnRecord->setTxnNum(txnNumber); _txnRecord->setLastWriteOpTimeTs(Timestamp()); } } void Session::saveTxnProgress(OperationContext* opCtx, Timestamp opTimeTs) { // Needs to be in the same write unit of work with the write for this result. invariant(opCtx->lockState()->inAWriteUnitOfWork()); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); auto coll = autoColl.getCollection(); uassert(40529, str::stream() << "Unable to persist transaction state because the session transaction " "collection is missing. This indicates that the " << NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection has been manually deleted.", coll); UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON() << SessionTxnRecord::kTxnNumFieldName << _txnRecord->getTxnNum())); updateRequest.setUpdates( BSON("$set" << BSON(SessionTxnRecord::kLastWriteOpTimeTsFieldName << opTimeTs))); updateRequest.setUpsert(false); auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); uassert(40530, str::stream() << "Failed to update transaction progress for session " << _sessionId, updateResult.numDocsModified >= 1); _txnRecord->setLastWriteOpTimeTs(opTimeTs); } TxnNumber Session::getTxnNum() const { return _txnRecord->getTxnNum(); } const Timestamp& Session::getLastWriteOpTimeTs() const { return _txnRecord->getLastWriteOpTimeTs(); } TransactionHistoryIterator Session::getWriteHistory(OperationContext* opCtx) const { return TransactionHistoryIterator(getLastWriteOpTimeTs()); } } // namespace mongo