diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2019-03-14 19:03:44 -0400 |
---|---|---|
committer | Justin Seyster <justin.seyster@mongodb.com> | 2019-04-03 15:16:15 -0400 |
commit | dd589aa07a0155cdeaa70f0403466aabcfaa5186 (patch) | |
tree | fe45722112698332d4e8bde25386e8e3189d260a /src/mongo/db/transaction_history_iterator.cpp | |
parent | f2ab9fa71aabf110b67c28131241de6a27a3f09e (diff) | |
download | mongo-dd589aa07a0155cdeaa70f0403466aabcfaa5186.tar.gz |
SERVER-39676 Replace DBDirectClient in TransactionHistoryIterator
Diffstat (limited to 'src/mongo/db/transaction_history_iterator.cpp')
-rw-r--r-- | src/mongo/db/transaction_history_iterator.cpp | 72 |
1 files changed, 56 insertions, 16 deletions
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index 4877e1b0596..5de036272f0 100644 --- a/src/mongo/db/transaction_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -29,9 +29,12 @@ #include "mongo/platform/basic.h" -#include "mongo/db/dbdirectclient.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/get_executor.h" +#include "mongo/db/query/query_request.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/logger/redaction.h" @@ -39,34 +42,71 @@ namespace mongo { -TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime) - : _nextOpTime(std::move(startingOpTime)) {} +namespace { -bool TransactionHistoryIterator::hasNext() const { - return !_nextOpTime.isNull(); -} +/** + * Query the oplog for an entry with the given timestamp. + */ +BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime) { + BSONObj oplogBSON; + invariant(!opTime.isNull()); -repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { - invariant(hasNext()); + auto qr = std::make_unique<QueryRequest>(NamespaceString::kRsOplogNamespace); + qr->setFilter(opTime.asQuery()); + qr->setOplogReplay(true); // QueryOption_OplogReplay + + const boost::intrusive_ptr<ExpressionContext> expCtx; - DBDirectClient client(opCtx); - auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), - _nextOpTime.asQuery(), - nullptr, - QueryOption_OplogReplay); + auto statusWithCQ = CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kBanAllSpecialFeatures); + invariant(statusWithCQ.isOK(), + str::stream() << "Failed to canonicalize oplog lookup" + << causedBy(statusWithCQ.getStatus())); + std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); + AutoGetCollectionForReadCommand ctx(opCtx, + NamespaceString::kRsOplogNamespace, + AutoGetCollection::ViewMode::kViewsForbidden, + Date_t::max(), + AutoStatsTracker::LogMode::kUpdateTop); + + auto exec = uassertStatusOK(getExecutorFind(opCtx, ctx.getCollection(), std::move(cq))); + + auto getNextResult = exec->getNext(&oplogBSON, nullptr); uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "oplog no longer contains the complete write history of this " "transaction, log with opTime " - << _nextOpTime.toBSON() + << opTime.toBSON() << " cannot be found", - !oplogBSON.isEmpty()); + getNextResult != PlanExecutor::IS_EOF); + if (getNextResult != PlanExecutor::ADVANCED) { + uassertStatusOKWithContext(WorkingSetCommon::getMemberObjectStatus(oplogBSON), + "PlanExecutor error in TransactionHistoryIterator"); + } + + return oplogBSON; +} + +} // namespace + +TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime) + : _nextOpTime(std::move(startingOpTime)) {} + +bool TransactionHistoryIterator::hasNext() const { + return !_nextOpTime.isNull(); +} + +repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { + BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime); auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction(); uassert( ErrorCodes::FailedToParse, - str::stream() << "Missing prevTs field on oplog entry of previous write in transcation: " + str::stream() << "Missing prevTs field on oplog entry of previous write in transaction: " << redact(oplogBSON), oplogPrevTsOption); |