summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor_sbe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/plan_executor_sbe.cpp')
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp107
1 files changed, 85 insertions, 22 deletions
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index 5d9375f7ed4..9bd146d516f 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -31,8 +31,10 @@
#include "mongo/db/query/plan_executor_sbe.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/values/bson.h"
+#include "mongo/db/query/plan_insert_listener.h"
#include "mongo/db/query/sbe_stage_builder.h"
namespace mongo {
@@ -40,6 +42,7 @@ PlanExecutorSBE::PlanExecutorSBE(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
bool isOpen,
boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash,
@@ -47,6 +50,8 @@ PlanExecutorSBE::PlanExecutorSBE(
: _state{isOpen ? State::kOpened : State::kClosed},
_opCtx(opCtx),
_nss(std::move(nss)),
+ _env{root.second.env},
+ _ctx(std::move(root.second.ctx)),
_root(std::move(root.first)),
_cq{std::move(cq)},
_yieldPolicy(std::move(yieldPolicy)) {
@@ -69,6 +74,10 @@ PlanExecutorSBE::PlanExecutorSBE(
uassert(4822867, "Query does not have oplogTs slot.", _oplogTs);
}
+ if (data.shouldUseTailableScan) {
+ _resumeRecordIdSlot = _env->getSlot("resumeRecordId"_sd);
+ }
+
_shouldTrackLatestOplogTimestamp = data.shouldTrackLatestOplogTimestamp;
_shouldTrackResumeToken = data.shouldTrackResumeToken;
@@ -84,6 +93,16 @@ PlanExecutorSBE::PlanExecutorSBE(
if (_yieldPolicy) {
_yieldPolicy->setRootStage(_root.get());
}
+
+ // We may still need to initialize _nss from either collection or _cq.
+ if (_nss.isEmpty()) {
+ if (collection) {
+ _nss = collection->ns();
+ } else {
+ invariant(_cq);
+ _nss = _cq->getQueryRequest().nss();
+ }
+ }
}
void PlanExecutorSBE::saveState() {
@@ -160,24 +179,64 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut)
// fetching the next document.
_root->close();
_state = State::kClosed;
- return PlanExecutor::ExecState::IS_EOF;
+ if (!_resumeRecordIdSlot) {
+ return PlanExecutor::ExecState::IS_EOF;
+ }
}
- if (_state == State::kClosed) {
- _state = State::kOpened;
- _root->open(false);
+ // Capped insert data; declared outside the loop so we hold a shared pointer to the capped
+ // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
+ // insert notifier is necessary for the notifierVersion to advance.
+ //
+ // Note that we need to hold a database intent lock before acquiring a notifier.
+ boost::optional<AutoGetCollectionForRead> coll;
+ insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
+ if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
+ if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) {
+ coll.emplace(_opCtx, _nss);
+ }
+
+ cappedInsertNotifierData.notifier =
+ insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
- invariant(_state == State::kOpened);
+ for (;;) {
+ if (_state == State::kClosed) {
+ if (_resumeRecordIdSlot) {
+ invariant(_resultRecordId);
+
+ auto [tag, val] = _resultRecordId->getViewOfValue();
+ uassert(4946306,
+ "Collection scan was asked to track resume token, but found a result "
+ "without a valid RecordId",
+ tag == sbe::value::TypeTags::NumberInt64 ||
+ tag == sbe::value::TypeTags::Nothing);
+ _env->resetSlot(*_resumeRecordIdSlot, tag, val, false);
+ }
+
+ _state = State::kOpened;
+ _root->open(false);
+ }
- auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut);
- if (result == sbe::PlanState::IS_EOF) {
- _root->close();
- _state = State::kClosed;
- return PlanExecutor::ExecState::IS_EOF;
+ invariant(_state == State::kOpened);
+
+ auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut);
+ if (result == sbe::PlanState::IS_EOF) {
+ _root->close();
+ _state = State::kClosed;
+
+ if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) {
+ return PlanExecutor::ExecState::IS_EOF;
+ }
+
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
+ // There may be more results, keep going.
+ continue;
+ }
+
+ invariant(result == sbe::PlanState::ADVANCED);
+ return PlanExecutor::ExecState::ADVANCED;
}
- invariant(result == sbe::PlanState::ADVANCED);
- return PlanExecutor::ExecState::ADVANCED;
}
Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const {
@@ -185,11 +244,13 @@ Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const {
invariant(_oplogTs);
auto [tag, val] = _oplogTs->getViewOfValue();
- uassert(4822868,
- "Collection scan was asked to track latest operation time, "
- "but found a result without a valid 'ts' field",
- tag == sbe::value::TypeTags::Timestamp);
- return Timestamp{sbe::value::bitcastTo<uint64_t>(val)};
+ if (tag != sbe::value::TypeTags::Nothing) {
+ uassert(4822868,
+ str::stream() << "Collection scan was asked to track latest operation time, "
+ "but found a result without a valid 'ts' field",
+ tag == sbe::value::TypeTags::Timestamp);
+ return Timestamp{sbe::value::bitcastTo<uint64_t>(val)};
+ }
}
return {};
}
@@ -199,11 +260,13 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const {
invariant(_resultRecordId);
auto [tag, val] = _resultRecordId->getViewOfValue();
- uassert(4822869,
- "Collection scan was asked to track resume token, "
- "but found a result without a valid RecordId",
- tag == sbe::value::TypeTags::NumberInt64);
- return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val));
+ if (tag != sbe::value::TypeTags::Nothing) {
+ uassert(4822869,
+ "Collection scan was asked to track resume token, "
+ "but found a result without a valid RecordId",
+ tag == sbe::value::TypeTags::NumberInt64);
+ return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val));
+ }
}
return {};
}