From e83c568e68a37f7a4022caa67b19126fe7f4ab58 Mon Sep 17 00:00:00 2001 From: Anton Korshunov Date: Mon, 13 Jul 2020 11:18:38 +0100 Subject: SERVER-49463 Add support for tailable cursors and change streams in SBE --- src/mongo/db/SConscript | 3 +- src/mongo/db/exec/sbe/expressions/expression.cpp | 88 +++++++++++- src/mongo/db/exec/sbe/expressions/expression.h | 158 +++++++++++++++++++++ src/mongo/db/exec/sbe/sbe_key_string_test.cpp | 2 +- src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp | 2 +- src/mongo/db/exec/sbe/stages/exchange.cpp | 12 +- src/mongo/db/exec/sbe/stages/exchange.h | 10 +- src/mongo/db/exec/sbe_cmd.cpp | 10 +- src/mongo/db/query/get_executor.cpp | 5 +- src/mongo/db/query/plan_executor_factory.cpp | 13 +- src/mongo/db/query/plan_executor_factory.h | 2 + src/mongo/db/query/plan_executor_impl.cpp | 96 ++----------- src/mongo/db/query/plan_executor_impl.h | 27 ---- src/mongo/db/query/plan_executor_sbe.cpp | 107 +++++++++++--- src/mongo/db/query/plan_executor_sbe.h | 5 + src/mongo/db/query/plan_insert_listener.cpp | 123 ++++++++++++++++ src/mongo/db/query/plan_insert_listener.h | 81 +++++++++++ src/mongo/db/query/sbe_stage_builder.cpp | 119 +++++++++++++++- src/mongo/db/query/sbe_stage_builder.h | 24 +++- src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 111 +++++++++------ src/mongo/db/query/sbe_stage_builder_coll_scan.h | 3 + 21 files changed, 804 insertions(+), 197 deletions(-) create mode 100644 src/mongo/db/query/plan_insert_listener.cpp create mode 100644 src/mongo/db/query/plan_insert_listener.h diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 760785efdc6..e4e8a43b8f7 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1136,8 +1136,9 @@ env.Library( 'query/get_executor.cpp', 'query/internal_plans.cpp', 'query/plan_executor_impl.cpp', - 'query/plan_executor_sbe.cpp', 'query/plan_executor_factory.cpp', + 'query/plan_executor_sbe.cpp', + 'query/plan_insert_listener.cpp', 'query/plan_ranker.cpp', 'query/plan_yield_policy_impl.cpp', 'query/plan_yield_policy_sbe.cpp', diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index 68384b44ec5..b25936d2633 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -692,6 +692,88 @@ std::vector ETypeMatch::debugPrint() const { return ret; } +RuntimeEnvironment::RuntimeEnvironment(const RuntimeEnvironment& other) + : _state{other._state}, _isSmp{other._isSmp} { + for (auto&& [type, slot] : _state->slots) { + emplaceAccessor(slot.first, slot.second); + } +} + +RuntimeEnvironment::~RuntimeEnvironment() { + if (_state.use_count() == 1) { + for (size_t idx = 0; idx < _state->vals.size(); ++idx) { + if (_state->owned[idx]) { + releaseValue(_state->typeTags[idx], _state->vals[idx]); + } + } + } +} + +value::SlotId RuntimeEnvironment::registerSlot(StringData type, + value::TypeTags tag, + value::Value val, + bool owned, + value::SlotIdGenerator* slotIdGenerator) { + if (auto it = _state->slots.find(type); it == _state->slots.end()) { + invariant(slotIdGenerator); + auto slot = slotIdGenerator->generate(); + emplaceAccessor(slot, _state->pushSlot(type, slot)); + _accessors.at(slot).reset(owned, tag, val); + return slot; + } + + uasserted(4946303, str::stream() << "slot already registered:" << type); +} + +value::SlotId RuntimeEnvironment::getSlot(StringData type) { + if (auto it = _state->slots.find(type); it != _state->slots.end()) { + return it->second.first; + } + + 
uasserted(4946305, str::stream() << "environment slot is not registered for type: " << type); +} + +void RuntimeEnvironment::resetSlot(value::SlotId slot, + value::TypeTags tag, + value::Value val, + bool owned) { + // With intra-query parallelism enabled the global environment can hold only read-only values. + invariant(!_isSmp); + + if (auto it = _accessors.find(slot); it != _accessors.end()) { + it->second.reset(owned, tag, val); + return; + } + + uasserted(4946300, str::stream() << "undefined slot accessor:" << slot); +} + +value::SlotAccessor* RuntimeEnvironment::getAccessor(value::SlotId slot) { + if (auto it = _accessors.find(slot); it != _accessors.end()) { + return &it->second; + } + + uasserted(4946301, str::stream() << "undefined slot accessor:" << slot); +} + +std::unique_ptr RuntimeEnvironment::makeCopy(bool isSmp) { + // Once this environment is used to create a copy for a parallel plan execution, it becomes + // a parallel environment itself. + if (isSmp) { + _isSmp = isSmp; + } + + return std::unique_ptr(new RuntimeEnvironment(*this)); +} + +void RuntimeEnvironment::debugString(StringBuilder* builder) { + *builder << "env: { "; + for (auto&& [type, slot] : _state->slots) { + *builder << type << "=s" << slot.first << " "; + } + *builder << "}"; +} + value::SlotAccessor* CompileCtx::getAccessor(value::SlotId slot) { for (auto it = correlated.rbegin(); it != correlated.rend(); ++it) { if (it->first == slot) { @@ -699,7 +781,7 @@ value::SlotAccessor* CompileCtx::getAccessor(value::SlotId slot) { } } - uasserted(4822848, str::stream() << "undefined slot accessor:" << slot); + return env->getAccessor(slot); } std::shared_ptr CompileCtx::getSpoolBuffer(SpoolId spool) { @@ -716,5 +798,9 @@ void CompileCtx::pushCorrelated(value::SlotId slot, value::SlotAccessor* accesso void CompileCtx::popCorrelated() { correlated.pop_back(); } + +CompileCtx CompileCtx::makeCopy(bool isSmp) { + return {env->makeCopy(isSmp)}; +} } // namespace sbe } // namespace mongo diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h index 66c04b33e3d..2192142e4ef 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.h +++ b/src/mongo/db/exec/sbe/expressions/expression.h @@ -34,27 +34,185 @@ #include #include "mongo/db/exec/sbe/util/debug_print.h" +#include "mongo/db/exec/sbe/values/id_generators.h" #include "mongo/db/exec/sbe/values/value.h" #include "mongo/db/exec/sbe/vm/vm.h" #include "mongo/stdx/unordered_map.h" +#include "mongo/util/string_map.h" namespace mongo { namespace sbe { using SpoolBuffer = std::vector; +/** + * A holder for slots and accessors which are used in a PlanStage tree but: + * - Cannot be made constants due to restrictions on the lifetime of such values (e.g., they're + * singleton instances owned somewhere else). + * - Can be changed in runtime outside of the PlanStage tree (e.g., a resume recordId changed by a + * PlanExecutor). + * + * A RuntimeEnvironment object is created once per an execution thread. That means that each + * producer and consumer in a parallel plan will have their own compilation environment, with their + * own slot accessors. However, slot accessors in each of such environment will access shared data, + * which is the same across all environments. + * + * To avoid data races, the values stored in the runtime environment are considered read-only when + * used with a parallel plan. An attempt to change any slot with 'resetValue' will result in a user + * exception. 
+ *
+ * If the runtime environment is used in a serial plan, modifications of the slots are allowed.
+ */
+class RuntimeEnvironment {
+public:
+    RuntimeEnvironment() = default;
+    RuntimeEnvironment(RuntimeEnvironment&&) = delete;
+    RuntimeEnvironment& operator=(const RuntimeEnvironment&) = delete;
+    RuntimeEnvironment& operator=(const RuntimeEnvironment&&) = delete;
+    ~RuntimeEnvironment();
+
+    /**
+     * Registers and returns a SlotId for the given slot 'type'. The 'slotIdGenerator' is used to
+     * generate a new SlotId for the given slot 'type', which is then registered with this
+     * environment by creating a new SlotAccessor. The value 'val' is then stored within the
+     * SlotAccessor and the newly generated SlotId is returned.
+     *
+     * Both owned and unowned values can be stored in the runtime environment.
+     *
+     * A user exception is raised if this slot 'type' has already been registered.
+     */
+    value::SlotId registerSlot(StringData type,
+                               value::TypeTags tag,
+                               value::Value val,
+                               bool owned,
+                               value::SlotIdGenerator* slotIdGenerator);
+
+    /**
+     * Returns a SlotId registered for the given slot 'type'. If the slot hasn't been registered
+     * yet, a user exception is raised.
+     */
+    value::SlotId getSlot(StringData type);
+
+    /**
+     * Stores the given value in the specified slot within this runtime environment instance.
+     *
+     * A user exception is raised if the SlotId is not registered within this environment, or
+     * if this environment is used with a parallel plan.
+     */
+    void resetSlot(value::SlotId slot, value::TypeTags tag, value::Value val, bool owned);
+
+    /**
+     * Returns a SlotAccessor for the given SlotId which must have been previously registered
+     * within this environment by invoking the 'registerSlot' method.
+     *
+     * A user exception is raised if the SlotId is not registered within this environment.
+     */
+    value::SlotAccessor* getAccessor(value::SlotId slot);
+
+    /**
+     * Makes a copy of this environment. The new environment will have its own set of
+     * SlotAccessors pointing to the same shared data holding slot values.
+     *
+     * To create a copy of the runtime environment for a parallel execution plan, the 'isSmp' flag
+     * must be set to 'true'. This will result in both this environment and the newly created copy
+     * being converted to parallel environments.
+     */
+    std::unique_ptr<RuntimeEnvironment> makeCopy(bool isSmp);
+
+    /**
+     * Dumps all the slots currently defined in this environment into the given string builder.
+     */
+    void debugString(StringBuilder* builder);
+
+private:
+    RuntimeEnvironment(const RuntimeEnvironment&);
+
+    struct State {
+        auto pushSlot(StringData type, value::SlotId slot) {
+            auto index = vals.size();
+
+            typeTags.push_back(value::TypeTags::Nothing);
+            vals.push_back(0);
+            owned.push_back(false);
+
+            auto [it, inserted] = slots.emplace(type, std::make_pair(slot, index));
+            uassert(4946302, str::stream() << "duplicate environment slot: " << slot, inserted);
+            return index;
+        }
+
+        StringMap<std::pair<value::SlotId, size_t>> slots;
+        std::vector<value::TypeTags> typeTags;
+        std::vector<value::Value> vals;
+        std::vector<bool> owned;
+    };
+
+    class Accessor final : public value::SlotAccessor {
+    public:
+        Accessor(RuntimeEnvironment* env, size_t index) : _env{env}, _index{index} {}
+
+        std::pair<value::TypeTags, value::Value> getViewOfValue() const override {
+            return {_env->_state->typeTags[_index], _env->_state->vals[_index]};
+        }
+
+        std::pair<value::TypeTags, value::Value> copyOrMoveValue() override {
+            // Always make a copy.
+ return copyValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); + } + + void reset(bool owned, value::TypeTags tag, value::Value val) { + release(); + + _env->_state->typeTags[_index] = tag; + _env->_state->vals[_index] = val; + _env->_state->owned[_index] = owned; + } + + private: + void release() { + if (_env->_state->owned[_index]) { + releaseValue(_env->_state->typeTags[_index], _env->_state->vals[_index]); + _env->_state->owned[_index] = false; + } + } + + RuntimeEnvironment* const _env; + const size_t _index; + }; + + void emplaceAccessor(value::SlotId slot, size_t index) { + _accessors.emplace(slot, Accessor{this, index}); + } + + std::shared_ptr _state{std::make_shared()}; + value::SlotMap _accessors; + bool _isSmp{false}; + + friend class Accessor; +}; + class PlanStage; struct CompileCtx { + CompileCtx(std::unique_ptr env) : env{std::move(env)} {} + value::SlotAccessor* getAccessor(value::SlotId slot); std::shared_ptr getSpoolBuffer(SpoolId spool); void pushCorrelated(value::SlotId slot, value::SlotAccessor* accessor); void popCorrelated(); + CompileCtx makeCopy(bool isSmp); + PlanStage* root{nullptr}; value::SlotAccessor* accumulator{nullptr}; std::vector> correlated; stdx::unordered_map> spoolBuffers; bool aggExpression{false}; + +private: + // Any data that a PlanStage needs from the RuntimeEnvironment should not be accessed directly + // but insteady by looking up the corresponding slots. These slots are set up during the process + // of building PlanStages, so the PlanStages themselves should never need to add new slots to + // the RuntimeEnvironment. + std::unique_ptr env; }; /** diff --git a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp index b69ab6ad3e3..303cf6efef1 100644 --- a/src/mongo/db/exec/sbe/sbe_key_string_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_key_string_test.cpp @@ -115,7 +115,7 @@ TEST(SBEKeyStringTest, Basic) { // Set up an SBE expression that will compare one element in the 'testValues' BSON object with // one of the KeyString components. - CompileCtx ctx; + CompileCtx ctx{std::make_unique()}; CoScanStage emptyStage; ctx.root = &emptyStage; diff --git a/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp b/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp index 372b67fa5b6..e31effefdce 100644 --- a/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_numeric_convert_test.cpp @@ -104,7 +104,7 @@ protected: ASSERT_EQUALS(tag, value::TypeTags::Nothing); } - CompileCtx _ctx; + CompileCtx _ctx{std::make_unique()}; CoScanStage _emptyStage; vm::ByteCode _interpreter; diff --git a/src/mongo/db/exec/sbe/stages/exchange.cpp b/src/mongo/db/exec/sbe/stages/exchange.cpp index e644c162e59..0c2535878d9 100644 --- a/src/mongo/db/exec/sbe/stages/exchange.cpp +++ b/src/mongo/db/exec/sbe/stages/exchange.cpp @@ -181,6 +181,10 @@ void ExchangeConsumer::prepare(CompileCtx& ctx) { for (size_t idx = 0; idx < _state->fields().size(); ++idx) { _outgoing.emplace_back(ExchangeBuffer::Accessor{}); } + + for (size_t idx = 0; idx < _state->numOfProducers(); ++idx) { + _state->producerCompileCtxs().push_back(ctx.makeCopy(true)); + } // Compile '<' function once we implement order preserving exchange. } value::SlotAccessor* ExchangeConsumer::getAccessor(CompileCtx& ctx, value::SlotId slot) { @@ -245,6 +249,7 @@ void ExchangeConsumer::open(bool reOpen) { } // Start n producers. 
+ invariant(_state->producerCompileCtxs().size() == _state->numOfProducers()); for (size_t idx = 0; idx < _state->numOfProducers(); ++idx) { auto pf = makePromiseFuture(); s_globalThreadPool->schedule( @@ -255,6 +260,7 @@ void ExchangeConsumer::open(bool reOpen) { promise.setWith([&] { ExchangeProducer::start(opCtx.get(), + _state->producerCompileCtxs()[idx], std::move(_state->producerPlans()[idx])); }); }); @@ -446,13 +452,14 @@ ExchangeProducer::ExchangeProducer(std::unique_ptr input, } } -void ExchangeProducer::start(OperationContext* opCtx, std::unique_ptr producer) { +void ExchangeProducer::start(OperationContext* opCtx, + CompileCtx& ctx, + std::unique_ptr producer) { ExchangeProducer* p = static_cast(producer.get()); p->attachFromOperationContext(opCtx); try { - CompileCtx ctx; p->prepare(ctx); p->open(false); @@ -475,6 +482,7 @@ std::unique_ptr ExchangeProducer::clone() const { void ExchangeProducer::prepare(CompileCtx& ctx) { _children[0]->prepare(ctx); + for (auto& f : _state->fields()) { _incoming.emplace_back(_children[0]->getAccessor(ctx, f)); } diff --git a/src/mongo/db/exec/sbe/stages/exchange.h b/src/mongo/db/exec/sbe/stages/exchange.h index 3fb3a3b91c5..c73dd19ebf5 100644 --- a/src/mongo/db/exec/sbe/stages/exchange.h +++ b/src/mongo/db/exec/sbe/stages/exchange.h @@ -196,6 +196,11 @@ public: auto& producerPlans() { return _producerPlans; } + + auto& producerCompileCtxs() { + return _producerCompileCtxs; + } + auto& producerResults() { return _producerResults; } @@ -218,6 +223,7 @@ private: std::vector _consumers; std::vector _producers; std::vector> _producerPlans; + std::vector _producerCompileCtxs; std::vector> _producerResults; // Variables (fields) that pass through the exchange. @@ -296,7 +302,9 @@ class ExchangeProducer final : public PlanStage { public: ExchangeProducer(std::unique_ptr input, std::shared_ptr state); - static void start(OperationContext* opCtx, std::unique_ptr producer); + static void start(OperationContext* opCtx, + CompileCtx& ctx, + std::unique_ptr producer); std::unique_ptr clone() const final; diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index d544e79599f..1e46cec6d4f 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -78,12 +78,12 @@ public: NamespaceString nss{dbname}; + stage_builder::PlanStageData data{std::make_unique()}; + data.resultSlot = resultSlot; + data.recordIdSlot = recordIdSlot; + exec = uassertStatusOK(plan_executor_factory::make( - opCtx, - nullptr, - {std::move(root), stage_builder::PlanStageData{resultSlot, recordIdSlot}}, - nss, - nullptr)); + opCtx, nullptr, {std::move(root), std::move(data)}, nullptr, nss, nullptr)); for (long long objCount = 0; objCount < batchSize; objCount++) { BSONObj next; diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index a63e841a3e0..3f14abe3645 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -898,7 +898,7 @@ protected: auto result = makeResult(); result->emplace( {sbe::makeS(sbe::makeS(), 0, boost::none), - stage_builder::PlanStageData{}}, + stage_builder::PlanStageData{std::make_unique()}}, nullptr); return result; } @@ -1072,6 +1072,7 @@ StatusWith> getSlotBasedExe return plan_executor_factory::make(opCtx, std::move(cq), {std::move(plan.root), std::move(plan.data)}, + collection, {}, std::move(plan.results), std::move(yieldPolicy)); @@ -1079,7 +1080,7 @@ StatusWith> getSlotBasedExe // No need for runtime planning, just use the constructed plan stage 
tree. invariant(roots.size() == 1); return plan_executor_factory::make( - opCtx, std::move(cq), std::move(roots[0]), {}, std::move(yieldPolicy)); + opCtx, std::move(cq), std::move(roots[0]), collection, {}, std::move(yieldPolicy)); } } // namespace diff --git a/src/mongo/db/query/plan_executor_factory.cpp b/src/mongo/db/query/plan_executor_factory.cpp index fa16042eaf0..3630e975646 100644 --- a/src/mongo/db/query/plan_executor_factory.cpp +++ b/src/mongo/db/query/plan_executor_factory.cpp @@ -113,6 +113,7 @@ StatusWith> make( OperationContext* opCtx, std::unique_ptr cq, std::pair, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, std::unique_ptr yieldPolicy) { @@ -129,6 +130,7 @@ StatusWith> make( auto exec = new PlanExecutorSBE(opCtx, std::move(cq), std::move(root), + collection, std::move(nss), false, boost::none, @@ -140,6 +142,7 @@ StatusWith> make( OperationContext* opCtx, std::unique_ptr cq, std::pair, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, std::queue>> stash, std::unique_ptr yieldPolicy) { @@ -152,8 +155,14 @@ StatusWith> make( "slots"_attr = data.debugString(), "stages"_attr = sbe::DebugPrinter{}.print(rootStage.get())); - auto exec = new PlanExecutorSBE( - opCtx, std::move(cq), std::move(root), std::move(nss), true, stash, std::move(yieldPolicy)); + auto exec = new PlanExecutorSBE(opCtx, + std::move(cq), + std::move(root), + collection, + std::move(nss), + true, + stash, + std::move(yieldPolicy)); return {{exec, PlanExecutor::Deleter{opCtx}}}; } diff --git a/src/mongo/db/query/plan_executor_factory.h b/src/mongo/db/query/plan_executor_factory.h index 8faad454285..56f6fe87cf4 100644 --- a/src/mongo/db/query/plan_executor_factory.h +++ b/src/mongo/db/query/plan_executor_factory.h @@ -105,6 +105,7 @@ StatusWith> make( OperationContext* opCtx, std::unique_ptr cq, std::pair, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, std::unique_ptr yieldPolicy); @@ -117,6 +118,7 @@ StatusWith> make( OperationContext* opCtx, std::unique_ptr cq, std::pair, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, std::queue>> stash, std::unique_ptr yieldPolicy); diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 8b0126ed50b..bd12126709d 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -37,8 +37,6 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" @@ -56,9 +54,8 @@ #include "mongo/db/exec/trial_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" -#include "mongo/db/query/explain.h" -#include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" +#include "mongo/db/query/plan_insert_listener.h" #include "mongo/db/query/plan_yield_policy_impl.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" @@ -77,16 +74,10 @@ using std::vector; const OperationContext::Decoration clientsLastKnownCommittedOpTime = OperationContext::declareDecoration(); -struct CappedInsertNotifierData { - shared_ptr notifier; - uint64_t lastEOFVersion = ~0; -}; - namespace { 
MONGO_FAIL_POINT_DEFINE(planExecutorAlwaysFails); MONGO_FAIL_POINT_DEFINE(planExecutorHangBeforeShouldWaitForInserts); -MONGO_FAIL_POINT_DEFINE(planExecutorHangWhileYieldedInWaitForInserts); /** * Constructs a PlanYieldPolicy based on 'policy'. @@ -326,77 +317,6 @@ PlanExecutor::ExecState PlanExecutorImpl::getNextDocument(Document* objOut, Reco return state; } -bool PlanExecutorImpl::_shouldListenForInserts() { - return _cq && _cq->getQueryRequest().isTailableAndAwaitData() && - awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && - awaitDataState(_opCtx).waitForInsertsDeadline > - _opCtx->getServiceContext()->getPreciseClockSource()->now(); -} - -bool PlanExecutorImpl::_shouldWaitForInserts() { - // If this is an awaitData-respecting operation and we have time left and we're not interrupted, - // we should wait for inserts. - if (_shouldListenForInserts()) { - // We expect awaitData cursors to be yielding. - invariant(_yieldPolicy->canReleaseLocksDuringExecution()); - - // For operations with a last committed opTime, we should not wait if the replication - // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime. - // In that case, we will return early so that we can inform the client of the new - // lastCommittedOpTime immediately. - if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) { - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - return clientsLastKnownCommittedOpTime(_opCtx) >= replCoord->getLastCommittedOpTime(); - } - return true; - } - return false; -} - -std::shared_ptr PlanExecutorImpl::_getCappedInsertNotifier() { - // We don't expect to need a capped insert notifier for non-yielding plans. - invariant(_yieldPolicy->canReleaseLocksDuringExecution()); - - // We can only wait if we have a collection; otherwise we should retry immediately when - // we hit EOF. - dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)); - auto databaseHolder = DatabaseHolder::get(_opCtx); - auto db = databaseHolder->getDb(_opCtx, _nss.db()); - invariant(db); - auto collection = CollectionCatalog::get(_opCtx).lookupCollectionByNamespace(_opCtx, _nss); - invariant(collection); - - return collection->getCappedInsertNotifier(); -} - -void PlanExecutorImpl::_waitForInserts(CappedInsertNotifierData* notifierData) { - invariant(notifierData->notifier); - - // The notifier wait() method will not wait unless the version passed to it matches the - // current version of the notifier. Since the version passed to it is the current version - // of the notifier at the time of the previous EOF, we require two EOFs in a row with no - // notifier version change in order to wait. This is sufficient to ensure we never wait - // when data is available. - auto curOp = CurOp::get(_opCtx); - curOp->pauseTimer(); - ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - auto opCtx = _opCtx; - uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - auto yieldResult = _yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { - const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); - if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { - LOGV2(4452903, - "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. 
" - "Blocking until fail point is disabled"); - planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); - } - }); - notifierData->lastEOFVersion = currentNotifierVersion; - - uassertStatusOK(yieldResult); -} - PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted* objOut, RecordId* dlOut) { if (MONGO_unlikely(planExecutorAlwaysFails.shouldFail())) { @@ -422,10 +342,11 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted* ob // Capped insert data; declared outside the loop so we hold a shared pointer to the capped // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped // insert notifier is necessary for the notifierVersion to advance. - CappedInsertNotifierData cappedInsertNotifierData; - if (_shouldListenForInserts()) { + insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { // We always construct the CappedInsertNotifier for awaitData cursors. - cappedInsertNotifierData.notifier = _getCappedInsertNotifier(); + cappedInsertNotifierData.notifier = + insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { // These are the conditions which can cause us to yield: @@ -519,10 +440,13 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted* ob "enabled. Blocking until fail point is disabled"); planExecutorHangBeforeShouldWaitForInserts.pauseWhileSet(); } - if (!_shouldWaitForInserts()) { + + if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) { return PlanExecutor::IS_EOF; } - _waitForInserts(&cappedInsertNotifierData); + + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + // There may be more results, keep going. continue; } diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h index ac586aef59b..f4efb60b8af 100644 --- a/src/mongo/db/query/plan_executor_impl.h +++ b/src/mongo/db/query/plan_executor_impl.h @@ -117,33 +117,6 @@ private: */ Status _pickBestPlan(); - /** - * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called - * on a tailable and awaitData cursor that still has time left and hasn't been interrupted. - */ - bool _shouldListenForInserts(); - - /** - * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore - * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF - * should be returned immediately. - */ - bool _shouldWaitForInserts(); - - /** - * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor - * is not capable of yielding based on a notifier. - */ - std::shared_ptr _getCappedInsertNotifier(); - - /** - * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to - * the collection being tailed. Returns control to the caller once there has been an insertion - * and there may be new results. If the PlanExecutor was killed during a yield, throws an - * exception. - */ - void _waitForInserts(CappedInsertNotifierData* notifierData); - ExecState _getNextImpl(Snapshotted* objOut, RecordId* dlOut); // The OperationContext that we're executing within. 
This can be updated if necessary by using diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 5d9375f7ed4..9bd146d516f 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -31,8 +31,10 @@ #include "mongo/db/query/plan_executor_sbe.h" +#include "mongo/db/db_raii.h" #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/values/bson.h" +#include "mongo/db/query/plan_insert_listener.h" #include "mongo/db/query/sbe_stage_builder.h" namespace mongo { @@ -40,6 +42,7 @@ PlanExecutorSBE::PlanExecutorSBE( OperationContext* opCtx, std::unique_ptr cq, std::pair, stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, bool isOpen, boost::optional>>> stash, @@ -47,6 +50,8 @@ PlanExecutorSBE::PlanExecutorSBE( : _state{isOpen ? State::kOpened : State::kClosed}, _opCtx(opCtx), _nss(std::move(nss)), + _env{root.second.env}, + _ctx(std::move(root.second.ctx)), _root(std::move(root.first)), _cq{std::move(cq)}, _yieldPolicy(std::move(yieldPolicy)) { @@ -69,6 +74,10 @@ PlanExecutorSBE::PlanExecutorSBE( uassert(4822867, "Query does not have oplogTs slot.", _oplogTs); } + if (data.shouldUseTailableScan) { + _resumeRecordIdSlot = _env->getSlot("resumeRecordId"_sd); + } + _shouldTrackLatestOplogTimestamp = data.shouldTrackLatestOplogTimestamp; _shouldTrackResumeToken = data.shouldTrackResumeToken; @@ -84,6 +93,16 @@ PlanExecutorSBE::PlanExecutorSBE( if (_yieldPolicy) { _yieldPolicy->setRootStage(_root.get()); } + + // We may still need to initialize _nss from either collection or _cq. + if (_nss.isEmpty()) { + if (collection) { + _nss = collection->ns(); + } else { + invariant(_cq); + _nss = _cq->getQueryRequest().nss(); + } + } } void PlanExecutorSBE::saveState() { @@ -160,24 +179,64 @@ PlanExecutor::ExecState PlanExecutorSBE::getNext(BSONObj* out, RecordId* dlOut) // fetching the next document. _root->close(); _state = State::kClosed; - return PlanExecutor::ExecState::IS_EOF; + if (!_resumeRecordIdSlot) { + return PlanExecutor::ExecState::IS_EOF; + } } - if (_state == State::kClosed) { - _state = State::kOpened; - _root->open(false); + // Capped insert data; declared outside the loop so we hold a shared pointer to the capped + // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped + // insert notifier is necessary for the notifierVersion to advance. + // + // Note that we need to hold a database intent lock before acquiring a notifier. 
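The loop below relies on the notifier version handshake implemented by waitForInserts() in plan_insert_listener.cpp later in this patch. As a rough standalone model of that handshake (plain C++; MiniNotifier is a stand-in invented for illustration, not the real CappedInsertNotifier API), a reader records the version it observed at its previous EOF and only blocks while that version is still current, so it can never sleep past an insert that has already happened:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Simplified stand-in for CappedInsertNotifier (illustration only).
class MiniNotifier {
public:
    // Writers call this after inserting into the capped collection.
    void notifyAll() {
        std::lock_guard<std::mutex> lk(_m);
        ++_version;
        _cv.notify_all();
    }

    uint64_t getVersion() const {
        std::lock_guard<std::mutex> lk(_m);
        return _version;
    }

    // A reader passes the version it saw at its previous EOF; the call returns immediately
    // if any insert has happened since then, otherwise it waits for one or for the deadline.
    template <typename Clock, typename Duration>
    void waitUntil(uint64_t prevVersion, std::chrono::time_point<Clock, Duration> deadline) {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait_until(lk, deadline, [&] { return _version != prevVersion; });
    }

private:
    mutable std::mutex _m;
    std::condition_variable _cv;
    uint64_t _version{0};
};

Because a reader refreshes the version it remembers only after each EOF, it takes two consecutive EOFs with no version change before it actually blocks, which is exactly the 'lastEOFVersion' bookkeeping spelled out in waitForInserts().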
+ boost::optional coll; + insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { + if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) { + coll.emplace(_opCtx, _nss); + } + + cappedInsertNotifierData.notifier = + insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } - invariant(_state == State::kOpened); + for (;;) { + if (_state == State::kClosed) { + if (_resumeRecordIdSlot) { + invariant(_resultRecordId); + + auto [tag, val] = _resultRecordId->getViewOfValue(); + uassert(4946306, + "Collection scan was asked to track resume token, but found a result " + "without a valid RecordId", + tag == sbe::value::TypeTags::NumberInt64 || + tag == sbe::value::TypeTags::Nothing); + _env->resetSlot(*_resumeRecordIdSlot, tag, val, false); + } + + _state = State::kOpened; + _root->open(false); + } - auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut); - if (result == sbe::PlanState::IS_EOF) { - _root->close(); - _state = State::kClosed; - return PlanExecutor::ExecState::IS_EOF; + invariant(_state == State::kOpened); + + auto result = fetchNext(_root.get(), _result, _resultRecordId, out, dlOut); + if (result == sbe::PlanState::IS_EOF) { + _root->close(); + _state = State::kClosed; + + if (!insert_listener::shouldWaitForInserts(_opCtx, _cq.get(), _yieldPolicy.get())) { + return PlanExecutor::ExecState::IS_EOF; + } + + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + // There may be more results, keep going. + continue; + } + + invariant(result == sbe::PlanState::ADVANCED); + return PlanExecutor::ExecState::ADVANCED; } - invariant(result == sbe::PlanState::ADVANCED); - return PlanExecutor::ExecState::ADVANCED; } Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const { @@ -185,11 +244,13 @@ Timestamp PlanExecutorSBE::getLatestOplogTimestamp() const { invariant(_oplogTs); auto [tag, val] = _oplogTs->getViewOfValue(); - uassert(4822868, - "Collection scan was asked to track latest operation time, " - "but found a result without a valid 'ts' field", - tag == sbe::value::TypeTags::Timestamp); - return Timestamp{sbe::value::bitcastTo(val)}; + if (tag != sbe::value::TypeTags::Nothing) { + uassert(4822868, + str::stream() << "Collection scan was asked to track latest operation time, " + "but found a result without a valid 'ts' field", + tag == sbe::value::TypeTags::Timestamp); + return Timestamp{sbe::value::bitcastTo(val)}; + } } return {}; } @@ -199,11 +260,13 @@ BSONObj PlanExecutorSBE::getPostBatchResumeToken() const { invariant(_resultRecordId); auto [tag, val] = _resultRecordId->getViewOfValue(); - uassert(4822869, - "Collection scan was asked to track resume token, " - "but found a result without a valid RecordId", - tag == sbe::value::TypeTags::NumberInt64); - return BSON("$recordId" << sbe::value::bitcastTo(val)); + if (tag != sbe::value::TypeTags::Nothing) { + uassert(4822869, + "Collection scan was asked to track resume token, " + "but found a result without a valid RecordId", + tag == sbe::value::TypeTags::NumberInt64); + return BSON("$recordId" << sbe::value::bitcastTo(val)); + } } return {}; } diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index 5ff2f0fe05d..de16b910fcb 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -43,6 +43,7 @@ public: OperationContext* opCtx, std::unique_ptr cq, std::pair, 
stage_builder::PlanStageData> root, + const Collection* collection, NamespaceString nss, bool isOpen, boost::optional>>> stash, @@ -129,11 +130,15 @@ private: NamespaceString _nss; + // CompileCtx owns the instance pointed by _env, so we must keep it around. + sbe::RuntimeEnvironment* _env{nullptr}; + sbe::CompileCtx _ctx; std::unique_ptr _root; sbe::value::SlotAccessor* _result{nullptr}; sbe::value::SlotAccessor* _resultRecordId{nullptr}; sbe::value::SlotAccessor* _oplogTs{nullptr}; + boost::optional _resumeRecordIdSlot; bool _shouldTrackLatestOplogTimestamp{false}; bool _shouldTrackResumeToken{false}; diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp new file mode 100644 index 00000000000..ea98f68e6ee --- /dev/null +++ b/src/mongo/db/query/plan_insert_listener.cpp @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/plan_insert_listener.h" + +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/curop.h" +#include "mongo/db/query/find_common.h" +#include "mongo/logv2/log.h" +#include "mongo/util/fail_point.h" + +namespace mongo::insert_listener { +namespace { +MONGO_FAIL_POINT_DEFINE(planExecutorHangWhileYieldedInWaitForInserts); +} + +bool shouldListenForInserts(OperationContext* opCtx, CanonicalQuery* cq) { + return cq && cq->getQueryRequest().isTailableAndAwaitData() && + awaitDataState(opCtx).shouldWaitForInserts && opCtx->checkForInterruptNoAssert().isOK() && + awaitDataState(opCtx).waitForInsertsDeadline > + opCtx->getServiceContext()->getPreciseClockSource()->now(); +} + +bool shouldWaitForInserts(OperationContext* opCtx, + CanonicalQuery* cq, + PlanYieldPolicy* yieldPolicy) { + // If this is an awaitData-respecting operation and we have time left and we're not interrupted, + // we should wait for inserts. + if (shouldListenForInserts(opCtx, cq)) { + // We expect awaitData cursors to be yielding. 
+ invariant(yieldPolicy->canReleaseLocksDuringExecution()); + + // For operations with a last committed opTime, we should not wait if the replication + // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime. + // In that case, we will return early so that we can inform the client of the new + // lastCommittedOpTime immediately. + if (!clientsLastKnownCommittedOpTime(opCtx).isNull()) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + return clientsLastKnownCommittedOpTime(opCtx) >= replCoord->getLastCommittedOpTime(); + } + return true; + } + return false; +} + +std::shared_ptr getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy) { + // We don't expect to need a capped insert notifier for non-yielding plans. + invariant(yieldPolicy->canReleaseLocksDuringExecution()); + + // We can only wait if we have a collection; otherwise we should retry immediately when + // we hit EOF. + dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); + auto databaseHolder = DatabaseHolder::get(opCtx); + auto db = databaseHolder->getDb(opCtx, nss.db()); + invariant(db); + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); + invariant(collection); + + return collection->getCappedInsertNotifier(); +} + +void waitForInserts(OperationContext* opCtx, + PlanYieldPolicy* yieldPolicy, + CappedInsertNotifierData* notifierData) { + invariant(notifierData->notifier); + + // The notifier wait() method will not wait unless the version passed to it matches the + // current version of the notifier. Since the version passed to it is the current version + // of the notifier at the time of the previous EOF, we require two EOFs in a row with no + // notifier version change in order to wait. This is sufficient to ensure we never wait + // when data is available. + auto curOp = CurOp::get(opCtx); + curOp->pauseTimer(); + ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); + + uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); + auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { + const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; + notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { + LOGV2(4452903, + "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. " + "Blocking until fail point is disabled"); + planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); + } + }); + notifierData->lastEOFVersion = currentNotifierVersion; + + uassertStatusOK(yieldResult); +} +} // namespace mongo::insert_listener diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h new file mode 100644 index 00000000000..be4407d3df8 --- /dev/null +++ b/src/mongo/db/query/plan_insert_listener.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/canonical_query.h" +#include "mongo/db/query/plan_yield_policy.h" + +namespace mongo::insert_listener { +/** + * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned + * by the 'notifier'. + */ +struct CappedInsertNotifierData { + std::shared_ptr notifier; + uint64_t lastEOFVersion = ~0; +}; + +/** + * Returns true if the PlanExecutor should listen for inserts, which is when a getMore is called + * on a tailable and awaitData cursor that still has time left and hasn't been interrupted. + */ + +bool shouldListenForInserts(OperationContext* opCtx, CanonicalQuery* cq); + +/** + * Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore + * is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF + * should be returned immediately. + */ +bool shouldWaitForInserts(OperationContext* opCtx, + CanonicalQuery* cq, + PlanYieldPolicy* yieldPolicy); + +/** + * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor + * is not capable of yielding based on a notifier. + */ +std::shared_ptr getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy); + +/** + * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to + * the collection being tailed. Returns control to the caller once there has been an insertion + * and there may be new results. If the PlanExecutor was killed during a yield, throws an + * exception. 
+ */ + +void waitForInserts(OperationContext* opCtx, + PlanYieldPolicy* yieldPolicy, + CappedInsertNotifierData* notifierData); +} // namespace mongo::insert_listener diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp index 076b304aa5d..63a6c7952aa 100644 --- a/src/mongo/db/query/sbe_stage_builder.cpp +++ b/src/mongo/db/query/sbe_stage_builder.cpp @@ -52,8 +52,15 @@ #include "mongo/db/query/sbe_stage_builder_filter.h" #include "mongo/db/query/sbe_stage_builder_index_scan.h" #include "mongo/db/query/sbe_stage_builder_projection.h" +#include "mongo/db/query/util/make_data_structure.h" namespace mongo::stage_builder { +std::unique_ptr makeRuntimeEnvironment( + OperationContext* opCtx, sbe::value::SlotIdGenerator* slotIdGenerator) { + auto env = std::make_unique(); + return env; +} + std::unique_ptr SlotBasedStageBuilder::buildCollScan( const QuerySolutionNode* root) { auto csn = static_cast(root); @@ -63,12 +70,15 @@ std::unique_ptr SlotBasedStageBuilder::buildCollScan( csn, &_slotIdGenerator, _yieldPolicy, + _data.env, + _isTailableCollScanResumeBranch, _data.trialRunProgressTracker.get()); _data.resultSlot = resultSlot; _data.recordIdSlot = recordIdSlot; _data.oplogTsSlot = oplogTsSlot; _data.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; _data.shouldTrackResumeToken = csn->requestResumeToken; + _data.shouldUseTailableScan = csn->tailable; if (_returnKeySlot) { // Assign the '_returnKeySlot' to be the empty object. @@ -144,14 +154,15 @@ std::unique_ptr SlotBasedStageBuilder::buildFetch(const QuerySol std::unique_ptr SlotBasedStageBuilder::buildLimit(const QuerySolutionNode* root) { const auto ln = static_cast(root); - // If we have both limit and skip stages and the skip stage is beneath the limit, then we can - // combine these two stages into one. So, save the _limit value and let the skip stage builder - // handle it. + // If we have both limit and skip stages and the skip stage is beneath the limit, then we + // can combine these two stages into one. So, save the _limit value and let the skip stage + // builder handle it. if (ln->children[0]->getType() == StageType::STAGE_SKIP) { _limit = ln->limit; } + auto inputStage = build(ln->children[0]); - return _limit + return _limit || _isTailableCollScanResumeBranch ? std::move(inputStage) : std::make_unique(std::move(inputStage), ln->limit, boost::none); } @@ -159,7 +170,9 @@ std::unique_ptr SlotBasedStageBuilder::buildLimit(const QuerySol std::unique_ptr SlotBasedStageBuilder::buildSkip(const QuerySolutionNode* root) { const auto sn = static_cast(root); auto inputStage = build(sn->children[0]); - return std::make_unique(std::move(inputStage), _limit, sn->skip); + return _isTailableCollScanResumeBranch + ? std::move(inputStage) + : std::make_unique(std::move(inputStage), _limit, sn->skip); } std::unique_ptr SlotBasedStageBuilder::buildSort(const QuerySolutionNode* root) { @@ -464,6 +477,86 @@ std::unique_ptr SlotBasedStageBuilder::buildReturnKey( return stage; } +std::unique_ptr SlotBasedStageBuilder::makeUnionForTailableCollScan( + const QuerySolutionNode* root) { + using namespace std::literals; + + // Register a SlotId in the global environment which would contain a recordId to resume a + // tailable collection scan from. A PlanStage executor will track the last seen recordId and + // will reset a SlotAccessor for the resumeRecordIdSlot with this recordId. 
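+    //
+    // The union built below has roughly this shape (a sketch only; stage names are abbreviated
+    // and the slot names are illustrative rather than the generated SlotIds):
+    //
+    //   union [result, recordId (, oplogTs)]
+    //     anchor branch: cfilter {!exists(resumeRecordId)}
+    //                      -> [limit/skip from the query, if any] -> collection scan
+    //     resume branch: cfilter {exists(resumeRecordId)}
+    //                      -> limitskip {skip: 1} -> collection scan seeked to resumeRecordId
+    //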
+ auto resumeRecordIdSlot = _data.env->registerSlot( + "resumeRecordId"_sd, sbe::value::TypeTags::Nothing, 0, false, &_slotIdGenerator); + + // For tailable collection scan we need to build a special union sub-tree consisting of two + // branches: + // 1) An anchor branch implementing an initial collection scan before the first EOF is hit. + // 2) A resume branch implementing all consecutive collection scans from a recordId which was + // seen last. + // + // The 'makeStage' parameter is used to build a PlanStage tree which is served as a root stage + // for each of the union branches. The same machanism is used to build each union branch, and + // the special logic which needs to be triggered depending on which branch we build is + // controlled by setting the _isTailableCollScanResumeBranch flag. + + _isBuildingUnionForTailableCollScan = true; + + auto makeUnionBranch = [&](bool isTailableCollScanResumeBranch) + -> std::pair> { + _isTailableCollScanResumeBranch = isTailableCollScanResumeBranch; + auto branch = build(root); + auto branchSlots = sbe::makeSV(*_data.resultSlot, *_data.recordIdSlot); + if (_data.oplogTsSlot) { + branchSlots.push_back(*_data.oplogTsSlot); + } + if (_returnKeySlot) { + branchSlots.push_back(*_returnKeySlot); + } + return {std::move(branchSlots), std::move(branch)}; + }; + + // Build an anchor branch of the union and add a constant filter on top of it, so that it would + // only execute on an initial collection scan, that is, when resumeRecordId is not available + // yet. + auto&& [anchorBranchSlots, anchorBranch] = makeUnionBranch(false); + anchorBranch = sbe::makeS>( + std::move(anchorBranch), + sbe::makeE( + sbe::EPrimUnary::logicNot, + sbe::makeE( + "exists"sv, sbe::makeEs(sbe::makeE(resumeRecordIdSlot))))); + + // Build a resume branch of the union and add a constant filter on op of it, so that it would + // only execute when we resume a collection scan from the resumeRecordId. + auto&& [resumeBranchSlots, resumeBranch] = makeUnionBranch(true); + resumeBranch = sbe::makeS>( + sbe::makeS(std::move(resumeBranch), boost::none, 1), + sbe::makeE("exists"sv, + sbe::makeEs(sbe::makeE(resumeRecordIdSlot)))); + + invariant(anchorBranchSlots.size() == resumeBranchSlots.size()); + + // A vector of the output slots for each union branch. + auto branchSlots = make_vector(std::move(anchorBranchSlots), + std::move(resumeBranchSlots)); + + _data.resultSlot = _slotIdGenerator.generate(); + _data.recordIdSlot = _slotIdGenerator.generate(); + auto unionOutputSlots = sbe::makeSV(*_data.resultSlot, *_data.recordIdSlot); + if (_data.oplogTsSlot) { + _data.oplogTsSlot = _slotIdGenerator.generate(); + unionOutputSlots.push_back(*_data.oplogTsSlot); + } + + // Branch output slots become the input slots to the union. + auto unionStage = + sbe::makeS(make_vector>( + std::move(anchorBranch), std::move(resumeBranch)), + branchSlots, + unionOutputSlots); + _isBuildingUnionForTailableCollScan = false; + return unionStage; +} + // Returns a non-null pointer to the root of a plan tree, or a non-OK status if the PlanStage tree // could not be constructed. 
std::unique_ptr SlotBasedStageBuilder::build(const QuerySolutionNode* root) { @@ -489,6 +582,22 @@ std::unique_ptr SlotBasedStageBuilder::build(const QuerySolution str::stream() << "Can't build exec tree for node: " << root->toString(), kStageBuilders.find(root->getType()) != kStageBuilders.end()); + // If this plan is for a tailable cursor scan, and we're not already in the process of building + // a special union sub-tree implementing such scans, then start building a union sub-tree. Note + // that LIMIT or SKIP stage is used as a splitting point of the two union branches, if present, + // because we need to apply limit (or skip) only in the initial scan (in the anchor branch), and + // the resume branch should not have it. + switch (root->getType()) { + case STAGE_COLLSCAN: + case STAGE_LIMIT: + case STAGE_SKIP: + if (_cq.getQueryRequest().isTailable() && !_isBuildingUnionForTailableCollScan) { + return makeUnionForTailableCollScan(root); + } + default: + break; + } + return std::invoke(kStageBuilders.at(root->getType()), *this, root); } } // namespace mongo::stage_builder diff --git a/src/mongo/db/query/sbe_stage_builder.h b/src/mongo/db/query/sbe_stage_builder.h index 6d1044e998d..3137d12156d 100644 --- a/src/mongo/db/query/sbe_stage_builder.h +++ b/src/mongo/db/query/sbe_stage_builder.h @@ -38,11 +38,21 @@ #include "mongo/db/query/stage_builder.h" namespace mongo::stage_builder { +/** + * Creates a new compilation environment and registers global values within the + * new environment. + */ +std::unique_ptr makeRuntimeEnvironment( + OperationContext* opCtx, sbe::value::SlotIdGenerator* slotIdGenerator); + /** * Some auxiliary data returned by a 'SlotBasedStageBuilder' along with a PlanStage tree root, which * is needed to execute the PlanStage tree. */ struct PlanStageData { + PlanStageData(std::unique_ptr env) + : env{env.get()}, ctx{std::move(env)} {} + std::string debugString() const { StringBuilder builder; @@ -56,15 +66,19 @@ struct PlanStageData { builder << "$$OPLOGTS=s" << *oplogTsSlot << " "; } + env->debugString(&builder); + return builder.str(); } boost::optional resultSlot; boost::optional recordIdSlot; boost::optional oplogTsSlot; + sbe::RuntimeEnvironment* env{nullptr}; sbe::CompileCtx ctx; bool shouldTrackLatestOplogTimestamp{false}; bool shouldTrackResumeToken{false}; + bool shouldUseTailableScan{false}; // Used during the trial run of the runtime planner to track progress of the work done so far. std::unique_ptr trialRunProgressTracker; }; @@ -114,6 +128,8 @@ private: sbe::value::SlotId recordIdKeySlot, const sbe::value::SlotVector& slotsToForward = {}); + std::unique_ptr makeUnionForTailableCollScan(const QuerySolutionNode* root); + sbe::value::SlotIdGenerator _slotIdGenerator; sbe::value::FrameIdGenerator _frameIdGenerator; sbe::value::SpoolIdGenerator _spoolIdGenerator; @@ -127,10 +143,16 @@ private: // should inflate each index entry into an object and bind it to this slot. boost::optional _returnKeySlot; + // These two flags control whether we're in the middle of the process of building a special + // union sub-tree implementing a tailable cursor collection scan, and if so, whether we're + // building an anchor or resume branch. + bool _isBuildingUnionForTailableCollScan{false}; + bool _isTailableCollScanResumeBranch{false}; + PlanYieldPolicySBE* const _yieldPolicy; // Apart from generating just an execution tree, this builder will also produce some auxiliary // data which is needed to execute the tree, such as a result slot, or a recordId slot. 
- PlanStageData _data; + PlanStageData _data{makeRuntimeEnvironment(_opCtx, &_slotIdGenerator)}; }; } // namespace mongo::stage_builder diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index fbfcef2cca6..85fe75f5508 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -80,6 +80,24 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const Collection* collection, return {}; } +/** + * If 'shouldTrackLatestOplogTimestamp' returns a vector holding the name of the oplog 'ts' field + * along with another vector holding a SlotId to map this field to, as well as the standalone value + * of the same SlotId (the latter is returned purely for convenience purposes). + */ +std::tuple, sbe::value::SlotVector, boost::optional> +makeOplogTimestampSlotsIfNeeded(const Collection* collection, + sbe::value::SlotIdGenerator* slotIdGenerator, + bool shouldTrackLatestOplogTimestamp) { + if (shouldTrackLatestOplogTimestamp) { + invariant(collection->ns().isOplog()); + + auto tsSlot = slotIdGenerator->generate(); + return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; + } + return {}; +}; + /** * Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan * when there is a predicted on the 'ts' field of the oplog collection. @@ -103,6 +121,8 @@ generateOptimizedOplogScan(OperationContext* opCtx, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { invariant(collection->ns().isOplog()); // The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only @@ -115,9 +135,15 @@ generateOptimizedOplogScan(OperationContext* opCtx, // See if the RecordStore supports the oplogStartHack. If so, the scan will start from the // RecordId stored in seekRecordId. + // Otherwise, if we're building a collection scan for a resume branch of a special union + // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly + // to access the recordId to resume the scan from. auto [seekRecordId, seekRecordIdSlot] = [&]() -> std::pair, boost::optional> { - if (csn->minTs) { + if (isTailableResumeBranch) { + auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd); + return {{}, resumeRecordIdSlot}; + } else if (csn->minTs) { auto goal = oploghack::keyForOptime(*csn->minTs); if (goal.isOK()) { auto startLoc = @@ -134,18 +160,10 @@ generateOptimizedOplogScan(OperationContext* opCtx, // Check if we need to project out an oplog 'ts' field as part of the collection scan. We will // need it either when 'maxTs' bound has been provided, so that we can apply an EOF filter, of // if we need to track the latest oplog timestamp. - auto [fields, slots, tsSlot] = [&]() -> std::tuple, - sbe::value::SlotVector, - boost::optional> { - // Don't project the 'ts' if stopApplyingFilterAfterFirstMatch is 'true'. We will have - // another scan stage where it will be done. 
- if (!csn->stopApplyingFilterAfterFirstMatch && - (csn->maxTs || csn->shouldTrackLatestOplogTimestamp)) { - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; - } - return {}; - }(); + const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch && + (csn->maxTs || csn->shouldTrackLatestOplogTimestamp); + auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, shouldTrackLatestOplogTimestamp); NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()}; auto stage = sbe::makeS(nss, @@ -217,16 +235,11 @@ generateOptimizedOplogScan(OperationContext* opCtx, // inner branch, and the execution will continue from this point further on, without // applying the filter. if (csn->stopApplyingFilterAfterFirstMatch) { - std::tie(fields, slots, tsSlot) = - [&]() -> std::tuple, - sbe::value::SlotVector, - boost::optional> { - if (csn->shouldTrackLatestOplogTimestamp) { - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; - } - return {}; - }(); + invariant(csn->minTs || csn->maxTs); + invariant(csn->direction == CollectionScanParams::FORWARD); + + std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); seekRecordIdSlot = recordIdSlot; resultSlot = slotIdGenerator->generate(); @@ -268,27 +281,32 @@ generateGenericCollScan(const Collection* collection, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { const auto forward = csn->direction == CollectionScanParams::FORWARD; + invariant(!csn->shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + invariant(!csn->resumeAfterRecordId || forward); + invariant(!csn->resumeAfterRecordId || !csn->tailable); + auto resultSlot = slotIdGenerator->generate(); auto recordIdSlot = slotIdGenerator->generate(); - auto seekRecordIdSlot = boost::make_optional(static_cast(csn->resumeAfterRecordId), - slotIdGenerator->generate()); - - // See if we need to project out an oplog latest timestamp. - auto [fields, slots, tsSlot] = [&]() -> std::tuple, - sbe::value::SlotVector, - boost::optional> { - if (csn->shouldTrackLatestOplogTimestamp) { - invariant(collection->ns().isOplog()); - - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; + auto seekRecordIdSlot = [&]() -> boost::optional { + if (csn->resumeAfterRecordId) { + return slotIdGenerator->generate(); + } else if (isTailableResumeBranch) { + auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd); + invariant(resumeRecordIdSlot); + return resumeRecordIdSlot; } return {}; }(); + // See if we need to project out an oplog latest timestamp. + auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); + NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()}; auto stage = sbe::makeS(nss, resultSlot, @@ -310,7 +328,7 @@ generateGenericCollScan(const Collection* collection, // // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on // seekRecordIdSlot. 
- if (seekRecordIdSlot) { + if (seekRecordIdSlot && !isTailableResumeBranch) { stage = sbe::makeS( sbe::makeProjectStage( sbe::makeS(sbe::makeS(), 1, boost::none), @@ -345,15 +363,28 @@ generateCollScan(OperationContext* opCtx, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { - uassert(4822889, "Tailable collection scans are not supported in SBE", !csn->tailable); auto [resultSlot, recordIdSlot, oplogTsSlot, stage] = [&]() { if (csn->minTs || csn->maxTs) { - return generateOptimizedOplogScan( - opCtx, collection, csn, slotIdGenerator, yieldPolicy, tracker); + return generateOptimizedOplogScan(opCtx, + collection, + csn, + slotIdGenerator, + yieldPolicy, + env, + isTailableResumeBranch, + tracker); } else { - return generateGenericCollScan(collection, csn, slotIdGenerator, yieldPolicy, tracker); + return generateGenericCollScan(collection, + csn, + slotIdGenerator, + yieldPolicy, + env, + isTailableResumeBranch, + tracker); } }(); diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.h b/src/mongo/db/query/sbe_stage_builder_coll_scan.h index b5dd49a9504..f6ed8671739 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.h +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/exec/sbe/values/id_generators.h" #include "mongo/db/exec/trial_run_progress_tracker.h" @@ -56,5 +57,7 @@ generateCollScan(OperationContext* opCtx, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker); } // namespace mongo::stage_builder -- cgit v1.2.1
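Taken together, these changes let a tailable collection scan be reopened from the last RecordId it returned, published through the new 'resumeRecordId' runtime-environment slot and consumed by the resume branch of the union. A minimal standalone sketch of that flow (plain C++ with toy types; Environment and Scan are illustrative stand-ins invented here, not the SBE classes):

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

using Value = std::optional<int64_t>;  // models a slot holding Nothing or a RecordId

struct Environment {
    // Keyed by slot "type", loosely mirroring registerSlot()/getSlot().
    std::map<std::string, Value> slots;
};

struct Scan {
    const std::vector<int64_t>* records;
    Environment* env;
    size_t pos = 0;

    void open() {
        // On (re)open, resume after the recordId published in the environment, if any.
        if (auto resume = env->slots["resumeRecordId"]) {
            while (pos < records->size() && (*records)[pos] <= *resume) {
                ++pos;
            }
        }
    }

    std::optional<int64_t> next() {
        return pos < records->size() ? std::optional<int64_t>((*records)[pos++]) : std::nullopt;
    }
};

int main() {
    std::vector<int64_t> capped{1, 2, 3};
    Environment env;
    Scan scan{&capped, &env};

    std::optional<int64_t> lastSeen;
    for (int pass = 0; pass < 2; ++pass) {
        scan.open();
        while (auto rid = scan.next()) {
            lastSeen = rid;
            std::cout << "got recordId " << *rid << "\n";
        }
        // EOF: remember where we stopped so the next pass resumes after it, mirroring how
        // the executor resets the resumeRecordId slot before reopening the plan.
        env.slots["resumeRecordId"] = lastSeen;
        capped.push_back(4 + pass);  // simulate a new insert into the capped collection
        scan.pos = 0;                // the resume branch rebuilds its scan from scratch
    }
    return 0;
}

The second pass prints only the newly inserted recordId, mirroring how the resume branch seeks the scan to the slot's value and skips the document that was already returned.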