summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction/transaction_history_iterator.cpp
diff options
context:
space:
mode:
authorShin Yee Tan <shinyee.tan@mongodb.com>2022-08-01 16:05:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-01 16:59:38 +0000
commite4dccd260605ca3dedbfd6269d4421940837847e (patch)
tree27ca603c7477fb74b39b2382e55851fe132c6a22 /src/mongo/db/transaction/transaction_history_iterator.cpp
parent13eeb512c993348efa814c2c2eead9fe725aa843 (diff)
downloadmongo-e4dccd260605ca3dedbfd6269d4421940837847e.tar.gz
SERVER-67505 Move multi-doc transaction classes from mongo/db/ to mongo/db/transaction sub directory
Diffstat (limited to 'src/mongo/db/transaction/transaction_history_iterator.cpp')
-rw-r--r--src/mongo/db/transaction/transaction_history_iterator.cpp159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/mongo/db/transaction/transaction_history_iterator.cpp b/src/mongo/db/transaction/transaction_history_iterator.cpp
new file mode 100644
index 00000000000..d9be44df320
--- /dev/null
+++ b/src/mongo/db/transaction/transaction_history_iterator.cpp
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/query/get_executor.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/transaction/transaction_history_iterator.h"
+#include "mongo/logv2/redaction.h"
+#include "mongo/util/str.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * Query the oplog for an entry with the given timestamp.
+ */
+BSONObj findOneOplogEntry(OperationContext* opCtx,
+ const repl::OpTime& opTime,
+ bool permitYield,
+ bool prevOpOnly = false) {
+ BSONObj oplogBSON;
+ invariant(!opTime.isNull());
+
+ auto findCommand = std::make_unique<FindCommandRequest>(NamespaceString::kRsOplogNamespace);
+ findCommand->setFilter(opTime.asQuery());
+
+ if (prevOpOnly) {
+ findCommand->setProjection(
+ BSON("_id" << 0 << repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName << 1LL));
+ }
+
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
+
+ auto statusWithCQ = CanonicalQuery::canonicalize(opCtx,
+ std::move(findCommand),
+ false,
+ expCtx,
+ ExtensionsCallbackNoop(),
+ MatchExpressionParser::kBanAllSpecialFeatures);
+ invariant(statusWithCQ.isOK(),
+ str::stream() << "Failed to canonicalize oplog lookup"
+ << causedBy(statusWithCQ.getStatus()));
+ std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
+
+ AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead);
+ const DatabaseName dbName(boost::none, NamespaceString::kLocalDb);
+ const auto localDb = DatabaseHolder::get(opCtx)->getDb(opCtx, dbName);
+ invariant(localDb);
+ AutoStatsTracker statsTracker(opCtx,
+ NamespaceString::kRsOplogNamespace,
+ Top::LockType::ReadLocked,
+ AutoStatsTracker::LogMode::kUpdateTop,
+ CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(dbName),
+ Date_t::max());
+
+ auto exec = uassertStatusOK(getExecutorFind(opCtx,
+ &oplogRead.getCollection(),
+ std::move(cq),
+ nullptr /*extractAndAttachPipelineStages */,
+ permitYield));
+
+ PlanExecutor::ExecState getNextResult;
+ try {
+ getNextResult = exec->getNext(&oplogBSON, nullptr);
+ } catch (DBException& exception) {
+ exception.addContext("PlanExecutor error in TransactionHistoryIterator");
+ throw;
+ }
+
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "oplog no longer contains the complete write history of this "
+ "transaction, log with opTime "
+ << opTime.toBSON() << " cannot be found",
+ getNextResult != PlanExecutor::IS_EOF);
+
+ return oplogBSON.getOwned();
+}
+
+} // namespace
+
+TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime,
+ bool permitYield)
+ : _permitYield(permitYield), _nextOpTime(std::move(startingOpTime)) {}
+
+bool TransactionHistoryIterator::hasNext() const {
+ return !_nextOpTime.isNull();
+}
+
+repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
+ BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield);
+
+ auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
+ const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction();
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Missing prevOpTime field on oplog entry of previous write in transaction: "
+ << redact(oplogBSON),
+ oplogPrevTsOption);
+
+ _nextOpTime = oplogPrevTsOption.value();
+
+ return oplogEntry;
+}
+
+repl::OplogEntry TransactionHistoryIterator::nextFatalOnErrors(OperationContext* opCtx) try {
+ return next(opCtx);
+} catch (const DBException& ex) {
+ fassertFailedWithStatus(31145, ex.toStatus());
+}
+
+repl::OpTime TransactionHistoryIterator::nextOpTime(OperationContext* opCtx) {
+ BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime, _permitYield, true /* prevOpOnly */);
+
+ auto prevOpTime = oplogBSON[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName];
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Missing prevOpTime field on oplog entry of previous write in transaction: "
+ << redact(oplogBSON),
+ !prevOpTime.eoo() && prevOpTime.isABSONObj());
+
+ auto returnOpTime = _nextOpTime;
+ _nextOpTime = repl::OpTime::parse(prevOpTime.Obj());
+ return returnOpTime;
+}
+
+} // namespace mongo