diff options
author | Shin Yee Tan <shinyee.tan@mongodb.com> | 2022-08-01 16:05:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-01 16:59:38 +0000 |
commit | e4dccd260605ca3dedbfd6269d4421940837847e (patch) | |
tree | 27ca603c7477fb74b39b2382e55851fe132c6a22 /src/mongo/db/transaction/transaction_history_iterator.cpp | |
parent | 13eeb512c993348efa814c2c2eead9fe725aa843 (diff) | |
download | mongo-e4dccd260605ca3dedbfd6269d4421940837847e.tar.gz |
SERVER-67505 Move multi-doc transaction classes from mongo/db/ to mongo/db/transaction sub directory
Diffstat (limited to 'src/mongo/db/transaction/transaction_history_iterator.cpp')
-rw-r--r-- | src/mongo/db/transaction/transaction_history_iterator.cpp | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/src/mongo/db/transaction/transaction_history_iterator.cpp b/src/mongo/db/transaction/transaction_history_iterator.cpp new file mode 100644 index 00000000000..d9be44df320 --- /dev/null +++ b/src/mongo/db/transaction/transaction_history_iterator.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/get_executor.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/transaction/transaction_history_iterator.h" +#include "mongo/logv2/redaction.h" +#include "mongo/util/str.h" + +namespace mongo { + +namespace { + +/** + * Query the oplog for an entry with the given timestamp. + */ +BSONObj findOneOplogEntry(OperationContext* opCtx, + const repl::OpTime& opTime, + bool permitYield, + bool prevOpOnly = false) { + BSONObj oplogBSON; + invariant(!opTime.isNull()); + + auto findCommand = std::make_unique<FindCommandRequest>(NamespaceString::kRsOplogNamespace); + findCommand->setFilter(opTime.asQuery()); + + if (prevOpOnly) { + findCommand->setProjection( + BSON("_id" << 0 << repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName << 1LL)); + } + + const boost::intrusive_ptr<ExpressionContext> expCtx; + + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, + std::move(findCommand), + false, + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kBanAllSpecialFeatures); + invariant(statusWithCQ.isOK(), + str::stream() << "Failed to canonicalize oplog lookup" + << causedBy(statusWithCQ.getStatus())); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + + AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead); + const DatabaseName dbName(boost::none, NamespaceString::kLocalDb); + const auto localDb = DatabaseHolder::get(opCtx)->getDb(opCtx, dbName); + invariant(localDb); + AutoStatsTracker statsTracker(opCtx, + NamespaceString::kRsOplogNamespace, + Top::LockType::ReadLocked, + AutoStatsTracker::LogMode::kUpdateTop, + CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(dbName), + Date_t::max()); + + auto exec = uassertStatusOK(getExecutorFind(opCtx, + &oplogRead.getCollection(), + std::move(cq), + nullptr /*extractAndAttachPipelineStages */, + permitYield)); + + PlanExecutor::ExecState getNextResult; + try { + getNextResult = exec->getNext(&oplogBSON, nullptr); + } catch (DBException& exception) { + exception.addContext("PlanExecutor error in TransactionHistoryIterator"); + throw; + } + + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "oplog no longer contains the complete write history of this " + "transaction, log with opTime " + << opTime.toBSON() << " cannot be found", + getNextResult != PlanExecutor::IS_EOF); + + return oplogBSON.getOwned(); +} + +} // namespace + +TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime, + bool permitYield) + : _permitYield(permitYield), _nextOpTime(std::move(startingOpTime)) {} + +bool TransactionHistoryIterator::hasNext() const { + return !_nextOpTime.isNull(); +} + +repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { + BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield); + + auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); + const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction(); + uassert(ErrorCodes::FailedToParse, + str::stream() + << "Missing prevOpTime field on oplog entry of previous write in transaction: " + << redact(oplogBSON), + oplogPrevTsOption); + + _nextOpTime = oplogPrevTsOption.value(); + + return oplogEntry; +} + +repl::OplogEntry TransactionHistoryIterator::nextFatalOnErrors(OperationContext* opCtx) try { + return next(opCtx); +} catch (const DBException& ex) { + fassertFailedWithStatus(31145, ex.toStatus()); +} + +repl::OpTime TransactionHistoryIterator::nextOpTime(OperationContext* opCtx) { + BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield, true /* prevOpOnly */); + + auto prevOpTime = oplogBSON[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName]; + uassert(ErrorCodes::FailedToParse, + str::stream() + << "Missing prevOpTime field on oplog entry of previous write in transaction: " + << redact(oplogBSON), + !prevOpTime.eoo() && prevOpTime.isABSONObj()); + + auto returnOpTime = _nextOpTime; + _nextOpTime = repl::OpTime::parse(prevOpTime.Obj()); + return returnOpTime; +} + +} // namespace mongo |