summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_history_iterator.cpp
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-03-14 19:03:44 -0400
committerJustin Seyster <justin.seyster@mongodb.com>2019-04-03 15:16:15 -0400
commitdd589aa07a0155cdeaa70f0403466aabcfaa5186 (patch)
treefe45722112698332d4e8bde25386e8e3189d260a /src/mongo/db/transaction_history_iterator.cpp
parentf2ab9fa71aabf110b67c28131241de6a27a3f09e (diff)
downloadmongo-dd589aa07a0155cdeaa70f0403466aabcfaa5186.tar.gz
SERVER-39676 Replace DBDirectClient in TransactionHistoryIterator
Diffstat (limited to 'src/mongo/db/transaction_history_iterator.cpp')
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp72
1 files changed, 56 insertions, 16 deletions
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index 4877e1b0596..5de036272f0 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -29,9 +29,12 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/get_executor.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/logger/redaction.h"
@@ -39,34 +42,71 @@
namespace mongo {
-TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime)
- : _nextOpTime(std::move(startingOpTime)) {}
+namespace {
-bool TransactionHistoryIterator::hasNext() const {
- return !_nextOpTime.isNull();
-}
+/**
+ * Query the oplog for an entry with the given timestamp.
+ */
+BSONObj findOneOplogEntry(OperationContext* opCtx, const repl::OpTime& opTime) {
+ BSONObj oplogBSON;
+ invariant(!opTime.isNull());
-repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
- invariant(hasNext());
+ auto qr = std::make_unique<QueryRequest>(NamespaceString::kRsOplogNamespace);
+ qr->setFilter(opTime.asQuery());
+ qr->setOplogReplay(true); // QueryOption_OplogReplay
+
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
- DBDirectClient client(opCtx);
- auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- _nextOpTime.asQuery(),
- nullptr,
- QueryOption_OplogReplay);
+ auto statusWithCQ = CanonicalQuery::canonicalize(opCtx,
+ std::move(qr),
+ expCtx,
+ ExtensionsCallbackNoop(),
+ MatchExpressionParser::kBanAllSpecialFeatures);
+ invariant(statusWithCQ.isOK(),
+ str::stream() << "Failed to canonicalize oplog lookup"
+ << causedBy(statusWithCQ.getStatus()));
+ std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
+ AutoGetCollectionForReadCommand ctx(opCtx,
+ NamespaceString::kRsOplogNamespace,
+ AutoGetCollection::ViewMode::kViewsForbidden,
+ Date_t::max(),
+ AutoStatsTracker::LogMode::kUpdateTop);
+
+ auto exec = uassertStatusOK(getExecutorFind(opCtx, ctx.getCollection(), std::move(cq)));
+
+ auto getNextResult = exec->getNext(&oplogBSON, nullptr);
uassert(ErrorCodes::IncompleteTransactionHistory,
str::stream() << "oplog no longer contains the complete write history of this "
"transaction, log with opTime "
- << _nextOpTime.toBSON()
+ << opTime.toBSON()
<< " cannot be found",
- !oplogBSON.isEmpty());
+ getNextResult != PlanExecutor::IS_EOF);
+ if (getNextResult != PlanExecutor::ADVANCED) {
+ uassertStatusOKWithContext(WorkingSetCommon::getMemberObjectStatus(oplogBSON),
+ "PlanExecutor error in TransactionHistoryIterator");
+ }
+
+ return oplogBSON;
+}
+
+} // namespace
+
+TransactionHistoryIterator::TransactionHistoryIterator(repl::OpTime startingOpTime)
+ : _nextOpTime(std::move(startingOpTime)) {}
+
+bool TransactionHistoryIterator::hasNext() const {
+ return !_nextOpTime.isNull();
+}
+
+repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
+ BSONObj oplogBSON = findOneOplogEntry(opCtx, _nextOpTime);
auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
const auto& oplogPrevTsOption = oplogEntry.getPrevWriteOpTimeInTransaction();
uassert(
ErrorCodes::FailedToParse,
- str::stream() << "Missing prevTs field on oplog entry of previous write in transcation: "
+ str::stream() << "Missing prevTs field on oplog entry of previous write in transaction: "
<< redact(oplogBSON),
oplogPrevTsOption);