summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Boros <puppyofkosh@gmail.com>2019-05-29 19:31:05 -0400
committerIan Boros <puppyofkosh@gmail.com>2019-06-07 13:15:21 -0400
commitab322d38ef3d5c3836df516f2a40c54c5165cafd (patch)
tree1a1d77ad40275d3160b1701a2984745965f9da45
parentd5a46532dd80c4255982308cb933cf78efb21a7f (diff)
downloadmongo-ab322d38ef3d5c3836df516f2a40c54c5165cafd.tar.gz
SERVER-41294 shard filtering for $sb
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/exec/shard_filter.cpp48
-rw-r--r--src/mongo/db/exec/shard_filter.h12
-rw-r--r--src/mongo/db/exec/shard_filterer.h51
-rw-r--r--src/mongo/db/exec/shard_filterer_impl.cpp61
-rw-r--r--src/mongo/db/exec/shard_filterer_impl.h57
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source.h5
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_internal_shard_filter.cpp104
-rw-r--r--src/mongo/db/pipeline/document_source_internal_shard_filter.h82
-rw-r--r--src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp184
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_redact.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp4
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h7
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp8
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h3
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h7
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h5
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h26
27 files changed, 670 insertions, 43 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index c8874d3f659..f80bd040b8f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1226,6 +1226,7 @@ env.Library(
'exec/requires_collection_stage.cpp',
'exec/requires_index_stage.cpp',
'exec/shard_filter.cpp',
+ 'exec/shard_filterer_impl.cpp',
'exec/skip.cpp',
'exec/sort.cpp',
'exec/sort_key_generator.cpp',
diff --git a/src/mongo/db/exec/shard_filter.cpp b/src/mongo/db/exec/shard_filter.cpp
index 51fb723c536..179b4a745f6 100644
--- a/src/mongo/db/exec/shard_filter.cpp
+++ b/src/mongo/db/exec/shard_filter.cpp
@@ -54,7 +54,7 @@ ShardFilterStage::ShardFilterStage(OperationContext* opCtx,
ScopedCollectionMetadata metadata,
WorkingSet* ws,
PlanStage* child)
- : PlanStage(kStageType, opCtx), _ws(ws), _metadata(std::move(metadata)) {
+ : PlanStage(kStageType, opCtx), _ws(ws), _shardFilterer(std::move(metadata)) {
_children.emplace_back(child);
}
@@ -76,37 +76,29 @@ PlanStage::StageState ShardFilterStage::doWork(WorkingSetID* out) {
// If we're sharded make sure that we don't return data that is not owned by us,
// including pending documents from in-progress migrations and orphaned documents from
// aborted migrations
- if (_metadata->isSharded()) {
- ShardKeyPattern shardKeyPattern(_metadata->getKeyPattern());
+ if (_shardFilterer.isCollectionSharded()) {
WorkingSetMember* member = _ws->get(*out);
WorkingSetMatchableDocument matchable(member);
- BSONObj shardKey = shardKeyPattern.extractShardKeyFromMatchable(matchable);
-
- if (shardKey.isEmpty()) {
- // We can't find a shard key for this document - this should never happen with
- // a non-fetched result unless our query planning is screwed up
- if (!member->hasObj()) {
- Status status(ErrorCodes::InternalError,
- "shard key not found after a covered stage, "
- "query planning has failed");
-
- // Fail loudly and cleanly in production, fatally in debug
- error() << redact(status);
- dassert(false);
-
- _ws->free(*out);
- *out = WorkingSetCommon::allocateStatusMember(_ws, status);
- return PlanStage::FAILURE;
- }
- // Skip this document with a warning - no shard key should not be possible
- // unless manually inserting data into a shard
- warning() << "no shard key found in document " << redact(member->obj.value()) << " "
- << "for shard key pattern " << _metadata->getKeyPattern() << ", "
- << "document may have been inserted manually into shard";
- }
+ ShardFilterer::DocumentBelongsResult res =
+ _shardFilterer.documentBelongsToMe(matchable);
+
+ if (res != ShardFilterer::DocumentBelongsResult::kBelongs) {
+ if (res == ShardFilterer::DocumentBelongsResult::kNoShardKey) {
+ // We can't find a shard key for this working set member - this should never
+ // happen with a non-fetched result unless our query planning is screwed up
+ invariant(member->hasObj());
+
+ // Skip this working set member with a warning - no shard key should not be
+ // possible unless manually inserting data into a shard
+ warning() << "no shard key found in document " << redact(member->obj.value())
+ << " for shard key pattern " << _shardFilterer.getKeyPattern() << ", "
+ << "document may have been inserted manually into shard";
+ } else {
+ invariant(res == ShardFilterer::DocumentBelongsResult::kDoesNotBelong);
+ }
- if (!_metadata->keyBelongsToMe(shardKey)) {
+ // If the document had no shard key, or doesn't belong to us, skip it.
_ws->free(*out);
++_specificStats.chunkSkips;
return PlanStage::NEED_TIME;
diff --git a/src/mongo/db/exec/shard_filter.h b/src/mongo/db/exec/shard_filter.h
index 584ee9ed9c3..d8284ad4e6f 100644
--- a/src/mongo/db/exec/shard_filter.h
+++ b/src/mongo/db/exec/shard_filter.h
@@ -30,7 +30,7 @@
#pragma once
#include "mongo/db/exec/plan_stage.h"
-#include "mongo/db/s/scoped_collection_metadata.h"
+#include "mongo/db/exec/shard_filterer_impl.h"
namespace mongo {
@@ -96,9 +96,13 @@ private:
// Stats
ShardingFilterStats _specificStats;
- // Note: it is important that this is the metadata from the time this stage is constructed.
- // See class comment for details.
- ScopedCollectionMetadata _metadata;
+ // Note: it is important that this owns the ScopedCollectionMetadata from the time this stage
+ // is constructed. See ScopedCollectionMetadata class comment and MetadataManager comment for
+ // details. The existence of the ScopedCollectionMetadata prevents data which may have been
+ // migrated from being deleted while the query is still active. If we didn't hold one
+ // ScopedCollectionMetadata for the entire query, it'd be possible for data which the query
+ // needs to read to be deleted while it's still running.
+ ShardFiltererImpl _shardFilterer;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/shard_filterer.h b/src/mongo/db/exec/shard_filterer.h
new file mode 100644
index 00000000000..4715e7333ad
--- /dev/null
+++ b/src/mongo/db/exec/shard_filterer.h
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/matcher/matchable.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/shard_key_pattern.h"
+
+namespace mongo {
+
+/**
+ * Abstract shard-filtering interface shared by the find and aggregation execution trees.
+ * Callers hand it one document at a time and act on the returned classification.
+ */
+class ShardFilterer {
+public:
+    /**
+     * Classification of a single document produced by documentBelongsToMe().
+     */
+    enum class DocumentBelongsResult {
+        // The document is not owned by this shard.
+        kDoesNotBelong,
+        // The document is owned by this shard.
+        kBelongs,
+        // No shard key could be found in the document.
+        kNoShardKey,
+    };
+
+    virtual ~ShardFilterer() = default;
+
+    /**
+     * Reports whether 'doc' belongs to this shard, or that it carries no shard key.
+     */
+    virtual DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const = 0;
+
+    /**
+     * Returns true if the collection being filtered is sharded.
+     */
+    virtual bool isCollectionSharded() const = 0;
+
+    /**
+     * Returns the shard key pattern used for filtering.
+     */
+    virtual const KeyPattern& getKeyPattern() const = 0;
+};
+} // namespace mongo
diff --git a/src/mongo/db/exec/shard_filterer_impl.cpp b/src/mongo/db/exec/shard_filterer_impl.cpp
new file mode 100644
index 00000000000..c9e25dfec10
--- /dev/null
+++ b/src/mongo/db/exec/shard_filterer_impl.cpp
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/shard_filterer_impl.h"
+
+#include "mongo/db/matcher/matchable.h"
+#include "mongo/db/s/scoped_collection_metadata.h"
+
+namespace mongo {
+
+/**
+ * Takes ownership of 'md'. The shard key pattern is cached up front, but only when the metadata
+ * reports a sharded collection; otherwise '_keyPattern' stays unset.
+ */
+ShardFiltererImpl::ShardFiltererImpl(ScopedCollectionMetadata md) : _metadata(std::move(md)) {
+    if (!_metadata->isSharded()) {
+        return;
+    }
+
+    _keyPattern = ShardKeyPattern(_metadata->getKeyPattern());
+}
+
+/**
+ * Classifies 'doc' against the owned collection metadata:
+ *  - unsharded collection: always kBelongs;
+ *  - no shard key extractable from 'doc': kNoShardKey;
+ *  - otherwise kBelongs or kDoesNotBelong, as decided by the metadata's keyBelongsToMe().
+ */
+ShardFilterer::DocumentBelongsResult ShardFiltererImpl::documentBelongsToMe(
+    const MatchableDocument& doc) const {
+    if (_metadata->isSharded()) {
+        const BSONObj extractedKey = _keyPattern->extractShardKeyFromMatchable(doc);
+
+        if (extractedKey.isEmpty()) {
+            return DocumentBelongsResult::kNoShardKey;
+        }
+
+        if (_metadata->keyBelongsToMe(extractedKey)) {
+            return DocumentBelongsResult::kBelongs;
+        }
+        return DocumentBelongsResult::kDoesNotBelong;
+    }
+
+    // Nothing to filter on an unsharded collection.
+    return DocumentBelongsResult::kBelongs;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/shard_filterer_impl.h b/src/mongo/db/exec/shard_filterer_impl.h
new file mode 100644
index 00000000000..d729bb96b18
--- /dev/null
+++ b/src/mongo/db/exec/shard_filterer_impl.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/shard_filterer.h"
+#include "mongo/db/matcher/matchable.h"
+#include "mongo/db/s/scoped_collection_metadata.h"
+
+namespace mongo {
+
+/**
+ * ShardFilterer backed by a ScopedCollectionMetadata, which this object owns for its entire
+ * lifetime.
+ */
+class ShardFiltererImpl : public ShardFilterer {
+public:
+    /**
+     * Takes ownership of 'md'.
+     */
+    ShardFiltererImpl(ScopedCollectionMetadata md);
+
+    DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override;
+
+    bool isCollectionSharded() const override {
+        const bool sharded = _metadata->isSharded();
+        return sharded;
+    }
+
+    /**
+     * Returns the cached shard key pattern. Trips an invariant if no pattern has been set.
+     */
+    const KeyPattern& getKeyPattern() const override {
+        invariant(_keyPattern);
+        const ShardKeyPattern& shardKeyPattern = *_keyPattern;
+        return shardKeyPattern.getKeyPattern();
+    }
+
+private:
+    // Metadata consulted for every ownership decision.
+    ScopedCollectionMetadata _metadata;
+
+    // Shard key pattern used to extract keys from documents; may be unset.
+    boost::optional<ShardKeyPattern> _keyPattern;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 31bf3265ef7..f425c77be64 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -237,6 +237,7 @@ env.CppUnitTest(
'document_source_redact_test.cpp',
'document_source_replace_root_test.cpp',
'document_source_sample_test.cpp',
+ 'document_source_internal_shard_filter_test.cpp',
'document_source_skip_test.cpp',
'document_source_sort_by_count_test.cpp',
'document_source_sort_test.cpp',
@@ -389,6 +390,7 @@ pipelineeEnv.Library(
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_internal_inhibit_optimization.cpp',
+ 'document_source_internal_shard_filter.cpp',
'document_source_internal_split_pipeline.cpp',
'document_source_limit.cpp',
'document_source_list_cached_and_active_users.cpp',
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 25cf5605a3b..158e97b33fa 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
@@ -255,15 +256,9 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
- // If we are at the end of the pipeline, only optimize in the special case of a cache stage.
- if (std::next(itr) == container->end()) {
- return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this)
- ? doOptimizeAt(itr, container)
- : container->end();
- }
-
// Attempt to swap 'itr' with a subsequent $match or subsequent $sample.
- if (pushMatchBefore(itr, container) || pushSampleBefore(itr, container)) {
+ if (std::next(itr) != container->end() &&
+ (pushMatchBefore(itr, container) || pushSampleBefore(itr, container))) {
// The stage before the pushed before stage may be able to optimize further, if there is
// such a stage.
return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr));
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 05397303115..84d724231fc 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -482,11 +482,10 @@ protected:
/**
* Attempt to perform an optimization with the following source in the pipeline. 'container'
- * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. The caller
- * must guarantee that std::next(itr) != container->end().
+ * refers to the entire pipeline, and 'itr' points to this stage within the pipeline.
*
* The return value is an iterator over the same container which points to the first location
- * in the container at which an optimization may be possible.
+ * in the container at which an optimization may be possible, or the end of the container.
*
* For example, if a swap takes place, the returned iterator should just be the position
* directly preceding 'itr', if such a position exists, since the stage at that position may be
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 0015d915acc..1d971d188a8 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -175,6 +175,10 @@ Pipeline::SourceContainer::iterator DocumentSourceCursor::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
if (nextLimit) {
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 007718f0e13..b2f349f1829 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -365,6 +365,10 @@ Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
// If we are not already handling an $unwind stage internally, we can combine with the following
// $unwind stage.
auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp
new file mode 100644
index 00000000000..a548cb6366c
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+//
+// This DocumentSource is not registered and can only be created as part of expansions for other
+// DocumentSources.
+//
+
+/**
+ * Builds the stage for the given expression context and takes ownership of 'shardFilterer',
+ * which is later consulted by getNext() and doOptimizeAt().
+ */
+DocumentSourceInternalShardFilter::DocumentSourceInternalShardFilter(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ std::unique_ptr<ShardFilterer> shardFilterer)
+ : DocumentSource(pExpCtx), _shardFilterer(std::move(shardFilterer)) {}
+
+/**
+ * Pulls results from the preceding stage until one is found whose document the shard filterer
+ * classifies as kBelongs, and returns that result. Documents classified otherwise are dropped;
+ * for kNoShardKey a warning is logged first. The first non-advanced result from the source is
+ * returned unchanged.
+ */
+DocumentSource::GetNextResult DocumentSourceInternalShardFilter::getNext() {
+    pExpCtx->checkForInterrupt();
+    invariant(_shardFilterer);
+
+    auto next = pSource->getNext();
+    for (; next.isAdvanced(); next = pSource->getNext()) {
+        // Serialize the document once and reuse the result for both the ownership check and the
+        // warning below. Holding it in a named local also keeps it alive for as long as
+        // 'matchable' is in use.
+        const BSONObj docBson = next.getDocument().toBson();
+        BSONMatchableDocument matchable(docBson);
+        const auto belongsRes = _shardFilterer->documentBelongsToMe(matchable);
+        if (belongsRes == ShardFilterer::DocumentBelongsResult::kBelongs) {
+            return next;
+        }
+
+        if (belongsRes == ShardFilterer::DocumentBelongsResult::kNoShardKey) {
+            warning() << "no shard key found in document " << redact(docBson)
+                      << " "
+                      << "for shard key pattern " << _shardFilterer->getKeyPattern() << ", "
+                      << "document may have been inserted manually into shard";
+        }
+
+        // For performance reasons, a streaming stage must not keep references to documents across
+        // calls to getNext(). Such stages must retrieve a result from their child and then release
+        // it (or return it) before asking for another result. Failing to do so can result in extra
+        // work, since the Document/Value library must copy data on write when that data has a
+        // refcount above one.
+        next.releaseDocument();
+    }
+    return next;
+}
+
+/**
+ * Leaves the stage in place when the collection is sharded and returns the position after it.
+ * Otherwise the stage erases itself from 'container' and returns the position before the one it
+ * occupied, or the new beginning of the container if it was the first stage.
+ */
+Pipeline::SourceContainer::iterator DocumentSourceInternalShardFilter::doOptimizeAt(
+    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+    invariant(*itr == this);
+
+    if (_shardFilterer->isCollectionSharded()) {
+        return std::next(itr);
+    }
+
+    // The collection is not sharded, so this stage has nothing to filter. Work out where to
+    // resume before erasing; no member of this object is touched after the erase.
+    const bool wasFirstStage = (itr == container->begin());
+    auto precedingStage = wasFirstStage ? container->end() : std::prev(itr);
+
+    container->erase(itr);
+
+    return wasFirstStage ? container->begin() : precedingStage;
+}
+
+/**
+ * Serializes the stage as a document mapping the stage name to an empty document. The 'explain'
+ * argument is not consulted.
+ */
+Value DocumentSourceInternalShardFilter::serialize(
+    boost::optional<ExplainOptions::Verbosity> explain) const {
+    const Document serializedStage = DOC(getSourceName() << Document());
+    return Value(serializedStage);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.h b/src/mongo/db/pipeline/document_source_internal_shard_filter.h
new file mode 100644
index 00000000000..8fbfe6b07f1
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/shard_filterer.h"
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Aggregation stage which drops documents that are physically present on this shard but are not
+ * logically owned by it according to this operation's shard version. The ownership decision is
+ * delegated to a ShardFilterer.
+ */
+class DocumentSourceInternalShardFilter final : public DocumentSource {
+public:
+    static constexpr StringData kStageName = "$_internalShardFilter"_sd;
+
+    // NOTE(review): no definition of createFromBson() appears in the accompanying .cpp of this
+    // change - confirm whether this declaration is intended to be used.
+    static boost::intrusive_ptr<DocumentSource> createFromBson(
+        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+    /**
+     * Takes ownership of 'shardFilterer'.
+     */
+    DocumentSourceInternalShardFilter(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+                                      std::unique_ptr<ShardFilterer> shardFilterer);
+
+    const char* getSourceName() const override {
+        return kStageName.rawData();
+    }
+
+    /**
+     * The same constraints are reported regardless of 'pipeState'.
+     */
+    StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+        StageConstraints stageConstraints(StreamType::kStreaming,
+                                          PositionRequirement::kNone,
+                                          HostTypeRequirement::kAnyShard,
+                                          DiskUseRequirement::kNoDiskUse,
+                                          FacetRequirement::kNotAllowed,
+                                          TransactionRequirement::kNotAllowed,
+                                          LookupRequirement::kNotAllowed,
+                                          ChangeStreamRequirement::kBlacklist);
+        return stageConstraints;
+    }
+
+    GetNextResult getNext() override;
+
+    Value serialize(
+        boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
+
+    /**
+     * Always returns boost::none.
+     */
+    boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+        return boost::none;
+    }
+
+    Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+                                                     Pipeline::SourceContainer* container) override;
+
+private:
+    // Decides, per document, whether this shard owns it.
+    std::unique_ptr<ShardFilterer> _shardFilterer;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp
new file mode 100644
index 00000000000..43eda00d9af
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_shard_filter_test.cpp
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/exec/shard_filterer.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceInternalShardFilterTest = AggregationContextFixture;
+
+/**
+ * Base ShardFilterer for the tests in this file. Every method is MONGO_UNREACHABLE, so a test
+ * helper overrides only the methods it expects to be called.
+ */
+class ShardFiltererBaseForTest : public ShardFilterer {
+public:
+    DocumentBelongsResult documentBelongsToMe(const MatchableDocument&) const override {
+        MONGO_UNREACHABLE;
+    }
+
+    bool isCollectionSharded() const override {
+        MONGO_UNREACHABLE;
+    }
+
+    const KeyPattern& getKeyPattern() const override {
+        MONGO_UNREACHABLE;
+    }
+};
+
+/**
+ * ShardFilterer reporting an unsharded collection. Only isCollectionSharded() is overridden.
+ */
+class UnshardedShardFilterer : public ShardFiltererBaseForTest {
+public:
+    bool isCollectionSharded() const override {
+        const bool sharded = false;
+        return sharded;
+    }
+};
+
+// With an unsharded collection, optimizing a shard filter that follows another stage must leave
+// only that other stage in the pipeline.
+TEST_F(DocumentSourceInternalShardFilterTest, ShouldOptimizeAwayIfUnshardedCollection) {
+    Pipeline::SourceContainer stages;
+    auto mockSource = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}"});
+
+    stages.push_back(mockSource);
+    stages.push_back(new DocumentSourceInternalShardFilter(
+        getExpCtx(), std::make_unique<UnshardedShardFilterer>()));
+
+    // Step past the mock so that the iterator points to the shard filter source.
+    auto shardFilterItr = stages.begin();
+    ++shardFilterItr;
+
+    // The shard filter should remove itself.
+    stages.back()->optimizeAt(shardFilterItr, &stages);
+    ASSERT_EQUALS(1U, stages.size());
+}
+
+// With an unsharded collection, optimizing a shard filter that is the only stage must leave the
+// pipeline empty.
+TEST_F(DocumentSourceInternalShardFilterTest,
+       ShouldOptimizeAwayIfUnshardedCollectionAndFirstInPipeline) {
+    Pipeline::SourceContainer stages;
+
+    stages.push_back(new DocumentSourceInternalShardFilter(
+        getExpCtx(), std::make_unique<UnshardedShardFilterer>()));
+
+    // The shard filter source is the first (and only) stage.
+    auto shardFilterItr = stages.begin();
+
+    // The shard filter should remove itself.
+    stages.back()->optimizeAt(shardFilterItr, &stages);
+    ASSERT_EQUALS(0U, stages.size());
+}
+
+/**
+ * Indicate that the first 'n' documents passed through documentBelongsToMe() don't belong; every
+ * later document does. The collection is reported as sharded.
+ */
+class FirstNShardFilterer : public ShardFiltererBaseForTest {
+public:
+    explicit FirstNShardFilterer(unsigned int n) : _numToFilter(n) {}
+
+    DocumentBelongsResult documentBelongsToMe(const MatchableDocument& doc) const override {
+        // Compare against the count of documents seen before this one, then record this call.
+        const bool pastFilteredPrefix = _numDocsSeen >= _numToFilter;
+        ++_numDocsSeen;
+        return pastFilteredPrefix ? DocumentBelongsResult::kBelongs
+                                  : DocumentBelongsResult::kDoesNotBelong;
+    }
+
+    bool isCollectionSharded() const override {
+        return true;
+    }
+
+private:
+    // Number of leading documents to report as not belonging.
+    const unsigned int _numToFilter;
+
+    // Number of calls made to documentBelongsToMe() so far. mutable because that method is
+    // marked const.
+    mutable unsigned int _numDocsSeen = 0;
+};
+
+// Documents the shard filterer reports as not belonging must be skipped; the first belonging
+// document is returned, followed by EOF.
+TEST_F(DocumentSourceInternalShardFilterTest, FiltersDocuments) {
+    auto mock = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}", "{a: 3}"});
+
+    const unsigned int nToFilter = 2;
+    DocumentSourceInternalShardFilter filter(getExpCtx(),
+                                             std::make_unique<FirstNShardFilterer>(nToFilter));
+    filter.setSource(mock.get());
+
+    // The first two documents should get filtered out.
+    auto next = filter.getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_VALUE_EQ(Value(3), next.getDocument().getField("a"));
+
+    ASSERT_TRUE(filter.getNext().isEOF());
+}
+
+/**
+ * ShardFilterer which classifies every document as kNoShardKey. The collection is reported as
+ * sharded, with key pattern {b: 1}.
+ */
+class ShardFiltererNoShardKey : public ShardFiltererBaseForTest {
+public:
+    ShardFiltererNoShardKey() : _kp(BSON("b" << 1)) {}
+
+    DocumentBelongsResult documentBelongsToMe(const MatchableDocument&) const override {
+        const auto result = DocumentBelongsResult::kNoShardKey;
+        return result;
+    }
+
+    bool isCollectionSharded() const override {
+        return true;
+    }
+
+    const KeyPattern& getKeyPattern() const override {
+        const KeyPattern& pattern = _kp;
+        return pattern;
+    }
+
+private:
+    // Key pattern handed out by getKeyPattern().
+    KeyPattern _kp;
+};
+
+// When every document is reported as lacking a shard key, the stage must skip all of them.
+TEST_F(DocumentSourceInternalShardFilterTest, SkipDocumentsWithoutShardKey) {
+    auto mock = DocumentSourceMock::createForTest({"{a: 1}", "{a: 2}", "{a: 3}"});
+
+    DocumentSourceInternalShardFilter filter(getExpCtx(),
+                                             std::make_unique<ShardFiltererNoShardKey>());
+    filter.setSource(mock.get());
+
+    // Every document is skipped, so the call to getNext() should return EOF.
+    ASSERT_TRUE(filter.getNext().isEOF());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index bcc10f4f892..f8b2ca3c0aa 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -56,6 +56,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
if (nextLimit) {
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 52c3b6b40b4..5a1d7767e5f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -338,6 +338,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
// If we are not already handling an $unwind stage internally, we can combine with the
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 26b8eaa14db..1900a644627 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -113,6 +113,10 @@ Pipeline::SourceContainer::iterator DocumentSourceMatch::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
// Since a text search must use an index, it must be the first stage in the pipeline. We cannot
diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp
index 03bc48e467d..8c48726aa2f 100644
--- a/src/mongo/db/pipeline/document_source_redact.cpp
+++ b/src/mongo/db/pipeline/document_source_redact.cpp
@@ -80,6 +80,10 @@ Pipeline::SourceContainer::iterator DocumentSourceRedact::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
if (nextMatch) {
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index a0dc8fbfcf5..4575e9fa02b 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -93,6 +93,11 @@ Value DocumentSourceSingleDocumentTransformation::serialize(
Pipeline::SourceContainer::iterator DocumentSourceSingleDocumentTransformation::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get());
if (nextSkip) {
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index 364f148162e..2eead90aa3f 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -84,6 +84,10 @@ Pipeline::SourceContainer::iterator DocumentSourceSkip::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get());
if (nextSkip) {
// '_nToSkip' can potentially overflow causing it to become negative and skip nothing.
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index ac87792aa93..ae7e707e3e9 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -56,6 +56,7 @@
namespace mongo {
+class ShardFilterer;
class ExpressionContext;
class Pipeline;
class PipelineDeleter;
@@ -260,6 +261,12 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
+ * Produces a ShardFilterer. May return null, in which case no filterer is available and
+ * callers must not dereference the result.
+ */
+ virtual std::unique_ptr<ShardFilterer> getShardFilterer(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0;
+
+ /**
* Returns a vector of owned BSONObjs, each of which contains details of an in-progress
* operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report
* operations for all authenticated users; otherwise, report only the current user's operations.
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 57babdf88e7..39c69b2daf7 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/shard_filterer.h"
#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/s/async_requests_sender.h"
@@ -164,6 +165,11 @@ public:
MONGO_UNREACHABLE;
}
+ std::unique_ptr<ShardFilterer> getShardFilterer(
+     const boost::intrusive_ptr<ExpressionContext>&) const override {
+     // This implementation never supplies a filterer; the expression context is not consulted.
+     return nullptr;
+ }
+
std::string getShardName(OperationContext* opCtx) const final {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index d523bdf5d14..c2672dcf2a0 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -41,8 +41,10 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/shard_filterer_impl.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -202,4 +204,10 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou
return attachCursorSourceToPipelineForLocalRead(expCtx, pipeline.release());
}
+// Wraps the orphans filter of the sharding state for 'expCtx->ns' in a ShardFiltererImpl.
+std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
+    auto shardingState = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
+    return std::make_unique<ShardFiltererImpl>(shardingState->getOrphansFilter(expCtx->opCtx));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index f54a9a93086..4dbc21b6ca2 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -84,6 +84,9 @@ public:
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
+
+ /**
+  * Returns a ShardFilterer for the namespace carried by 'expCtx'. Declared 'final' only:
+  * 'final' already requires that a base-class virtual is being overridden, so spelling out
+  * 'override' as well is redundant.
+  */
+ std::unique_ptr<ShardFilterer> getShardFilterer(
+     const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 7e6d53e1bd5..5ba8ea4f21b 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/exec/shard_filterer.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/pipeline/mongo_process_common.h"
@@ -97,6 +98,12 @@ public:
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override;
std::string getShardName(OperationContext* opCtx) const final;
+
+ std::unique_ptr<ShardFilterer> getShardFilterer(
+     const boost::intrusive_ptr<ExpressionContext>&) const override {
+     // Shard filtering never happens on a standalone, so there is no filterer to hand out.
+     return nullptr;
+ }
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const override;
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index fe408fdf2ab..a8a4a02e1e4 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -136,6 +136,11 @@ public:
MONGO_UNREACHABLE;
}
+ std::unique_ptr<ShardFilterer> getShardFilterer(
+     const boost::intrusive_ptr<ExpressionContext>&) const override {
+     // The stub does not expect this to be called.
+     MONGO_UNREACHABLE;
+ }
+
std::vector<BSONObj> getCurrentOps(const boost::intrusive_ptr<ExpressionContext>& expCtx,
CurrentOpConnectionsMode connMode,
CurrentOpSessionsMode sessionMode,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h b/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h
index 4b3bcbaafdc..36d9dd81cbc 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h
@@ -33,6 +33,7 @@
#include <deque>
#include <vector>
+#include "mongo/db/exec/shard_filterer.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -40,6 +41,21 @@
namespace mongo {
+/**
+ * A ShardFilterer placeholder: it can be constructed and handed out, but invoking any of its
+ * methods hits MONGO_UNREACHABLE.
+ */
+class StubShardFilterer : public ShardFilterer {
+public:
+    const KeyPattern& getKeyPattern() const override {
+        MONGO_UNREACHABLE;
+    }
+
+    bool isCollectionSharded() const override {
+        MONGO_UNREACHABLE;
+    }
+
+    DocumentBelongsResult documentBelongsToMe(const MatchableDocument&) const override {
+        MONGO_UNREACHABLE;
+    }
+};
+
/**
* A mock MongoProcessInterface which allows mocking a foreign pipeline.
*/
@@ -67,6 +83,16 @@ public:
boost::optional<BSONObj> readConcern,
bool allowSpeculativeMajorityRead);
+ /**
+  * Tries to emulate the behavior mongos and mongod would each follow: when 'expCtx->inMongos'
+  * is set no filterer is produced (null); otherwise a StubShardFilterer is returned.
+  */
+ std::unique_ptr<ShardFilterer> getShardFilterer(
+     const boost::intrusive_ptr<ExpressionContext>& expCtx) const override {
+     if (expCtx->inMongos) {
+         return nullptr;
+     }
+     // std::make_unique matches the other call sites added alongside this one.
+     return std::make_unique<StubShardFilterer>();
+ }
+
private:
std::deque<DocumentSource::GetNextResult> _mockResults;
};