diff options
author | Ian Boros <puppyofkosh@gmail.com> | 2019-05-29 19:31:05 -0400 |
---|---|---|
committer | Ian Boros <puppyofkosh@gmail.com> | 2019-06-07 18:42:19 -0400 |
commit | a3525b8116aee64a88755f4a62b184eb01ba232e (patch) | |
tree | 518fe46f439613801e4a69f701fcb18905cc2f8d | |
parent | fda81c9d4764b933753dfaad4212b217c271272e (diff) | |
download | mongo-a3525b8116aee64a88755f4a62b184eb01ba232e.tar.gz |
SERVER-41294 shard filtering for $sb
(cherry picked from commit ab322d38ef3d5c3836df516f2a40c54c5165cafd)
27 files changed, 670 insertions, 43 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c8874d3f659..f80bd040b8f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1226,6 +1226,7 @@ env.Library( 'exec/requires_collection_stage.cpp', 'exec/requires_index_stage.cpp', 'exec/shard_filter.cpp', + 'exec/shard_filterer_impl.cpp', 'exec/skip.cpp', 'exec/sort.cpp', 'exec/sort_key_generator.cpp', diff --git a/src/mongo/db/exec/shard_filter.cpp b/src/mongo/db/exec/shard_filter.cpp index 51fb723c536..179b4a745f6 100644 --- a/src/mongo/db/exec/shard_filter.cpp +++ b/src/mongo/db/exec/shard_filter.cpp @@ -54,7 +54,7 @@ ShardFilterStage::ShardFilterStage(OperationContext* opCtx, ScopedCollectionMetadata metadata, WorkingSet* ws, PlanStage* child) - : PlanStage(kStageType, opCtx), _ws(ws), _metadata(std::move(metadata)) { + : PlanStage(kStageType, opCtx), _ws(ws), _shardFilterer(std::move(metadata)) { _children.emplace_back(child); } @@ -76,37 +76,29 @@ PlanStage::StageState ShardFilterStage::doWork(WorkingSetID* out) { // If we're sharded make sure that we don't return data that is not owned by us, // including pending documents from in-progress migrations and orphaned documents from // aborted migrations - if (_metadata->isSharded()) { - ShardKeyPattern shardKeyPattern(_metadata->getKeyPattern()); + if (_shardFilterer.isCollectionSharded()) { WorkingSetMember* member = _ws->get(*out); WorkingSetMatchableDocument matchable(member); - BSONObj shardKey = shardKeyPattern.extractShardKeyFromMatchable(matchable); - - if (shardKey.isEmpty()) { - // We can't find a shard key for this document - this should never happen with - // a non-fetched result unless our query planning is screwed up - if (!member->hasObj()) { - Status status(ErrorCodes::InternalError, - "shard key not found after a covered stage, " - "query planning has failed"); - - // Fail loudly and cleanly in production, fatally in debug - error() << redact(status); - dassert(false); - - _ws->free(*out); - *out = 
WorkingSetCommon::allocateStatusMember(_ws, status); - return PlanStage::FAILURE; - } - // Skip this document with a warning - no shard key should not be possible - // unless manually inserting data into a shard - warning() << "no shard key found in document " << redact(member->obj.value()) << " " - << "for shard key pattern " << _metadata->getKeyPattern() << ", " - << "document may have been inserted manually into shard"; - } + ShardFilterer::DocumentBelongsResult res = + _shardFilterer.documentBelongsToMe(matchable); + + if (res != ShardFilterer::DocumentBelongsResult::kBelongs) { + if (res == ShardFilterer::DocumentBelongsResult::kNoShardKey) { + // We can't find a shard key for this working set member - this should never + // happen with a non-fetched result unless our query planning is screwed up + invariant(member->hasObj()); + + // Skip this working set member with a warning - no shard key should not be + // possible unless manually inserting data into a shard + warning() << "no shard key found in document " << redact(member->obj.value()) + << " for shard key pattern " << _shardFilterer.getKeyPattern() << ", " + << "document may have been inserted manually into shard"; + } else { + invariant(res == ShardFilterer::DocumentBelongsResult::kDoesNotBelong); + } - if (!_metadata->keyBelongsToMe(shardKey)) { + // If the document had no shard key, or doesn't belong to us, skip it. 
_ws->free(*out); ++_specificStats.chunkSkips; return PlanStage::NEED_TIME; diff --git a/src/mongo/db/exec/shard_filter.h b/src/mongo/db/exec/shard_filter.h index 584ee9ed9c3..d8284ad4e6f 100644 --- a/src/mongo/db/exec/shard_filter.h +++ b/src/mongo/db/exec/shard_filter.h @@ -30,7 +30,7 @@ #pragma once #include "mongo/db/exec/plan_stage.h" -#include "mongo/db/s/scoped_collection_metadata.h" +#include "mongo/db/exec/shard_filterer_impl.h" namespace mongo { @@ -96,9 +96,13 @@ private: // Stats ShardingFilterStats _specificStats; - // Note: it is important that this is the metadata from the time this stage is constructed. - // See class comment for details. - ScopedCollectionMetadata _metadata; + // Note: it is important that this owns the ScopedCollectionMetadata from the time this stage + // is constructed. See ScopedCollectionMetadata class comment and MetadataManager comment for + // details. The existence of the ScopedCollectionMetadata prevents data which may have been + // migrated from being deleted while the query is still active. If we didn't hold one + // ScopedCollectionMetadata for the entire query, it'd be possible for data which the query + // needs to read to be deleted while it's still running. + ShardFiltererImpl _shardFilterer; }; } // namespace mongo diff --git a/src/mongo/db/exec/shard_filterer.h b/src/mongo/db/exec/shard_filterer.h new file mode 100644 index 00000000000..4715e7333ad --- /dev/null +++ b/src/mongo/db/exec/shard_filterer.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/matcher/matchable.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/shard_key_pattern.h" + +namespace mongo { + +/** + * Interface for doing shard filtering, to be used by both the find and agg execution trees. + */ +class ShardFilterer { +public: + enum class DocumentBelongsResult { kDoesNotBelong, kBelongs, kNoShardKey }; + + virtual ~ShardFilterer() = default; + + virtual DocumentBelongsResult documentBelongsToMe(const MatchableDocument&) const = 0; + virtual bool isCollectionSharded() const = 0; + virtual const KeyPattern& getKeyPattern() const = 0; +}; +} // namespace mongo diff --git a/src/mongo/db/exec/shard_filterer_impl.cpp b/src/mongo/db/exec/shard_filterer_impl.cpp new file mode 100644 index 00000000000..c9e25dfec10 --- /dev/null +++ b/src/mongo/db/exec/shard_filterer_impl.cpp @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/shard_filterer_impl.h" + +#include "mongo/db/matcher/matchable.h" +#include "mongo/db/s/scoped_collection_metadata.h" + +namespace mongo { + +ShardFiltererImpl::ShardFiltererImpl(ScopedCollectionMetadata md) : _metadata(std::move(md)) { + if (_metadata->isSharded()) { + _keyPattern = ShardKeyPattern(_metadata->getKeyPattern()); + } +} + +ShardFilterer::DocumentBelongsResult ShardFiltererImpl::documentBelongsToMe( + const MatchableDocument& doc) const { + if (!_metadata->isSharded()) { + return DocumentBelongsResult::kBelongs; + } + + BSONObj shardKey = _keyPattern->extractShardKeyFromMatchable(doc); + + if (shardKey.isEmpty()) { + return DocumentBelongsResult::kNoShardKey; + } + + return _metadata->keyBelongsToMe(shardKey) ? DocumentBelongsResult::kBelongs + : DocumentBelongsResult::kDoesNotBelong; +} + +} // namespace mongo diff --git a/src/mongo/db/exec/shard_filterer_impl.h b/src/mongo/db/exec/shard_filterer_impl.h new file mode 100644 index 00000000000..d729bb96b18 --- /dev/null +++ b/src/mongo/db/exec/shard_filterer_impl.h @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/shard_filterer.h" +#include "mongo/db/matcher/matchable.h" +#include "mongo/db/s/scoped_collection_metadata.h" + +namespace mongo { + +class ShardFiltererImpl : public ShardFilterer { +public: + ShardFiltererImpl(ScopedCollectionMetadata md); + + DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override; + + bool isCollectionSharded() const override { + return _metadata->isSharded(); + } + + const KeyPattern& getKeyPattern() const override { + invariant(_keyPattern); + return _keyPattern->getKeyPattern(); + } + +private: + ScopedCollectionMetadata _metadata; + boost::optional<ShardKeyPattern> _keyPattern; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 31bf3265ef7..f425c77be64 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -237,6 +237,7 @@ env.CppUnitTest( 'document_source_redact_test.cpp', 'document_source_replace_root_test.cpp', 'document_source_sample_test.cpp', + 'document_source_internal_shard_filter_test.cpp', 'document_source_skip_test.cpp', 'document_source_sort_by_count_test.cpp', 
'document_source_sort_test.cpp', @@ -389,6 +390,7 @@ pipelineeEnv.Library( 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_internal_inhibit_optimization.cpp', + 'document_source_internal_shard_filter.cpp', 'document_source_internal_split_pipeline.cpp', 'document_source_limit.cpp', 'document_source_list_cached_and_active_users.cpp', diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 25cf5605a3b..158e97b33fa 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/matcher/expression_algo.h" +#include "mongo/db/pipeline/document_source_internal_shard_filter.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" @@ -255,15 +256,9 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); - // If we are at the end of the pipeline, only optimize in the special case of a cache stage. - if (std::next(itr) == container->end()) { - return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this) - ? doOptimizeAt(itr, container) - : container->end(); - } - // Attempt to swap 'itr' with a subsequent $match or subsequent $sample. - if (pushMatchBefore(itr, container) || pushSampleBefore(itr, container)) { + if (std::next(itr) != container->end() && + (pushMatchBefore(itr, container) || pushSampleBefore(itr, container))) { // The stage before the pushed before stage may be able to optimize further, if there is // such a stage. return std::prev(itr) == container->begin() ? 
std::prev(itr) : std::prev(std::prev(itr)); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 05397303115..84d724231fc 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -482,11 +482,10 @@ protected: /** * Attempt to perform an optimization with the following source in the pipeline. 'container' - * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. The caller - * must guarantee that std::next(itr) != container->end(). + * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. * * The return value is an iterator over the same container which points to the first location - * in the container at which an optimization may be possible. + * in the container at which an optimization may be possible, or the end of the container. * * For example, if a swap takes place, the returned iterator should just be the position * directly preceding 'itr', if such a position exists, since the stage at that position may be diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 0015d915acc..1d971d188a8 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -175,6 +175,10 @@ Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get()); if (nextLimit) { diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 007718f0e13..b2f349f1829 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ 
-365,6 +365,10 @@ Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + // If we are not already handling an $unwind stage internally, we can combine with the following // $unwind stage. auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get()); diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp new file mode 100644 index 00000000000..a548cb6366c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_internal_shard_filter.h" + +#include "mongo/db/pipeline/document.h" +#include "mongo/util/log.h" + +namespace mongo { + +// +// This DocumentSource is not registered and can only be created as part of expansions for other +// DocumentSources. +// + +DocumentSourceInternalShardFilter::DocumentSourceInternalShardFilter( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + std::unique_ptr<ShardFilterer> shardFilterer) + : DocumentSource(pExpCtx), _shardFilterer(std::move(shardFilterer)) {} + +DocumentSource::GetNextResult DocumentSourceInternalShardFilter::getNext() { + pExpCtx->checkForInterrupt(); + + auto next = pSource->getNext(); + invariant(_shardFilterer); + for (; next.isAdvanced(); next = pSource->getNext()) { + BSONMatchableDocument matchable(next.getDocument().toBson()); + const auto belongsRes = _shardFilterer->documentBelongsToMe(matchable); + if (belongsRes == ShardFilterer::DocumentBelongsResult::kBelongs) { + return next; + } + + if (belongsRes == ShardFilterer::DocumentBelongsResult::kNoShardKey) { + warning() << "no shard key found in document " << redact(next.getDocument().toBson()) + << " " + << "for shard key pattern " << _shardFilterer->getKeyPattern() << ", " + << "document may have been inserted manually into shard"; + } + + // For performance reasons, a streaming stage must not keep references to documents across + // calls to getNext(). 
Such stages must retrieve a result from their child and then release + // it (or return it) before asking for another result. Failing to do so can result in extra + // work, since the Document/Value library must copy data on write when that data has a + // refcount above one. + next.releaseDocument(); + } + return next; +} + +Pipeline::SourceContainer::iterator DocumentSourceInternalShardFilter::doOptimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + if (_shardFilterer->isCollectionSharded()) { + return std::next(itr); + } + + if (itr == container->begin()) { + // Delete this stage from the pipeline if the operation does not require shard versioning. + container->erase(itr); + return container->begin(); + } + + auto ret = std::prev(itr); + container->erase(itr); + return ret; +} + +Value DocumentSourceInternalShardFilter::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(DOC(getSourceName() << Document())); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.h b/src/mongo/db/pipeline/document_source_internal_shard_filter.h new file mode 100644 index 00000000000..8fbfe6b07f1 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/shard_filterer.h" +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * Filters out documents which are physically present on this shard but not logically owned + * according to this operation's shard version. 
+ */ +class DocumentSourceInternalShardFilter final : public DocumentSource { +public: + static constexpr StringData kStageName = "$_internalShardFilter"_sd; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + DocumentSourceInternalShardFilter(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + std::unique_ptr<ShardFilterer> shardFilterer); + + const char* getSourceName() const override { + return kStageName.rawData(); + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const override { + return StageConstraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + ChangeStreamRequirement::kBlacklist); + } + + GetNextResult getNext() override; + + Value serialize( + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + return boost::none; + } + + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) override; + +private: + std::unique_ptr<ShardFilterer> _shardFilterer; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp new file mode 100644 index 00000000000..43eda00d9af --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/exec/shard_filterer.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_internal_shard_filter.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceInternalShardFilterTest = AggregationContextFixture; + +/** + * ShardFilterer with default implementations to be used/extended by other tests. 
+ */ +class ShardFiltererBaseForTest : public ShardFilterer { +public: + DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override { + MONGO_UNREACHABLE; + } + + bool isCollectionSharded() const override { + MONGO_UNREACHABLE; + } + + const KeyPattern& getKeyPattern() const override { + MONGO_UNREACHABLE; + } +}; + +/** + * ShardFilterer which indicates that the collection isn't sharded. + */ +class UnshardedShardFilterer : public ShardFiltererBaseForTest { +public: + bool isCollectionSharded() const override { + return false; + } +}; + +TEST_F(DocumentSourceInternalShardFilterTest, ShouldOptimizeAwayIfUnshardedCollection) { + Pipeline::SourceContainer container; + auto mock = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}"}); + + container.push_back(mock); + container.push_back(new DocumentSourceInternalShardFilter( + getExpCtx(), std::make_unique<UnshardedShardFilterer>())); + + // Make 'it' point to the shard filter source. + auto it = container.begin(); + ++it; + + // The shard filter should remove itself. + container.back()->optimizeAt(it, &container); + ASSERT_EQUALS(1U, container.size()); +} + +TEST_F(DocumentSourceInternalShardFilterTest, + ShouldOptimizeAwayIfUnshardedCollectionAndFirstInPipeline) { + Pipeline::SourceContainer container; + + container.push_back(new DocumentSourceInternalShardFilter( + getExpCtx(), std::make_unique<UnshardedShardFilterer>())); + + // Make 'it' point to the shard filter source. + auto it = container.begin(); + + // The shard filter should remove itself. + container.back()->optimizeAt(it, &container); + ASSERT_EQUALS(0U, container.size()); +} + +/** + * Indicate that the first 'n' documents passed through documentBelongsToMe() don't belong. 
+ */ +class FirstNShardFilterer : public ShardFiltererBaseForTest { +public: + FirstNShardFilterer(unsigned int n) : _numToFilter(n) {} + + DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override { + return i++ >= _numToFilter ? DocumentBelongsResult::kBelongs + : DocumentBelongsResult::kDoesNotBelong; + } + + bool isCollectionSharded() const override { + return true; + } + +private: + unsigned int _numToFilter; + + // mutable because documentBelongsToMe() is marked const. + mutable unsigned int i = 0; +}; + +TEST_F(DocumentSourceInternalShardFilterTest, FiltersDocuments) { + Pipeline::SourceContainer container; + auto mock = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}", "{a: 3}"}); + + const auto nToFilter = 2; + DocumentSourceInternalShardFilter filter(getExpCtx(), + std::make_unique<FirstNShardFilterer>(nToFilter)); + filter.setSource(mock.get()); + + // The first two documents should get filtered out. + auto next = filter.getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(3), next.getDocument().getField("a")); + + ASSERT_TRUE(filter.getNext().isEOF()); +} + +/** + * Indicate that documents don't have a shard key. 
+ */ +class ShardFiltererNoShardKey : public ShardFiltererBaseForTest { +public: + ShardFiltererNoShardKey() : _kp(BSON("b" << 1)) {} + + DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override { + return DocumentBelongsResult::kNoShardKey; + } + + bool isCollectionSharded() const override { + return true; + } + + const KeyPattern& getKeyPattern() const override { + return _kp; + } + +private: + KeyPattern _kp; +}; + +TEST_F(DocumentSourceInternalShardFilterTest, SkipDocumentsWithoutShardKey) { + Pipeline::SourceContainer container; + auto mock = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}", "{a: 3}"}); + + DocumentSourceInternalShardFilter filter(getExpCtx(), + std::make_unique<ShardFiltererNoShardKey>()); + filter.setSource(mock.get()); + + // The call to getNext() should return nothing. + ASSERT_TRUE(filter.getNext().isEOF()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index bcc10f4f892..f8b2ca3c0aa 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -56,6 +56,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get()); if (nextLimit) { diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 52c3b6b40b4..5a1d7767e5f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -338,6 +338,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == 
this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get()); // If we are not already handling an $unwind stage internally, we can combine with the diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index 26b8eaa14db..1900a644627 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -113,6 +113,10 @@ Pipeline::SourceContainer::iterator DocumentSourceMatch::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); // Since a text search must use an index, it must be the first stage in the pipeline. We cannot diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp index 03bc48e467d..8c48726aa2f 100644 --- a/src/mongo/db/pipeline/document_source_redact.cpp +++ b/src/mongo/db/pipeline/document_source_redact.cpp @@ -80,6 +80,10 @@ Pipeline::SourceContainer::iterator DocumentSourceRedact::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); if (nextMatch) { diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index a0dc8fbfcf5..4575e9fa02b 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -93,6 +93,11 @@ Value 
DocumentSourceSingleDocumentTransformation::serialize( Pipeline::SourceContainer::iterator DocumentSourceSingleDocumentTransformation::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get()); if (nextSkip) { diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index 364f148162e..2eead90aa3f 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -84,6 +84,10 @@ Pipeline::SourceContainer::iterator DocumentSourceSkip::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (std::next(itr) == container->end()) { + return container->end(); + } + auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get()); if (nextSkip) { // '_nToSkip' can potentially overflow causing it to become negative and skip nothing. diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index ac87792aa93..ae7e707e3e9 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -56,6 +56,7 @@ namespace mongo { +class ShardFilterer; class ExpressionContext; class Pipeline; class PipelineDeleter; @@ -260,6 +261,12 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** + * Produces a ShardFilterer. May return null. + */ + virtual std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0; + + /** * Returns a vector of owned BSONObjs, each of which contains details of an in-progress * operation or, optionally, an idle connection. 
If userMode is kIncludeAllUsers, report * operations for all authenticated users; otherwise, report only the current user's operations. diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 57babdf88e7..39c69b2daf7 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/shard_filterer.h" #include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/s/async_requests_sender.h" @@ -164,6 +165,11 @@ public: MONGO_UNREACHABLE; } + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { + return nullptr; + } + std::string getShardName(OperationContext* opCtx) const final { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index d523bdf5d14..c2672dcf2a0 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -41,8 +41,10 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/exec/shard_filterer_impl.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/document_source_internal_shard_filter.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/s/collection_sharding_state.h" @@ -202,4 +204,10 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou return attachCursorSourceToPipelineForLocalRead(expCtx, pipeline.release()); } +std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const { + return 
std::make_unique<ShardFiltererImpl>( + CollectionShardingState::get(expCtx->opCtx, expCtx->ns)->getOrphansFilter(expCtx->opCtx)); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index f54a9a93086..4dbc21b6ca2 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -84,6 +84,9 @@ public: std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; + + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 7e6d53e1bd5..5ba8ea4f21b 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/dbdirectclient.h" +#include "mongo/db/exec/shard_filterer.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/mongo_process_common.h" @@ -97,6 +98,12 @@ public: std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override; std::string getShardName(OperationContext* opCtx) const final; + + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { + // We'll never do shard filtering on a standalone. 
+ return nullptr; + } std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString&, UUID) const override; std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index fe408fdf2ab..a8a4a02e1e4 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -136,6 +136,11 @@ public: MONGO_UNREACHABLE; } + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { + MONGO_UNREACHABLE; + } + std::vector<BSONObj> getCurrentOps(const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpConnectionsMode connMode, CurrentOpSessionsMode sessionMode, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h b/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h index 4b3bcbaafdc..36d9dd81cbc 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h @@ -33,6 +33,7 @@ #include <deque> #include <vector> +#include "mongo/db/exec/shard_filterer.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/pipeline.h" @@ -40,6 +41,21 @@ namespace mongo { +class StubShardFilterer : public ShardFilterer { +public: + DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override { + MONGO_UNREACHABLE; + } + + bool isCollectionSharded() const override { + MONGO_UNREACHABLE; + } + + const KeyPattern& getKeyPattern() const override { + MONGO_UNREACHABLE; + } +}; + /** * A mock MongoProcessInterface which allows mocking a foreign pipeline. 
*/ @@ -67,6 +83,16 @@ public: boost::optional<BSONObj> readConcern, bool allowSpeculativeMajorityRead); + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { + // Try to emulate the behavior mongos and mongod would each follow. + if (expCtx->inMongos) { + return nullptr; + } else { + return stdx::make_unique<StubShardFilterer>(); + } + } + private: std::deque<DocumentSource::GetNextResult> _mockResults; }; |