summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAnton Korshunov <anton.korshunov@mongodb.com>2020-07-13 11:18:38 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-06 08:33:01 +0000
commite83c568e68a37f7a4022caa67b19126fe7f4ab58 (patch)
treef6fd0bec39fbad32754e62a37da072b040e92cfc /src/mongo/db
parentb8b24f4b52188efc07daf2dbc81f38e7194aac9e (diff)
downloadmongo-e83c568e68a37f7a4022caa67b19126fe7f4ab58.tar.gz
SERVER-49463 Add support for tailable cursors and change streams in SBE
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript3
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.cpp88
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.h158
-rw-r--r--src/mongo/db/exec/sbe/sbe_key_string_test.cpp2
-rw-r--r--src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp2
-rw-r--r--src/mongo/db/exec/sbe/stages/exchange.cpp12
-rw-r--r--src/mongo/db/exec/sbe/stages/exchange.h10
-rw-r--r--src/mongo/db/exec/sbe_cmd.cpp10
-rw-r--r--src/mongo/db/query/get_executor.cpp5
-rw-r--r--src/mongo/db/query/plan_executor_factory.cpp13
-rw-r--r--src/mongo/db/query/plan_executor_factory.h2
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp96
-rw-r--r--src/mongo/db/query/plan_executor_impl.h27
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp107
-rw-r--r--src/mongo/db/query/plan_executor_sbe.h5
-rw-r--r--src/mongo/db/query/plan_insert_listener.cpp123
-rw-r--r--src/mongo/db/query/plan_insert_listener.h81
-rw-r--r--src/mongo/db/query/sbe_stage_builder.cpp119
-rw-r--r--src/mongo/db/query/sbe_stage_builder.h24
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp111
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.h3
21 files changed, 804 insertions, 197 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 760785efdc6..e4e8a43b8f7 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1136,8 +1136,9 @@ env.Library(
'query/get_executor.cpp',
'query/internal_plans.cpp',
'query/plan_executor_impl.cpp',
- 'query/plan_executor_sbe.cpp',
'query/plan_executor_factory.cpp',
+ 'query/plan_executor_sbe.cpp',
+ 'query/plan_insert_listener.cpp',
'query/plan_ranker.cpp',
'query/plan_yield_policy_impl.cpp',
'query/plan_yield_policy_sbe.cpp',
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index 68384b44ec5..b25936d2633 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -692,6 +692,88 @@ std::vector<DebugPrinter::Block> ETypeMatch::debugPrint() const {
return ret;
}
+RuntimeEnvironment::RuntimeEnvironment(const RuntimeEnvironment& other)
+ : _state{other._state}, _isSmp{other._isSmp} {
+ for (auto&& [type, slot] : _state->slots) {
+ emplaceAccessor(slot.first, slot.second);
+ }
+}
+
+RuntimeEnvironment::~RuntimeEnvironment() {
+ if (_state.use_count() == 1) {
+ for (size_t idx = 0; idx < _state->vals.size(); ++idx) {
+ if (_state->owned[idx]) {
+ releaseValue(_state->typeTags[idx], _state->vals[idx]);
+ }
+ }
+ }
+}
+
+value::SlotId RuntimeEnvironment::registerSlot(StringData type,
+ value::TypeTags tag,
+ value::Value val,
+ bool owned,
+ value::SlotIdGenerator* slotIdGenerator) {
+ if (auto it = _state->slots.find(type); it == _state->slots.end()) {
+ invariant(slotIdGenerator);
+ auto slot = slotIdGenerator->generate();
+ emplaceAccessor(slot, _state->pushSlot(type, slot));
+ _accessors.at(slot).reset(owned, tag, val);
+ return slot;
+ }
+
+ uasserted(4946303, str::stream() << "slot already registered:" << type);
+}
+
+value::SlotId RuntimeEnvironment::getSlot(StringData type) {
+ if (auto it = _state->slots.find(type); it != _state->slots.end()) {
+ return it->second.first;
+ }
+
+ uasserted(4946305, str::stream() << "environment slot is not registered for type: " << type);
+}
+
+void RuntimeEnvironment::resetSlot(value::SlotId slot,
+ value::TypeTags tag,
+ value::Value val,
+ bool owned) {
+ // With intra-query parallelism enabled the global environment can hold only read-only values.
+ invariant(!_isSmp);
+
+ if (auto it = _accessors.find(slot); it != _accessors.end()) {
+ it->second.reset(owned, tag, val);
+ return;
+ }
+
+ uasserted(4946300, str::stream() << "undefined slot accessor:" << slot);
+}
+
+value::SlotAccessor* RuntimeEnvironment::getAccessor(value::SlotId slot) {
+ if (auto it = _accessors.find(slot); it != _accessors.end()) {
+ return &it->second;
+ }
+
+ uasserted(4946301, str::stream() << "undefined slot accessor:" << slot);
+}
+
+std::unique_ptr<RuntimeEnvironment> RuntimeEnvironment::makeCopy(bool isSmp) {
+ // Once this environment is used to create a copy for a parallel plan execution, it becomes
+ // a parallel environment itself.
+ if (isSmp) {
+ _isSmp = isSmp;
+ }
+
+ return std::unique_ptr<RuntimeEnvironment>(new RuntimeEnvironment(*this));
+}
+
+void RuntimeEnvironment::debugString(StringBuilder* builder) {
+ *builder << "env: { ";
+ for (auto&& [type, slot] : _state->slots) {
+ *builder << type << "=s" << slot.first << " ";
+ }
+ *builder << "}";
+}
+
value::SlotAccessor* CompileCtx::getAccessor(value::SlotId slot) {
for (auto it = correlated.rbegin(); it != correlated.rend(); ++it) {
if (it->first == slot) {
@@ -699,7 +781,7 @@ value::SlotAccessor* CompileCtx::getAccessor(value::SlotId slot) {
}
}
- uasserted(4822848, str::stream() << "undefined slot accessor:" << slot);
+ return env->getAccessor(slot);
}
std::shared_ptr<SpoolBuffer> CompileCtx::getSpoolBuffer(SpoolId spool) {
@@ -716,5 +798,9 @@ void CompileCtx::pushCorrelated(value::SlotId slot, value::SlotAccessor* accesso
void CompileCtx::popCorrelated() {
correlated.pop_back();
}
+
+CompileCtx CompileCtx::makeCopy(bool isSmp) {
+ return {env->makeCopy(isSmp)};
+}
} // namespace sbe
} // namespace mongo
diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h
index 66c04b33e3d..2192142e4ef 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.h
+++ b/src/mongo/db/exec/sbe/expressions/expression.h
@@ -34,27 +34,185 @@
#include <vector>
#include "mongo/db/exec/sbe/util/debug_print.h"
+#include "mongo/db/exec/sbe/values/id_generators.h"
#include "mongo/db/exec/sbe/values/value.h"
#include "mongo/db/exec/sbe/vm/vm.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/string_map.h"
namespace mongo {
namespace sbe {
using SpoolBuffer = std::vector<value::MaterializedRow>;
+/**
+ * A holder for slots and accessors which are used in a PlanStage tree but:
+ * - Cannot be made constants due to restrictions on the lifetime of such values (e.g., they're
+ * singleton instances owned somewhere else).
+ * - Can be changed in runtime outside of the PlanStage tree (e.g., a resume recordId changed by a
+ * PlanExecutor).
+ *
+ * A RuntimeEnvironment object is created once per an execution thread. That means that each
+ * producer and consumer in a parallel plan will have their own compilation environment, with their
+ * own slot accessors. However, slot accessors in each of such environment will access shared data,
+ * which is the same across all environments.
+ *
+ * To avoid data races, the values stored in the runtime environment are considered read-only when
+ * used with a parallel plan. An attempt to change any slot with 'resetValue' will result in a user
+ * exception.
+ *
+ * If the runtime environment is used in a serial plan, modifications of the slots is allowed.
+ */
+class RuntimeEnvironment {
+public:
+ RuntimeEnvironment() = default;
+ RuntimeEnvironment(RuntimeEnvironment&&) = delete;
+ RuntimeEnvironment& operator=(const RuntimeEnvironment&) = delete;
+ RuntimeEnvironment& operator=(const RuntimeEnvironment&&) = delete;
+ ~RuntimeEnvironment();
+
+ /**
+ * Registers and returns a SlotId for the given slot 'type'. The 'slotIdGenerartor' is used to
+ * generated a new SlotId for the given slot 'type', which is then registered with this
+ * environment by creating a new SlotAccessor. The value 'val' is then stored within the
+ * SlotAccessor and the newly generated SlotId is returned.
+ *
+ * Both owned and unowned values can be stored in the runtime environment.
+ *
+ * A user exception is raised if this slot 'type' has been already registered.
+ */
+ value::SlotId registerSlot(StringData type,
+ value::TypeTags tag,
+ value::Value val,
+ bool owned,
+ value::SlotIdGenerator* slotIdGenerator);
+
+ /**
+ * Returns a SlotId registered for the given slot 'type'. If the slot hasn't been registered
+ * yet, a user exception is raised..
+ */
+ value::SlotId getSlot(StringData type);
+
+ /**
+ * Store the given value in the specified slot within this runtime environment instance.
+ *
+ * A user exception is raised if the SlotId is not registered within this environment, or
+ * if this environment is used with a parallel plan.
+ */
+ void resetSlot(value::SlotId slot, value::TypeTags tag, value::Value val, bool owned);
+
+ /**
+ * Returns a SlotAccessor for the given SlotId which must be previously registered within this
+ * Environment by invoking 'registerSlot' method.
+ *
+ * A user exception is raised if the SlotId is not registered within this environment.
+ */
+ value::SlotAccessor* getAccessor(value::SlotId slot);
+
+ /**
+ * Make a copy of his environment. The new environment will have its own set of SlotAccessors
+ * pointing to the same shared data holding slot values.
+ *
+ * To create a copy of the runtime environment for a parallel execution plan, the 'isSmp' flag
+ * must be set to 'true'. This will result in this environment being unconverted to a parallel
+ * environment, as well as the newly created copy.
+ */
+ std::unique_ptr<RuntimeEnvironment> makeCopy(bool isSmp);
+
+ /**
+ * Dumps all the slots currently defined in this environment into the given string builder.
+ */
+ void debugString(StringBuilder* builder);
+
+private:
+ RuntimeEnvironment(const RuntimeEnvironment&);
+
+ struct State {
+ auto pushSlot(StringData type, value::SlotId slot) {
+ auto index = vals.size();
+
+ typeTags.push_back(value::TypeTags::Nothing);
+ vals.push_back(0);
+ owned.push_back(false);
+
+ auto [it, inserted] = slots.emplace(type, std::make_pair(slot, index));
+ uassert(4946302, str::stream() << "duplicate environment slot: " << slot, inserted);
+ return index;
+ }
+
+ StringMap<std::pair<value::SlotId, size_t>> slots;
+ std::vector<value::TypeTags> typeTags;
+ std::vector<value::Value> vals;
+ std::vector<bool> owned;
+ };
+
+ class Accessor final : public value::SlotAccessor {
+ public:
+ Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {}
+
+ std::pair<value::TypeTags, value::Value> getViewOfValue() const override {
+ return {_env->_state->typeTags[_index], _env->_state->vals[_index]};
+ }
+
+ std::pair<value::TypeTags, value::Value> copyOrMoveValue() override {
+ // Always make a copy.
+ return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
+ }
+
+ void reset(bool owned, value::TypeTags tag, value::Value val) {
+ release();
+
+ _env->_state->typeTags[_index] = tag;
+ _env->_state->vals[_index] = val;
+ _env->_state->owned[_index] = owned;
+ }
+
+ private:
+ void release() {
+ if (_env->_state->owned[_index]) {
+ releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]);
+ _env->_state->owned[_index] = false;
+ }
+ }
+
+ RuntimeEnvironment* const _env;
+ const size_t _index;
+ };
+
+ void emplaceAccessor(value::SlotId slot, size_t index) {
+ _accessors.emplace(slot, Accessor{this, index});
+ }
+
+ std::shared_ptr<State> _state{std::make_shared<State>()};
+ value::SlotMap<Accessor> _accessors;
+ bool _isSmp{false};
+
+ friend class Accessor;
+};
+
class PlanStage;
struct CompileCtx {
+ CompileCtx(std::unique_ptr<RuntimeEnvironment> env) : env{std::move(env)} {}
+
value::SlotAccessor* getAccessor(value::SlotId slot);
std::shared_ptr<SpoolBuffer> getSpoolBuffer(SpoolId spool);
void pushCorrelated(value::SlotId slot, value::SlotAccessor* accessor);
void popCorrelated();
+ CompileCtx makeCopy(bool isSmp);
+
PlanStage* root{nullptr};
value::SlotAccessor* accumulator{nullptr};
std::vector<std::pair<value::SlotId, value::SlotAccessor*>> correlated;
stdx::unordered_map<SpoolId, std::shared_ptr<SpoolBuffer>> spoolBuffers;
bool aggExpression{false};
+
+private:
+ // Any data that a PlanStage needs from the RuntimeEnvironment should not be accessed directly
+ // but insteady by looking up the corresponding slots. These slots are set up during the process
+ // of building PlanStages, so the PlanStages themselves should never need to add new slots to
+ // the RuntimeEnvironment.
+ std::unique_ptr<RuntimeEnvironment> env;
};
/**
diff --git a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp
index b69ab6ad3e3..303cf6efef1 100644
--- a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp
@@ -115,7 +115,7 @@ TEST(SBEKeyStringTest, Basic) {
// Set up an SBE expression that will compare one element in the 'testValues' BSON object with
// one of the KeyString components.
- CompileCtx ctx;
+ CompileCtx ctx{std::make_unique<RuntimeEnvironment>()};
CoScanStage emptyStage;
ctx.root = &emptyStage;
diff --git a/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp b/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp
index 372b67fa5b6..e31effefdce 100644
--- a/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp
@@ -104,7 +104,7 @@ protected:
ASSERT_EQUALS(tag, value::TypeTags::Nothing);
}
- CompileCtx _ctx;
+ CompileCtx _ctx{std::make_unique<RuntimeEnvironment>()};
CoScanStage _emptyStage;
vm::ByteCode _interpreter;
diff --git a/src/mongo/db/exec/sbe/stages/exchange.cpp b/src/mongo/db/exec/sbe/stages/exchange.cpp
index e644c162e59..0c2535878d9 100644
--- a/src/mongo/db/exec/sbe/stages/exchange.cpp
+++ b/src/mongo/db/exec/sbe/stages/exchange.cpp
@@ -181,6 +181,10 @@ void ExchangeConsumer::prepare(CompileCtx& ctx) {
for (size_t idx = 0; idx < _state->fields().size(); ++idx) {
_outgoing.emplace_back(ExchangeBuffer::Accessor{});
}
+
+ for (size_t idx = 0; idx < _state->numOfProducers(); ++idx) {
+ _state->producerCompileCtxs().push_back(ctx.makeCopy(true));
+ }
// Compile '<' function once we implement order preserving exchange.
}
value::SlotAccessor* ExchangeConsumer::getAccessor(CompileCtx& ctx, value::SlotId slot) {
@@ -245,6 +249,7 @@ void ExchangeConsumer::open(bool reOpen) {
}
// Start n producers.
+ invariant(_state->producerCompileCtxs().size() == _state->numOfProducers());
for (size_t idx = 0; idx < _state->numOfProducers(); ++idx) {
auto pf = makePromiseFuture<void>();
s_globalThreadPool->schedule(
@@ -255,6 +260,7 @@ void ExchangeConsumer::open(bool reOpen) {
promise.setWith([&] {
ExchangeProducer::start(opCtx.get(),
+ _state->producerCompileCtxs()[idx],
std::move(_state->producerPlans()[idx]));
});
});
@@ -446,13 +452,14 @@ ExchangeProducer::ExchangeProducer(std::unique_ptr<PlanStage> input,
}
}
-void ExchangeProducer::start(OperationContext* opCtx, std::unique_ptr<PlanStage> producer) {
+void ExchangeProducer::start(OperationContext* opCtx,
+ CompileCtx& ctx,
+ std::unique_ptr<PlanStage> producer) {
ExchangeProducer* p = static_cast<ExchangeProducer*>(producer.get());
p->attachFromOperationContext(opCtx);
try {
- CompileCtx ctx;
p->prepare(ctx);
p->open(false);
@@ -475,6 +482,7 @@ std::unique_ptr<PlanStage> ExchangeProducer::clone() const {
void ExchangeProducer::prepare(CompileCtx& ctx) {
_children[0]->prepare(ctx);
+
for (auto& f : _state->fields()) {
_incoming.emplace_back(_children[0]->getAccessor(ctx, f));
}
diff --git a/src/mongo/db/exec/sbe/stages/exchange.h b/src/mongo/db/exec/sbe/stages/exchange.h
index 3fb3a3b91c5..c73dd19ebf5 100644
--- a/src/mongo/db/exec/sbe/stages/exchange.h
+++ b/src/mongo/db/exec/sbe/stages/exchange.h
@@ -196,6 +196,11 @@ public:
auto& producerPlans() {
return _producerPlans;
}
+
+ auto& producerCompileCtxs() {
+ return _producerCompileCtxs;
+ }
+
auto& producerResults() {
return _producerResults;
}
@@ -218,6 +223,7 @@ private:
std::vector<ExchangeConsumer*> _consumers;
std::vector<ExchangeProducer*> _producers;
std::vector<std::unique_ptr<PlanStage>> _producerPlans;
+ std::vector<CompileCtx> _producerCompileCtxs;
std::vector<Future<void>> _producerResults;
// Variables (fields) that pass through the exchange.
@@ -296,7 +302,9 @@ class ExchangeProducer final : public PlanStage {
public:
ExchangeProducer(std::unique_ptr<PlanStage> input, std::shared_ptr<ExchangeState> state);
- static void start(OperationContext* opCtx, std::unique_ptr<PlanStage> producer);
+ static void start(OperationContext* opCtx,
+ CompileCtx& ctx,
+ std::unique_ptr<PlanStage> producer);
std::unique_ptr<PlanStage> clone() const final;
diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp
index d544e79599f..1e46cec6d4f 100644
--- a/src/mongo/db/exec/sbe_cmd.cpp
+++ b/src/mongo/db/exec/sbe_cmd.cpp
@@ -78,12 +78,12 @@ public:
NamespaceString nss{dbname};
+ stage_builder::PlanStageData data{std::make_unique<sbe::RuntimeEnvironment>()};
+ data.resultSlot = resultSlot;
+ data.recordIdSlot = recordIdSlot;
+
exec = uassertStatusOK(plan_executor_factory::make(
- opCtx,
- nullptr,
- {std::move(root), stage_builder::PlanStageData{resultSlot, recordIdSlot}},
- nss,
- nullptr));
+ opCtx, nullptr, {std::move(root), std::move(data)}, nullptr, nss, nullptr));
for (long long objCount = 0; objCount < batchSize; objCount++) {
BSONObj next;
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index a63e841a3e0..3f14abe3645 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -898,7 +898,7 @@ protected:
auto result = makeResult();
result->emplace(
{sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 0, boost::none),
- stage_builder::PlanStageData{}},
+ stage_builder::PlanStageData{std::make_unique<sbe::RuntimeEnvironment>()}},
nullptr);
return result;
}
@@ -1072,6 +1072,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe
return plan_executor_factory::make(opCtx,
std::move(cq),
{std::move(plan.root), std::move(plan.data)},
+ collection,
{},
std::move(plan.results),
std::move(yieldPolicy));
@@ -1079,7 +1080,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe
// No need for runtime planning, just use the constructed plan stage tree.
invariant(roots.size() == 1);
return plan_executor_factory::make(
- opCtx, std::move(cq), std::move(roots[0]), {}, std::move(yieldPolicy));
+ opCtx, std::move(cq), std::move(roots[0]), collection, {}, std::move(yieldPolicy));
}
} // namespace
diff --git a/src/mongo/db/query/plan_executor_factory.cpp b/src/mongo/db/query/plan_executor_factory.cpp
index fa16042eaf0..3630e975646 100644
--- a/src/mongo/db/query/plan_executor_factory.cpp
+++ b/src/mongo/db/query/plan_executor_factory.cpp
@@ -113,6 +113,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) {
@@ -129,6 +130,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
auto exec = new PlanExecutorSBE(opCtx,
std::move(cq),
std::move(root),
+ collection,
std::move(nss),
false,
boost::none,
@@ -140,6 +142,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) {
@@ -152,8 +155,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
"slots"_attr = data.debugString(),
"stages"_attr = sbe::DebugPrinter{}.print(rootStage.get()));
- auto exec = new PlanExecutorSBE(
- opCtx, std::move(cq), std::move(root), std::move(nss), true, stash, std::move(yieldPolicy));
+ auto exec = new PlanExecutorSBE(opCtx,
+ std::move(cq),
+ std::move(root),
+ collection,
+ std::move(nss),
+ true,
+ stash,
+ std::move(yieldPolicy));
return {{exec, PlanExecutor::Deleter{opCtx}}};
}
diff --git a/src/mongo/db/query/plan_executor_factory.h b/src/mongo/db/query/plan_executor_factory.h
index 8faad454285..56f6fe87cf4 100644
--- a/src/mongo/db/query/plan_executor_factory.h
+++ b/src/mongo/db/query/plan_executor_factory.h
@@ -105,6 +105,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy);
@@ -117,6 +118,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy);
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 8b0126ed50b..bd12126709d 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -37,8 +37,6 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database.h"
-#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -56,9 +54,8 @@
#include "mongo/db/exec/trial_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/query/explain.h"
-#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
+#include "mongo/db/query/plan_insert_listener.h"
#include "mongo/db/query/plan_yield_policy_impl.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
@@ -77,16 +74,10 @@ using std::vector;
const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
OperationContext::declareDecoration<repl::OpTime>();
-struct CappedInsertNotifierData {
- shared_ptr<CappedInsertNotifier> notifier;
- uint64_t lastEOFVersion = ~0;
-};
-
namespace {
MONGO_FAIL_POINT_DEFINE(planExecutorAlwaysFails);
MONGO_FAIL_POINT_DEFINE(planExecutorHangBeforeShouldWaitForInserts);
-MONGO_FAIL_POINT_DEFINE(planExecutorHangWhileYieldedInWaitForInserts);
/**
* Constructs a PlanYieldPolicy based on 'policy'.
@@ -326,77 +317,6 @@ PlanExecutor::ExecState PlanExecutorImpl::getNextDocument(Document* objOut, Reco
return state;
}
-bool PlanExecutorImpl::_shouldListenForInserts() {
- return _cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
- awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
- awaitDataState(_opCtx).waitForInsertsDeadline >
- _opCtx->getServiceContext()->getPreciseClockSource()->now();
-}
-
-bool PlanExecutorImpl::_shouldWaitForInserts() {
- // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
- // we should wait for inserts.
- if (_shouldListenForInserts()) {
- // We expect awaitData cursors to be yielding.
- invariant(_yieldPolicy->canReleaseLocksDuringExecution());
-
- // For operations with a last committed opTime, we should not wait if the replication
- // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime.
- // In that case, we will return early so that we can inform the client of the new
- // lastCommittedOpTime immediately.
- if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
- auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
- return clientsLastKnownCommittedOpTime(_opCtx) >= replCoord->getLastCommittedOpTime();
- }
- return true;
- }
- return false;
-}
-
-std::shared_ptr<CappedInsertNotifier> PlanExecutorImpl::_getCappedInsertNotifier() {
- // We don't expect to need a capped insert notifier for non-yielding plans.
- invariant(_yieldPolicy->canReleaseLocksDuringExecution());
-
- // We can only wait if we have a collection; otherwise we should retry immediately when
- // we hit EOF.
- dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS));
- auto databaseHolder = DatabaseHolder::get(_opCtx);
- auto db = databaseHolder->getDb(_opCtx, _nss.db());
- invariant(db);
- auto collection = CollectionCatalog::get(_opCtx).lookupCollectionByNamespace(_opCtx, _nss);
- invariant(collection);
-
- return collection->getCappedInsertNotifier();
-}
-
-void PlanExecutorImpl::_waitForInserts(CappedInsertNotifierData* notifierData) {
- invariant(notifierData->notifier);
-
- // The notifier wait() method will not wait unless the version passed to it matches the
- // current version of the notifier. Since the version passed to it is the current version
- // of the notifier at the time of the previous EOF, we require two EOFs in a row with no
- // notifier version change in order to wait. This is sufficient to ensure we never wait
- // when data is available.
- auto curOp = CurOp::get(_opCtx);
- curOp->pauseTimer();
- ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
- auto opCtx = _opCtx;
- uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
- auto yieldResult = _yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] {
- const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
- if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) {
- LOGV2(4452903,
- "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. "
- "Blocking until fail point is disabled");
- planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet();
- }
- });
- notifierData->lastEOFVersion = currentNotifierVersion;
-
- uassertStatusOK(yieldResult);
-}
-
PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* objOut,
RecordId* dlOut) {
if (MONGO_unlikely(planExecutorAlwaysFails.shouldFail())) {
@@ -422,10 +342,11 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
// Capped insert data; declared outside the loop so we hold a shared pointer to the capped
// insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
// insert notifier is necessary for the notifierVersion to advance.
- CappedInsertNotifierData cappedInsertNotifierData;
- if (_shouldListenForInserts()) {
+ insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
+ if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
// We always construct the CappedInsertNotifier for awaitData cursors.
- cappedInsertNotifierData.notifier = _getCappedInsertNotifier();
+ cappedInsertNotifierData.notifier =
+ insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
for (;;) {
// These are the conditions which can cause us to yield:
@@ -519,10 +440,13 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
"enabled. Blocking until fail point is disabled");
planExecutorHangBeforeShouldWaitForInserts.pauseWhileSet();
}
- if (!_shouldWaitForInserts()) {
+
+ if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) {
return PlanExecutor::IS_EOF;
}
- _waitForInserts(&cappedInsertNotifierData);
+
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
+
// There may be more results, keep going.
continue;
}
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index ac586aef59b..f4efb60b8af 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -117,33 +117,6 @@ private:
*/
Status _pickBestPlan();
- /**
- * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
- * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
- */
- bool _shouldListenForInserts();
-
- /**
- * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
- * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
- * should be returned immediately.
- */
- bool _shouldWaitForInserts();
-
- /**
- * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
- * is not capable of yielding based on a notifier.
- */
- std::shared_ptr<CappedInsertNotifier> _getCappedInsertNotifier();
-
- /**
- * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to
- * the collection being tailed. Returns control to the caller once there has been an insertion
- * and there may be new results. If the PlanExecutor was killed during a yield, throws an
- * exception.
- */
- void _waitForInserts(CappedInsertNotifierData* notifierData);
-
ExecState _getNextImpl(Snapshotted<Document>* objOut, RecordId* dlOut);
// The OperationContext that we're executing within. This can be updated if necessary by using
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index 5d9375f7ed4..9bd146d516f 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -31,8 +31,10 @@
#include "mongo/db/query/plan_executor_sbe.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/values/bson.h"
+#include "mongo/db/query/plan_insert_listener.h"
#include "mongo/db/query/sbe_stage_builder.h"
namespace mongo {
@@ -40,6 +42,7 @@ PlanExecutorSBE::PlanExecutorSBE(
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
bool isOpen,
boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash,
@@ -47,6 +50,8 @@ PlanExecutorSBE::PlanExecutorSBE(
: _state{isOpen ? State::kOpened : State::kClosed},
_opCtx(opCtx),
_nss(std::move(nss)),
+ _env{root.second.env},
+ _ctx(std::move(root.second.ctx)),
_root(std::move(root.first)),
_cq{std::move(cq)},
_yieldPolicy(std::move(yieldPolicy)) {
@@ -69,6 +74,10 @@ PlanExecutorSBE::PlanExecutorSBE(
uassert(4822867, "Query does not have oplogTs slot.", _oplogTs);
}
+ if (data.shouldUseTailableScan) {
+ _resumeRecordIdSlot = _env->getSlot("resumeRecordId"_sd);
+ }
+
_shouldTrackLatestOplogTimestamp = data.shouldTrackLatestOplogTimestamp;
_shouldTrackResumeToken = data.shouldTrackResumeToken;
@@ -84,6 +93,16 @@ PlanExecutorSBE::PlanExecutorSBE(
if (_yieldPolicy) {
_yieldPolicy->setRootStage(_root.get());
}
+
+ // We may still need to initialize _nss from either collection or _cq.
+ if (_nss.isEmpty()) {
+ if (collection) {
+ _nss = collection->ns();
+ } else {
+ invariant(_cq);
+ _nss = _cq->getQueryRequest().nss();
+ }
+ }
}
void PlanExecutorSBE::saveState() {
@@ -160,24 +179,64 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut)
// fetching the next document.
_root->close();
_state = State::kClosed;
- return PlanExecutor::ExecState::IS_EOF;
+ if (!_resumeRecordIdSlot) {
+ return PlanExecutor::ExecState::IS_EOF;
+ }
}
- if (_state == State::kClosed) {
- _state = State::kOpened;
- _root->open(false);
+ // Capped insert data; declared outside the loop so we hold a shared pointer to the capped
+ // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
+ // insert notifier is necessary for the notifierVersion to advance.
+ //
+ // Note that we need to hold a database intent lock before acquiring a notifier.
+ boost::optional<AutoGetCollectionForRead> coll;
+ insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
+ if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
+ if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) {
+ coll.emplace(_opCtx, _nss);
+ }
+
+ cappedInsertNotifierData.notifier =
+ insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
- invariant(_state == State::kOpened);
+ for (;;) {
+ if (_state == State::kClosed) {
+ if (_resumeRecordIdSlot) {
+ invariant(_resultRecordId);
+
+ auto [tag, val] = _resultRecordId->getViewOfValue();
+ uassert(4946306,
+ "Collection scan was asked to track resume token, but found a result "
+ "without a valid RecordId",
+ tag == sbe::value::TypeTags::NumberInt64 ||
+ tag == sbe::value::TypeTags::Nothing);
+ _env->resetSlot(*_resumeRecordIdSlot, tag, val, false);
+ }
+
+ _state = State::kOpened;
+ _root->open(false);
+ }
- auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut);
- if (result == sbe::PlanState::IS_EOF) {
- _root->close();
- _state = State::kClosed;
- return PlanExecutor::ExecState::IS_EOF;
+ invariant(_state == State::kOpened);
+
+ auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut);
+ if (result == sbe::PlanState::IS_EOF) {
+ _root->close();
+ _state = State::kClosed;
+
+ if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) {
+ return PlanExecutor::ExecState::IS_EOF;
+ }
+
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
+ // There may be more results, keep going.
+ continue;
+ }
+
+ invariant(result == sbe::PlanState::ADVANCED);
+ return PlanExecutor::ExecState::ADVANCED;
}
- invariant(result == sbe::PlanState::ADVANCED);
- return PlanExecutor::ExecState::ADVANCED;
}
Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const {
@@ -185,11 +244,13 @@ Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const {
invariant(_oplogTs);
auto [tag, val] = _oplogTs->getViewOfValue();
- uassert(4822868,
- "Collection scan was asked to track latest operation time, "
- "but found a result without a valid 'ts' field",
- tag == sbe::value::TypeTags::Timestamp);
- return Timestamp{sbe::value::bitcastTo<uint64_t>(val)};
+ if (tag != sbe::value::TypeTags::Nothing) {
+ uassert(4822868,
+ str::stream() << "Collection scan was asked to track latest operation time, "
+ "but found a result without a valid 'ts' field",
+ tag == sbe::value::TypeTags::Timestamp);
+ return Timestamp{sbe::value::bitcastTo<uint64_t>(val)};
+ }
}
return {};
}
@@ -199,11 +260,13 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const {
invariant(_resultRecordId);
auto [tag, val] = _resultRecordId->getViewOfValue();
- uassert(4822869,
- "Collection scan was asked to track resume token, "
- "but found a result without a valid RecordId",
- tag == sbe::value::TypeTags::NumberInt64);
- return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val));
+ if (tag != sbe::value::TypeTags::Nothing) {
+ uassert(4822869,
+ "Collection scan was asked to track resume token, "
+ "but found a result without a valid RecordId",
+ tag == sbe::value::TypeTags::NumberInt64);
+ return BSON("$recordId" << sbe::value::bitcastTo<int64_t>(val));
+ }
}
return {};
}
diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h
index 5ff2f0fe05d..de16b910fcb 100644
--- a/src/mongo/db/query/plan_executor_sbe.h
+++ b/src/mongo/db/query/plan_executor_sbe.h
@@ -43,6 +43,7 @@ public:
OperationContext* opCtx,
std::unique_ptr<CanonicalQuery> cq,
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root,
+ const Collection* collection,
NamespaceString nss,
bool isOpen,
boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash,
@@ -129,11 +130,15 @@ private:
NamespaceString _nss;
+ // CompileCtx owns the instance pointed by _env, so we must keep it around.
+ sbe::RuntimeEnvironment* _env{nullptr};
+ sbe::CompileCtx _ctx;
std::unique_ptr<sbe::PlanStage> _root;
sbe::value::SlotAccessor* _result{nullptr};
sbe::value::SlotAccessor* _resultRecordId{nullptr};
sbe::value::SlotAccessor* _oplogTs{nullptr};
+ boost::optional<sbe::value::SlotId> _resumeRecordIdSlot;
bool _shouldTrackLatestOplogTimestamp{false};
bool _shouldTrackResumeToken{false};
diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp
new file mode 100644
index 00000000000..ea98f68e6ee
--- /dev/null
+++ b/src/mongo/db/query/plan_insert_listener.cpp
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/query/plan_insert_listener.h"
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/query/find_common.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo::insert_listener {
+namespace {
+MONGO_FAIL_POINT_DEFINE(planExecutorHangWhileYieldedInWaitForInserts);
+}
+
+bool shouldListenForInserts(OperationContext* opCtx, CanonicalQuery* cq) {
+ return cq && cq->getQueryRequest().isTailableAndAwaitData() &&
+ awaitDataState(opCtx).shouldWaitForInserts && opCtx->checkForInterruptNoAssert().isOK() &&
+ awaitDataState(opCtx).waitForInsertsDeadline >
+ opCtx->getServiceContext()->getPreciseClockSource()->now();
+}
+
+bool shouldWaitForInserts(OperationContext* opCtx,
+ CanonicalQuery* cq,
+ PlanYieldPolicy* yieldPolicy) {
+ // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+ // we should wait for inserts.
+ if (shouldListenForInserts(opCtx, cq)) {
+ // We expect awaitData cursors to be yielding.
+ invariant(yieldPolicy->canReleaseLocksDuringExecution());
+
+ // For operations with a last committed opTime, we should not wait if the replication
+ // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime.
+ // In that case, we will return early so that we can inform the client of the new
+ // lastCommittedOpTime immediately.
+ if (!clientsLastKnownCommittedOpTime(opCtx).isNull()) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ return clientsLastKnownCommittedOpTime(opCtx) >= replCoord->getLastCommittedOpTime();
+ }
+ return true;
+ }
+ return false;
+}
+
+std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy) {
+ // We don't expect to need a capped insert notifier for non-yielding plans.
+ invariant(yieldPolicy->canReleaseLocksDuringExecution());
+
+ // We can only wait if we have a collection; otherwise we should retry immediately when
+ // we hit EOF.
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS));
+ auto databaseHolder = DatabaseHolder::get(opCtx);
+ auto db = databaseHolder->getDb(opCtx, nss.db());
+ invariant(db);
+ auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
+ invariant(collection);
+
+ return collection->getCappedInsertNotifier();
+}
+
+void waitForInserts(OperationContext* opCtx,
+ PlanYieldPolicy* yieldPolicy,
+ CappedInsertNotifierData* notifierData) {
+ invariant(notifierData->notifier);
+
+ // The notifier wait() method will not wait unless the version passed to it matches the
+ // current version of the notifier. Since the version passed to it is the current version
+ // of the notifier at the time of the previous EOF, we require two EOFs in a row with no
+ // notifier version change in order to wait. This is sufficient to ensure we never wait
+ // when data is available.
+ auto curOp = CurOp::get(opCtx);
+ curOp->pauseTimer();
+ ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+
+ uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
+ auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] {
+ const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
+ notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
+ if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) {
+ LOGV2(4452903,
+ "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. "
+ "Blocking until fail point is disabled");
+ planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet();
+ }
+ });
+ notifierData->lastEOFVersion = currentNotifierVersion;
+
+ uassertStatusOK(yieldResult);
+}
+} // namespace mongo::insert_listener
diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h
new file mode 100644
index 00000000000..be4407d3df8
--- /dev/null
+++ b/src/mongo/db/query/plan_insert_listener.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/query/canonical_query.h"
+#include "mongo/db/query/plan_yield_policy.h"
+
+namespace mongo::insert_listener {
+/**
+ * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned
+ * by the 'notifier'.
+ */
+struct CappedInsertNotifierData {
+ std::shared_ptr<CappedInsertNotifier> notifier;
+ uint64_t lastEOFVersion = ~0;
+};
+
+/**
+ * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called
+ * on a tailable and awaitData cursor that still has time left and hasn't been interrupted.
+ */
+
+bool shouldListenForInserts(OperationContext* opCtx, CanonicalQuery* cq);
+
+/**
+ * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
+ * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
+ * should be returned immediately.
+ */
+bool shouldWaitForInserts(OperationContext* opCtx,
+ CanonicalQuery* cq,
+ PlanYieldPolicy* yieldPolicy);
+
+/**
+ * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
+ * is not capable of yielding based on a notifier.
+ */
+std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy);
+
+/**
+ * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to
+ * the collection being tailed. Returns control to the caller once there has been an insertion
+ * and there may be new results. If the PlanExecutor was killed during a yield, throws an
+ * exception.
+ */
+
+void waitForInserts(OperationContext* opCtx,
+ PlanYieldPolicy* yieldPolicy,
+ CappedInsertNotifierData* notifierData);
+} // namespace mongo::insert_listener
diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp
index 076b304aa5d..63a6c7952aa 100644
--- a/src/mongo/db/query/sbe_stage_builder.cpp
+++ b/src/mongo/db/query/sbe_stage_builder.cpp
@@ -52,8 +52,15 @@
#include "mongo/db/query/sbe_stage_builder_filter.h"
#include "mongo/db/query/sbe_stage_builder_index_scan.h"
#include "mongo/db/query/sbe_stage_builder_projection.h"
+#include "mongo/db/query/util/make_data_structure.h"
namespace mongo::stage_builder {
+std::unique_ptr<sbe::RuntimeEnvironment> makeRuntimeEnvironment(
+ OperationContext* opCtx, sbe::value::SlotIdGenerator* slotIdGenerator) {
+ auto env = std::make_unique<sbe::RuntimeEnvironment>();
+ return env;
+}
+
std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildCollScan(
const QuerySolutionNode* root) {
auto csn = static_cast<const CollectionScanNode*>(root);
@@ -63,12 +70,15 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildCollScan(
csn,
&_slotIdGenerator,
_yieldPolicy,
+ _data.env,
+ _isTailableCollScanResumeBranch,
_data.trialRunProgressTracker.get());
_data.resultSlot = resultSlot;
_data.recordIdSlot = recordIdSlot;
_data.oplogTsSlot = oplogTsSlot;
_data.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
_data.shouldTrackResumeToken = csn->requestResumeToken;
+ _data.shouldUseTailableScan = csn->tailable;
if (_returnKeySlot) {
// Assign the '_returnKeySlot' to be the empty object.
@@ -144,14 +154,15 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildFetch(const QuerySol
std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildLimit(const QuerySolutionNode* root) {
const auto ln = static_cast<const LimitNode*>(root);
- // If we have both limit and skip stages and the skip stage is beneath the limit, then we can
- // combine these two stages into one. So, save the _limit value and let the skip stage builder
- // handle it.
+ // If we have both limit and skip stages and the skip stage is beneath the limit, then we
+ // can combine these two stages into one. So, save the _limit value and let the skip stage
+ // builder handle it.
if (ln->children[0]->getType() == StageType::STAGE_SKIP) {
_limit = ln->limit;
}
+
auto inputStage = build(ln->children[0]);
- return _limit
+ return _limit || _isTailableCollScanResumeBranch
? std::move(inputStage)
: std::make_unique<sbe::LimitSkipStage>(std::move(inputStage), ln->limit, boost::none);
}
@@ -159,7 +170,9 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildLimit(const QuerySol
std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildSkip(const QuerySolutionNode* root) {
const auto sn = static_cast<const SkipNode*>(root);
auto inputStage = build(sn->children[0]);
- return std::make_unique<sbe::LimitSkipStage>(std::move(inputStage), _limit, sn->skip);
+ return _isTailableCollScanResumeBranch
+ ? std::move(inputStage)
+ : std::make_unique<sbe::LimitSkipStage>(std::move(inputStage), _limit, sn->skip);
}
std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildSort(const QuerySolutionNode* root) {
@@ -464,6 +477,86 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::buildReturnKey(
return stage;
}
+std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::makeUnionForTailableCollScan(
+ const QuerySolutionNode* root) {
+ using namespace std::literals;
+
+ // Register a SlotId in the global environment which would contain a recordId to resume a
+ // tailable collection scan from. A PlanStage executor will track the last seen recordId and
+ // will reset a SlotAccessor for the resumeRecordIdSlot with this recordId.
+ auto resumeRecordIdSlot = _data.env->registerSlot(
+ "resumeRecordId"_sd, sbe::value::TypeTags::Nothing, 0, false, &_slotIdGenerator);
+
+ // For tailable collection scan we need to build a special union sub-tree consisting of two
+ // branches:
+ // 1) An anchor branch implementing an initial collection scan before the first EOF is hit.
+ // 2) A resume branch implementing all consecutive collection scans from a recordId which was
+ // seen last.
+ //
+ // The 'makeStage' parameter is used to build a PlanStage tree which is served as a root stage
+ // for each of the union branches. The same machanism is used to build each union branch, and
+ // the special logic which needs to be triggered depending on which branch we build is
+ // controlled by setting the _isTailableCollScanResumeBranch flag.
+
+ _isBuildingUnionForTailableCollScan = true;
+
+ auto makeUnionBranch = [&](bool isTailableCollScanResumeBranch)
+ -> std::pair<sbe::value::SlotVector, std::unique_ptr<sbe::PlanStage>> {
+ _isTailableCollScanResumeBranch = isTailableCollScanResumeBranch;
+ auto branch = build(root);
+ auto branchSlots = sbe::makeSV(*_data.resultSlot, *_data.recordIdSlot);
+ if (_data.oplogTsSlot) {
+ branchSlots.push_back(*_data.oplogTsSlot);
+ }
+ if (_returnKeySlot) {
+ branchSlots.push_back(*_returnKeySlot);
+ }
+ return {std::move(branchSlots), std::move(branch)};
+ };
+
+ // Build an anchor branch of the union and add a constant filter on top of it, so that it would
+ // only execute on an initial collection scan, that is, when resumeRecordId is not available
+ // yet.
+ auto&& [anchorBranchSlots, anchorBranch] = makeUnionBranch(false);
+ anchorBranch = sbe::makeS<sbe::FilterStage<true>>(
+ std::move(anchorBranch),
+ sbe::makeE<sbe::EPrimUnary>(
+ sbe::EPrimUnary::logicNot,
+ sbe::makeE<sbe::EFunction>(
+ "exists"sv, sbe::makeEs(sbe::makeE<sbe::EVariable>(resumeRecordIdSlot)))));
+
+ // Build a resume branch of the union and add a constant filter on op of it, so that it would
+ // only execute when we resume a collection scan from the resumeRecordId.
+ auto&& [resumeBranchSlots, resumeBranch] = makeUnionBranch(true);
+ resumeBranch = sbe::makeS<sbe::FilterStage<true>>(
+ sbe::makeS<sbe::LimitSkipStage>(std::move(resumeBranch), boost::none, 1),
+ sbe::makeE<sbe::EFunction>("exists"sv,
+ sbe::makeEs(sbe::makeE<sbe::EVariable>(resumeRecordIdSlot))));
+
+ invariant(anchorBranchSlots.size() == resumeBranchSlots.size());
+
+ // A vector of the output slots for each union branch.
+ auto branchSlots = make_vector<sbe::value::SlotVector>(std::move(anchorBranchSlots),
+ std::move(resumeBranchSlots));
+
+ _data.resultSlot = _slotIdGenerator.generate();
+ _data.recordIdSlot = _slotIdGenerator.generate();
+ auto unionOutputSlots = sbe::makeSV(*_data.resultSlot, *_data.recordIdSlot);
+ if (_data.oplogTsSlot) {
+ _data.oplogTsSlot = _slotIdGenerator.generate();
+ unionOutputSlots.push_back(*_data.oplogTsSlot);
+ }
+
+ // Branch output slots become the input slots to the union.
+ auto unionStage =
+ sbe::makeS<sbe::UnionStage>(make_vector<std::unique_ptr<sbe::PlanStage>>(
+ std::move(anchorBranch), std::move(resumeBranch)),
+ branchSlots,
+ unionOutputSlots);
+ _isBuildingUnionForTailableCollScan = false;
+ return unionStage;
+}
+
// Returns a non-null pointer to the root of a plan tree, or a non-OK status if the PlanStage tree
// could not be constructed.
std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolutionNode* root) {
@@ -489,6 +582,22 @@ std::unique_ptr<sbe::PlanStage> SlotBasedStageBuilder::build(const QuerySolution
str::stream() << "Can't build exec tree for node: " << root->toString(),
kStageBuilders.find(root->getType()) != kStageBuilders.end());
+ // If this plan is for a tailable cursor scan, and we're not already in the process of building
+ // a special union sub-tree implementing such scans, then start building a union sub-tree. Note
+ // that LIMIT or SKIP stage is used as a splitting point of the two union branches, if present,
+ // because we need to apply limit (or skip) only in the initial scan (in the anchor branch), and
+ // the resume branch should not have it.
+ switch (root->getType()) {
+ case STAGE_COLLSCAN:
+ case STAGE_LIMIT:
+ case STAGE_SKIP:
+ if (_cq.getQueryRequest().isTailable() && !_isBuildingUnionForTailableCollScan) {
+ return makeUnionForTailableCollScan(root);
+ }
+ default:
+ break;
+ }
+
return std::invoke(kStageBuilders.at(root->getType()), *this, root);
}
} // namespace mongo::stage_builder
diff --git a/src/mongo/db/query/sbe_stage_builder.h b/src/mongo/db/query/sbe_stage_builder.h
index 6d1044e998d..3137d12156d 100644
--- a/src/mongo/db/query/sbe_stage_builder.h
+++ b/src/mongo/db/query/sbe_stage_builder.h
@@ -39,10 +39,20 @@
namespace mongo::stage_builder {
/**
+ * Creates a new compilation environment and registers global values within the
+ * new environment.
+ */
+std::unique_ptr<sbe::RuntimeEnvironment> makeRuntimeEnvironment(
+ OperationContext* opCtx, sbe::value::SlotIdGenerator* slotIdGenerator);
+
+/**
* Some auxiliary data returned by a 'SlotBasedStageBuilder' along with a PlanStage tree root, which
* is needed to execute the PlanStage tree.
*/
struct PlanStageData {
+ PlanStageData(std::unique_ptr<sbe::RuntimeEnvironment> env)
+ : env{env.get()}, ctx{std::move(env)} {}
+
std::string debugString() const {
StringBuilder builder;
@@ -56,15 +66,19 @@ struct PlanStageData {
builder << "$$OPLOGTS=s" << *oplogTsSlot << " ";
}
+ env->debugString(&builder);
+
return builder.str();
}
boost::optional<sbe::value::SlotId> resultSlot;
boost::optional<sbe::value::SlotId> recordIdSlot;
boost::optional<sbe::value::SlotId> oplogTsSlot;
+ sbe::RuntimeEnvironment* env{nullptr};
sbe::CompileCtx ctx;
bool shouldTrackLatestOplogTimestamp{false};
bool shouldTrackResumeToken{false};
+ bool shouldUseTailableScan{false};
// Used during the trial run of the runtime planner to track progress of the work done so far.
std::unique_ptr<TrialRunProgressTracker> trialRunProgressTracker;
};
@@ -114,6 +128,8 @@ private:
sbe::value::SlotId recordIdKeySlot,
const sbe::value::SlotVector& slotsToForward = {});
+ std::unique_ptr<sbe::PlanStage> makeUnionForTailableCollScan(const QuerySolutionNode* root);
+
sbe::value::SlotIdGenerator _slotIdGenerator;
sbe::value::FrameIdGenerator _frameIdGenerator;
sbe::value::SpoolIdGenerator _spoolIdGenerator;
@@ -127,10 +143,16 @@ private:
// should inflate each index entry into an object and bind it to this slot.
boost::optional<sbe::value::SlotId> _returnKeySlot;
+ // These two flags control whether we're in the middle of the process of building a special
+ // union sub-tree implementing a tailable cursor collection scan, and if so, whether we're
+ // building an anchor or resume branch.
+ bool _isBuildingUnionForTailableCollScan{false};
+ bool _isTailableCollScanResumeBranch{false};
+
PlanYieldPolicySBE* const _yieldPolicy;
// Apart from generating just an execution tree, this builder will also produce some auxiliary
// data which is needed to execute the tree, such as a result slot, or a recordId slot.
- PlanStageData _data;
+ PlanStageData _data{makeRuntimeEnvironment(_opCtx, &_slotIdGenerator)};
};
} // namespace mongo::stage_builder
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index fbfcef2cca6..85fe75f5508 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -81,6 +81,24 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const Collection* collection,
}
/**
+ * If 'shouldTrackLatestOplogTimestamp' returns a vector holding the name of the oplog 'ts' field
+ * along with another vector holding a SlotId to map this field to, as well as the standalone value
+ * of the same SlotId (the latter is returned purely for convenience purposes).
+ */
+std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>>
+makeOplogTimestampSlotsIfNeeded(const Collection* collection,
+ sbe::value::SlotIdGenerator* slotIdGenerator,
+ bool shouldTrackLatestOplogTimestamp) {
+ if (shouldTrackLatestOplogTimestamp) {
+ invariant(collection->ns().isOplog());
+
+ auto tsSlot = slotIdGenerator->generate();
+ return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
+ }
+ return {};
+};
+
+/**
* Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan
* when there is a predicted on the 'ts' field of the oplog collection.
*
@@ -103,6 +121,8 @@ generateOptimizedOplogScan(OperationContext* opCtx,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
invariant(collection->ns().isOplog());
// The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only
@@ -115,9 +135,15 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// See if the RecordStore supports the oplogStartHack. If so, the scan will start from the
// RecordId stored in seekRecordId.
+ // Otherwise, if we're building a collection scan for a resume branch of a special union
+ // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly
+ // to access the recordId to resume the scan from.
auto [seekRecordId, seekRecordIdSlot] =
[&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> {
- if (csn->minTs) {
+ if (isTailableResumeBranch) {
+ auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
+ return {{}, resumeRecordIdSlot};
+ } else if (csn->minTs) {
auto goal = oploghack::keyForOptime(*csn->minTs);
if (goal.isOK()) {
auto startLoc =
@@ -134,18 +160,10 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// Check if we need to project out an oplog 'ts' field as part of the collection scan. We will
// need it either when 'maxTs' bound has been provided, so that we can apply an EOF filter, of
// if we need to track the latest oplog timestamp.
- auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- // Don't project the 'ts' if stopApplyingFilterAfterFirstMatch is 'true'. We will have
- // another scan stage where it will be done.
- if (!csn->stopApplyingFilterAfterFirstMatch &&
- (csn->maxTs || csn->shouldTrackLatestOplogTimestamp)) {
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
- }
- return {};
- }();
+ const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch &&
+ (csn->maxTs || csn->shouldTrackLatestOplogTimestamp);
+ auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, shouldTrackLatestOplogTimestamp);
NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()};
auto stage = sbe::makeS<sbe::ScanStage>(nss,
@@ -217,16 +235,11 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// inner branch, and the execution will continue from this point further on, without
// applying the filter.
if (csn->stopApplyingFilterAfterFirstMatch) {
- std::tie(fields, slots, tsSlot) =
- [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- if (csn->shouldTrackLatestOplogTimestamp) {
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
- }
- return {};
- }();
+ invariant(csn->minTs || csn->maxTs);
+ invariant(csn->direction == CollectionScanParams::FORWARD);
+
+ std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
seekRecordIdSlot = recordIdSlot;
resultSlot = slotIdGenerator->generate();
@@ -268,27 +281,32 @@ generateGenericCollScan(const Collection* collection,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
const auto forward = csn->direction == CollectionScanParams::FORWARD;
+ invariant(!csn->shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ invariant(!csn->resumeAfterRecordId || forward);
+ invariant(!csn->resumeAfterRecordId || !csn->tailable);
+
auto resultSlot = slotIdGenerator->generate();
auto recordIdSlot = slotIdGenerator->generate();
- auto seekRecordIdSlot = boost::make_optional(static_cast<bool>(csn->resumeAfterRecordId),
- slotIdGenerator->generate());
-
- // See if we need to project out an oplog latest timestamp.
- auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- if (csn->shouldTrackLatestOplogTimestamp) {
- invariant(collection->ns().isOplog());
-
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
+ auto seekRecordIdSlot = [&]() -> boost::optional<sbe::value::SlotId> {
+ if (csn->resumeAfterRecordId) {
+ return slotIdGenerator->generate();
+ } else if (isTailableResumeBranch) {
+ auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
+ invariant(resumeRecordIdSlot);
+ return resumeRecordIdSlot;
}
return {};
}();
+ // See if we need to project out an oplog latest timestamp.
+ auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
+
NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()};
auto stage = sbe::makeS<sbe::ScanStage>(nss,
resultSlot,
@@ -310,7 +328,7 @@ generateGenericCollScan(const Collection* collection,
//
// TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on
// seekRecordIdSlot.
- if (seekRecordIdSlot) {
+ if (seekRecordIdSlot && !isTailableResumeBranch) {
stage = sbe::makeS<sbe::LoopJoinStage>(
sbe::makeProjectStage(
sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none),
@@ -345,15 +363,28 @@ generateCollScan(OperationContext* opCtx,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
- uassert(4822889, "Tailable collection scans are not supported in SBE", !csn->tailable);
auto [resultSlot, recordIdSlot, oplogTsSlot, stage] = [&]() {
if (csn->minTs || csn->maxTs) {
- return generateOptimizedOplogScan(
- opCtx, collection, csn, slotIdGenerator, yieldPolicy, tracker);
+ return generateOptimizedOplogScan(opCtx,
+ collection,
+ csn,
+ slotIdGenerator,
+ yieldPolicy,
+ env,
+ isTailableResumeBranch,
+ tracker);
} else {
- return generateGenericCollScan(collection, csn, slotIdGenerator, yieldPolicy, tracker);
+ return generateGenericCollScan(collection,
+ csn,
+ slotIdGenerator,
+ yieldPolicy,
+ env,
+ isTailableResumeBranch,
+ tracker);
}
}();
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.h b/src/mongo/db/query/sbe_stage_builder_coll_scan.h
index b5dd49a9504..f6ed8671739 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.h
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/stages/stages.h"
#include "mongo/db/exec/sbe/values/id_generators.h"
#include "mongo/db/exec/trial_run_progress_tracker.h"
@@ -56,5 +57,7 @@ generateCollScan(OperationContext* opCtx,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker);
} // namespace mongo::stage_builder