diff options
Diffstat (limited to 'src/mongo/db/query/plan_executor_sbe.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor_sbe.cpp | 107 |
1 files changed, 85 insertions, 22 deletions
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 5d9375f7ed4..9bd146d516f 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -31,8 +31,10 @@ #include "mongo/db/query/plan_executor_sbe.h" +#include "mongo/db/db_raii.h" #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/values/bson.h" +#include "mongo/db/query/plan_insert_listener.h" #include "mongo/db/query/sbe_stage_builder.h" namespace mongo { @@ -40,6 +42,7 @@ PlanExecutorSBE::PlanExecutorSBE( OperationContext* opCtx, std::unique_ptr<CanonicalQuery> cq, std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, bool isOpen, boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash, @@ -47,6 +50,8 @@ PlanExecutorSBE::PlanExecutorSBE( : _state{isOpen ? State::kOpened : State::kClosed}, _opCtx(opCtx), _nss(std::move(nss)), + _env{root.second.env}, + _ctx(std::move(root.second.ctx)), _root(std::move(root.first)), _cq{std::move(cq)}, _yieldPolicy(std::move(yieldPolicy)) { @@ -69,6 +74,10 @@ PlanExecutorSBE::PlanExecutorSBE( uassert(4822867, "Query does not have oplogTs slot.", _oplogTs); } + if (data.shouldUseTailableScan) { + _resumeRecordIdSlot = _env->getSlot("resumeRecordId"_sd); + } + _shouldTrackLatestOplogTimestamp = data.shouldTrackLatestOplogTimestamp; _shouldTrackResumeToken = data.shouldTrackResumeToken; @@ -84,6 +93,16 @@ PlanExecutorSBE::PlanExecutorSBE( if (_yieldPolicy) { _yieldPolicy->setRootStage(_root.get()); } + + // We may still need to initialize _nss from either collection or _cq. + if (_nss.isEmpty()) { + if (collection) { + _nss = collection->ns(); + } else { + invariant(_cq); + _nss = _cq->getQueryRequest().nss(); + } + } } void PlanExecutorSBE::saveState() { @@ -160,24 +179,64 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut) // fetching the next document. _root->close(); _state = State::kClosed; - return PlanExecutor::ExecState::IS_EOF; + if (!_resumeRecordIdSlot) { + return PlanExecutor::ExecState::IS_EOF; + } } - if (_state == State::kClosed) { - _state = State::kOpened; - _root->open(false); + // Capped insert data; declared outside the loop so we hold a shared pointer to the capped + // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped + // insert notifier is necessary for the notifierVersion to advance. + // + // Note that we need to hold a database intent lock before acquiring a notifier. + boost::optional<AutoGetCollectionForRead> coll; + insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { + if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) { + coll.emplace(_opCtx, _nss); + } + + cappedInsertNotifierData.notifier = + insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } - invariant(_state == State::kOpened); + for (;;) { + if (_state == State::kClosed) { + if (_resumeRecordIdSlot) { + invariant(_resultRecordId); + + auto [tag, val] = _resultRecordId->getViewOfValue(); + uassert(4946306, + "Collection scan was asked to track resume token, but found a result " + "without a valid RecordId", + tag == sbe::value::TypeTags::NumberInt64 || + tag == sbe::value::TypeTags::Nothing); + _env->resetSlot(*_resumeRecordIdSlot, tag, val, false); + } + + _state = State::kOpened; + _root->open(false); + } - auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut); - if (result == sbe::PlanState::IS_EOF) { - _root->close(); - _state = State::kClosed; - return PlanExecutor::ExecState::IS_EOF; + invariant(_state == State::kOpened); + + auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut); + if (result == sbe::PlanState::IS_EOF) { + _root->close(); + _state = State::kClosed; + + if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) { + return PlanExecutor::ExecState::IS_EOF; + } + + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + // There may be more results, keep going. + continue; + } + + invariant(result == sbe::PlanState::ADVANCED); + return PlanExecutor::ExecState::ADVANCED; } - invariant(result == sbe::PlanState::ADVANCED); - return PlanExecutor::ExecState::ADVANCED; } Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const { @@ -185,11 +244,13 @@ Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const { invariant(_oplogTs); auto [tag, val] = _oplogTs->getViewOfValue(); - uassert(4822868, - "Collection scan was asked to track latest operation time, " - "but found a result without a valid 'ts' field", - tag == sbe::value::TypeTags::Timestamp); - return Timestamp{sbe::value::bitcastTo<uint64_t>(val)}; + if (tag != sbe::value::TypeTags::Nothing) { + uassert(4822868, + str::stream() << "Collection scan was asked to track latest operation time, " + "but found a result without a valid 'ts' field", + tag == sbe::value::TypeTags::Timestamp); + return Timestamp{sbe::value::bitcastTo<uint64_t>(val)}; + } } return {}; } @@ -199,11 +260,13 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const { invariant(_resultRecordId); auto [tag, val] = _resultRecordId->getViewOfValue(); - uassert(4822869, - "Collection scan was asked to track resume token, " - "but found a result without a valid RecordId", - tag == sbe::value::TypeTags::NumberInt64); - return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val)); + if (tag != sbe::value::TypeTags::Nothing) { + uassert(4822869, + "Collection scan was asked to track resume token, " + "but found a result without a valid RecordId", + tag == sbe::value::TypeTags::NumberInt64); + return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val)); + } } return {}; } |