summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/document_source.cpp147
-rw-r--r--src/mongo/db/pipeline/document_source.h18
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp109
-rw-r--r--src/mongo/db/pipeline/document_source_group.h27
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp38
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h7
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp78
-rw-r--r--src/mongo/db/pipeline/document_source_out.h42
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h2
-rw-r--r--src/mongo/db/pipeline/document_source_test.cpp240
-rw-r--r--src/mongo/db/pipeline/document_source_test_optimizations.h67
-rw-r--r--src/mongo/db/pipeline/expression.cpp10
-rw-r--r--src/mongo/db/pipeline/expression.h6
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp42
-rw-r--r--src/mongo/db/pipeline/pipeline.h22
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp210
-rw-r--r--src/mongo/s/SConscript2
-rw-r--r--src/mongo/s/catalog_cache_test_fixture.cpp4
-rw-r--r--src/mongo/s/catalog_cache_test_fixture.h2
-rw-r--r--src/mongo/s/commands/SConscript1
-rw-r--r--src/mongo/s/grid.h4
-rw-r--r--src/mongo/s/query/SConscript4
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp165
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h28
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner_test.cpp545
26 files changed, 1725 insertions, 106 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index d51ad84ab67..32edc1704d5 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2011 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2011 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include "mongo/platform/basic.h"
@@ -160,6 +160,52 @@ splitMatchByModifiedFields(const boost::intrusive_ptr<DocumentSourceMatch>& matc
return match->splitSourceBy(modifiedPaths, modifiedPathsRet.renames);
}
+/**
+ * If 'pathOfInterest' or some path prefix of 'pathOfInterest' is renamed, returns the new name for
+ * 'pathOfInterest', otherwise returns boost::none.
+ * For example, if 'renamedPaths' is {"c.d", "c"}, and 'pathOfInterest' is "c.d.f", returns "c.f".
+ */
+boost::optional<std::string> findNewName(const StringMap<std::string>& renamedPaths,
+ std::string pathOfInterest) {
+ FieldPath fullPathOfInterest(pathOfInterest);
+ StringBuilder toLookup;
+ std::size_t pathIndex = 0;
+ while (pathIndex < fullPathOfInterest.getPathLength()) {
+ if (pathIndex != 0) {
+ toLookup << ".";
+ }
+ toLookup << fullPathOfInterest.getFieldName(pathIndex++);
+
+ auto it = renamedPaths.find(toLookup.stringData());
+ if (it != renamedPaths.end()) {
+ const auto& newPathOfPrefix = it->second;
+ // We found a rename! Note this might be a rename of the prefix of the path, so we have
+ // to add back on the suffix that was unchanged.
+ StringBuilder renamedPath;
+ renamedPath << newPathOfPrefix;
+ while (pathIndex < fullPathOfInterest.getPathLength()) {
+ renamedPath << "." << fullPathOfInterest.getFieldName(pathIndex++);
+ }
+ return {renamedPath.str()};
+ }
+ }
+ return boost::none;
+}
+
+StringMap<std::string> computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified(
+ const StringMap<std::string>& renamedPaths, const std::set<std::string>& pathsOfInterest) {
+ StringMap<std::string> renameOut;
+ for (auto&& ofInterest : pathsOfInterest) {
+ if (auto newName = findNewName(renamedPaths, ofInterest)) {
+ renameOut[ofInterest] = *newName;
+ } else {
+ // This path was not renamed, assume it was unchanged and map it to itself.
+ renameOut[ofInterest] = ofInterest;
+ }
+ }
+ return renameOut;
+}
+
} // namespace
Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
@@ -202,6 +248,53 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
return doOptimizeAt(itr, container);
}
+boost::optional<StringMap<std::string>> DocumentSource::renamedPaths(
+ const std::set<std::string>& pathsOfInterest) const {
+ auto modifiedPathsRet = this->getModifiedPaths();
+ switch (modifiedPathsRet.type) {
+ case DocumentSource::GetModPathsReturn::Type::kNotSupported:
+ case DocumentSource::GetModPathsReturn::Type::kAllPaths:
+ return boost::none;
+ case DocumentSource::GetModPathsReturn::Type::kFiniteSet: {
+ for (auto&& modified : modifiedPathsRet.paths) {
+ for (auto&& ofInterest : pathsOfInterest) {
+ // Any overlap of the path means the path of interest is not preserved. For
+ // example, if the path of interest is "a.b", then a modified path of "a",
+ // "a.b", or "a.b.c" would all signal that "a.b" is not preserved.
+ if (ofInterest == modified ||
+ expression::isPathPrefixOf(ofInterest, modified) ||
+ expression::isPathPrefixOf(modified, ofInterest)) {
+ // This stage modifies at least one of the fields which the caller is
+ // interested in, bail out.
+ return boost::none;
+ }
+ }
+ }
+
+ // None of the paths of interest were modified, construct the result map, mapping
+ // the names after this stage to the names before this stage.
+ return computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified(modifiedPathsRet.renames,
+ pathsOfInterest);
+ }
+ case DocumentSource::GetModPathsReturn::Type::kAllExcept: {
+ auto preservedPaths = modifiedPathsRet.paths;
+ for (auto&& rename : modifiedPathsRet.renames) {
+ // For the purposes of checking which paths are modified, consider renames to
+ // preserve the path. We'll circle back later to figure out the new name if
+ // appropriate.
+ preservedPaths.insert(rename.first);
+ }
+ auto modifiedPaths = extractModifiedDependencies(pathsOfInterest, preservedPaths);
+ if (modifiedPaths.empty()) {
+ return computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified(
+ modifiedPathsRet.renames, pathsOfInterest);
+ }
+ return boost::none;
+ }
+ }
+ MONGO_UNREACHABLE;
+}
+
void DocumentSource::serializeToArray(vector<Value>& array,
boost::optional<ExplainOptions::Verbosity> explain) const {
Value entry = serialize(explain);
@@ -258,4 +351,4 @@ BSONObjSet DocumentSource::truncateSortSet(const BSONObjSet& sorts,
return out;
}
-}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 682b03b76e1..e9bf60dba55 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -628,6 +628,14 @@ public:
}
/**
+ * Given 'currentNames' which describes a set of paths which the caller is interested in,
+ * returns boost::none if any of those paths are modified by this stage, or a mapping from
+ * their old name to their new name if they are preserved but possibly renamed by this stage.
+ */
+ boost::optional<StringMap<std::string>> renamedPaths(
+ const std::set<std::string>& currentNames) const;
+
+ /**
* Get the dependencies this operation needs to do its job. If overridden, subclasses must add
* all paths needed to apply their transformation to 'deps->fields', and call
* 'deps->setNeedsMetadata()' to indicate what metadata (e.g. text score), if any, is required.
@@ -731,6 +739,16 @@ public:
*/
virtual MergingLogic mergingLogic() = 0;
+ /**
+ * Returns true if it would be correct to execute this stage in parallel across the shards in
+ * cases where the final stage is an $out. For example, a $group stage which is just merging the
+ * groups from the shards can be run in parallel since it will preserve the shard key.
+ */
+ virtual bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
+ return false;
+ }
+
protected:
// It is invalid to delete through a NeedsMergerDocumentSource-typed pointer.
virtual ~NeedsMergerDocumentSource() {}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 1563c6ba14b..de3dcd084ec 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2011 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2011 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include "mongo/platform/basic.h"
@@ -43,8 +43,8 @@
namespace mongo {
using boost::intrusive_ptr;
-using std::shared_ptr;
using std::pair;
+using std::shared_ptr;
using std::vector;
REGISTER_DOCUMENT_SOURCE(group,
@@ -241,6 +241,25 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const
return DepsTracker::State::EXHAUSTIVE_ALL;
}
+DocumentSource::GetModPathsReturn DocumentSourceGroup::getModifiedPaths() const {
+ // We preserve none of the fields, but any fields referenced as part of the group key are
+ // logically just renamed.
+ StringMap<std::string> renames;
+ for (std::size_t i = 0; i < _idExpressions.size(); ++i) {
+ auto idExp = _idExpressions[i];
+ auto pathToPutResultOfExpression =
+ _idFieldNames.empty() ? "_id" : "_id." + _idFieldNames[i];
+ auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression);
+ for (auto&& rename : computedPaths.renames) {
+ renames[rename.first] = rename.second;
+ }
+ }
+
+ return {DocumentSource::GetModPathsReturn::Type::kAllExcept,
+ std::set<std::string>{}, // No fields are preserved.
+ std::move(renames)};
+}
+
intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
const intrusive_ptr<ExpressionContext>& pExpCtx,
const boost::intrusive_ptr<Expression>& groupByExpression,
@@ -860,6 +879,42 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() {
return {mergingGroup};
}
+
+bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const {
+ return std::any_of(
+ _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) {
+ if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) {
+ if (fieldExp->representsPath(dottedPath)) {
+ return true;
+ }
+ }
+ return false;
+ });
+}
+
+bool DocumentSourceGroup::canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
+ if (_doingMerge) {
+ return true; // This is fine.
+ }
+
+ // Certain $group stages are allowed to execute on each exchange consumer. In order to
+ // guarantee each consumer will only group together data from its own shard, the $group must
+ // group on a superset of the shard key.
+ for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) {
+ if (!pathIncludedInGroupKeys(currentPathOfShardKey)) {
+ // This requires an exact path match, but as a future optimization certain path
+ // prefixes should be okay. For example, if the shard key path is "a.b", and we're
+ // grouping by "a", then each group of "a" is strictly more specific than "a.b", so
+ // we can deduce that grouping by "a" will not need to group together documents
+ // across different values of the shard key field "a.b", and thus as long as any
+ // other shard key fields are similarly preserved will not need to consume a merged
+ // stream to perform the group.
+ return false;
+ }
+ }
+ return true;
+}
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 421c244641e..7a79db09a56 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -43,13 +43,13 @@ public:
using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
using GroupsMap = ValueUnorderedMap<Accumulators>;
- // Virtuals from DocumentSource.
boost::intrusive_ptr<DocumentSource> optimize() final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
GetNextResult getNext() final;
const char* getSourceName() const final;
BSONObjSet getOutputSorts() final;
+ GetModPathsReturn getModifiedPaths() const final;
/**
* Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none,
@@ -88,6 +88,14 @@ public:
void setIdExpression(const boost::intrusive_ptr<Expression> idExpression);
/**
+ * Returns true if this $group stage represents a 'global' $group which is merging together
+ * results from earlier partial groups.
+ */
+ bool doingMerge() const {
+ return _doingMerge;
+ }
+
+ /**
* Tell this source if it is doing a merge from shards. Defaults to false.
*/
void setDoingMerge(bool doingMerge) {
@@ -98,15 +106,17 @@ public:
return _streaming;
}
- // Virtuals for NeedsMergerDocumentSource.
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- MergingLogic mergingLogic() final;
-
/**
* Returns true if this $group stage used disk during execution and false otherwise.
*/
bool usedDisk() final;
+ // Virtuals for NeedsMergerDocumentSource.
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ MergingLogic mergingLogic() final;
+ bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
+
protected:
void doDispose() final;
@@ -161,9 +171,14 @@ private:
*/
Value expandId(const Value& val);
- bool _usedDisk; // Keeps track of whether this $group spilled to disk.
+ /**
+ * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'.
+ */
+ bool pathIncludedInGroupKeys(const std::string& dottedPath) const;
+
std::vector<AccumulationStatement> _accumulatedFields;
+ bool _usedDisk; // Keeps track of whether this $group spilled to disk.
bool _doingMerge;
size_t _memoryUsageBytes = 0;
size_t _maxMemoryUsageBytes;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 3a8cecedf54..b612b9c7512 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -189,6 +189,44 @@ TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
ASSERT_THROWS_CODE(group->getNext(), AssertionException, 16945);
}
+TEST_F(DocumentSourceGroupTest, ShouldReportSingleFieldGroupKeyAsARename) {
+ auto expCtx = getExpCtx();
+ VariablesParseState vps = expCtx->variablesParseState;
+ auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$x", vps);
+ auto group = DocumentSourceGroup::create(expCtx, groupByExpression, {});
+ auto modifiedPathsRet = group->getModifiedPaths();
+ ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept);
+ ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL);
+ ASSERT_EQ(modifiedPathsRet.renames.size(), 1UL);
+ ASSERT_EQ(modifiedPathsRet.renames["_id"], "x");
+}
+
+TEST_F(DocumentSourceGroupTest, ShouldReportMultipleFieldGroupKeysAsARename) {
+ auto expCtx = getExpCtx();
+ VariablesParseState vps = expCtx->variablesParseState;
+ auto x = ExpressionFieldPath::parse(expCtx, "$x", vps);
+ auto y = ExpressionFieldPath::parse(expCtx, "$y", vps);
+ auto groupByExpression = ExpressionObject::create(expCtx, {{"x", x}, {"y", y}});
+ auto group = DocumentSourceGroup::create(expCtx, groupByExpression, {});
+ auto modifiedPathsRet = group->getModifiedPaths();
+ ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept);
+ ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL);
+ ASSERT_EQ(modifiedPathsRet.renames.size(), 2UL);
+ ASSERT_EQ(modifiedPathsRet.renames["_id.x"], "x");
+ ASSERT_EQ(modifiedPathsRet.renames["_id.y"], "y");
+}
+
+TEST_F(DocumentSourceGroupTest, ShouldNotReportDottedGroupKeyAsARename) {
+ auto expCtx = getExpCtx();
+ VariablesParseState vps = expCtx->variablesParseState;
+ auto xDotY = ExpressionFieldPath::parse(expCtx, "$x.y", vps);
+ auto group = DocumentSourceGroup::create(expCtx, xDotY, {});
+ auto modifiedPathsRet = group->getModifiedPaths();
+ ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept);
+ ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL);
+ ASSERT_EQ(modifiedPathsRet.renames.size(), 0UL);
+}
+
BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
vector<Value> arr;
source->serializeToArray(arr);
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index cc81d533fb2..0a0ad30ca41 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -89,6 +89,13 @@ public:
return this;
}
+ /**
+ * This stage does not modify anything.
+ */
+ GetModPathsReturn getModifiedPaths() const override {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+ }
+
// Return documents from front of queue.
std::deque<GetNextResult> queue;
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index e5ccdc038bc..9f59cc099b0 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -165,18 +165,11 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
MONGO_UNREACHABLE;
}
-DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- WriteModeEnum mode,
- std::set<FieldPath> uniqueKey)
- : DocumentSource(expCtx),
- _done(false),
- _outputNs(outputNs),
- _mode(mode),
- _uniqueKeyFields(std::move(uniqueKey)) {}
-
-intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
- BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
+ NamespaceString outputNs,
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ WriteModeEnum mode,
+ std::set<FieldPath> uniqueKey) {
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"$out cannot be used in a transaction",
@@ -187,6 +180,45 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
"$out cannot be used with a 'majority' read concern level",
readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern);
+ // Although we perform a check for "replaceCollection" mode with a sharded output collection
+ // during lite parsing, we need to do it here as well in case mongos is stale or the command is
+ // sent directly to the shard.
+ uassert(17017,
+ str::stream() << "$out with mode " << WriteMode_serializer(mode)
+ << " is not supported to an existing *sharded* output collection.",
+ !(mode == WriteModeEnum::kModeReplaceCollection &&
+ expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)));
+
+ uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial());
+
+ switch (mode) {
+ case WriteModeEnum::kModeReplaceCollection:
+ return new DocumentSourceOutReplaceColl(
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ case WriteModeEnum::kModeInsertDocuments:
+ return new DocumentSourceOutInPlace(
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ case WriteModeEnum::kModeReplaceDocuments:
+ return new DocumentSourceOutInPlaceReplace(
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ WriteModeEnum mode,
+ std::set<FieldPath> uniqueKey)
+ : DocumentSource(expCtx),
+ _done(false),
+ _outputNs(std::move(outputNs)),
+ _mode(mode),
+ _uniqueKeyFields(std::move(uniqueKey)) {}
+
+intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+
auto mode = WriteModeEnum::kModeReplaceCollection;
std::set<FieldPath> uniqueKey;
NamespaceString outputNs;
@@ -221,27 +253,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
<< typeName(elem.type()));
}
- // Although we perform a check for "replaceCollection" mode with a sharded output collection
- // during lite parsing, we need to do it here as well in case mongos is stale or the command is
- // sent directly to the shard.
- uassert(17017,
- str::stream() << "$out with mode " << WriteMode_serializer(mode)
- << " is not supported to an existing *sharded* output collection.",
- !(mode == WriteModeEnum::kModeReplaceCollection &&
- expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)));
-
- uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial());
-
- switch (mode) {
- case WriteModeEnum::kModeReplaceCollection:
- return new DocumentSourceOutReplaceColl(outputNs, expCtx, mode, uniqueKey);
- case WriteModeEnum::kModeInsertDocuments:
- return new DocumentSourceOutInPlace(outputNs, expCtx, mode, uniqueKey);
- case WriteModeEnum::kModeReplaceDocuments:
- return new DocumentSourceOutInPlaceReplace(outputNs, expCtx, mode, uniqueKey);
- default:
- MONGO_UNREACHABLE;
- }
+ return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey));
}
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 4ae2c9af5e8..45a631cabd4 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -59,7 +59,7 @@ public:
bool _allowShardedOutNss;
};
- DocumentSourceOut(const NamespaceString& outputNs,
+ DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey);
@@ -70,28 +70,46 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
+ /**
+ * For purposes of tracking which fields come from where, this stage does not modify any fields.
+ */
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+ }
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return {StreamType::kStreaming,
PositionRequirement::kLast,
+ // A $out to an unsharded collection should merge on the primary shard to perform
+ // local writes. A $out to a sharded collection has no requirement, since each shard
+ // can perform its own portion of the write.
HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kWritesPersistentData,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed};
}
- // Virtuals for NeedsMergerDocumentSource
+ const NamespaceString& getOutputNs() const {
+ return _outputNs;
+ }
+
+ WriteModeEnum getMode() const {
+ return _mode;
+ }
+
boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return NULL;
+ return nullptr;
}
MergingLogic mergingLogic() final {
return {this};
}
-
- const NamespaceString& getOutputNs() const {
- return _outputNs;
+ virtual bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
+ // If someone is asking the question, this must be the $out stage in question, so yes!
+ return true;
}
+
/**
* Retrieves the namespace to direct each batch to, which may be a temporary namespace or the
* final output namespace.
@@ -146,6 +164,18 @@ public:
*/
virtual void finalize() = 0;
+ /**
+ * Creates a new $out stage from the given arguments.
+ */
+ static boost::intrusive_ptr<DocumentSourceOut> create(
+ NamespaceString outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ WriteModeEnum,
+ std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"});
+
+ /**
+ * Parses a $out stage from the user-supplied BSON.
+ */
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index cb753b400f9..ca092927f56 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -513,6 +513,17 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() {
return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr,
sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()};
}
+
+bool DocumentSourceSort::canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
+ // This is an interesting special case. If there are no further stages which require merging the
+ // streams into one, a $sort should not require it. This is only the case because the sort order
+ // doesn't matter for a pipeline ending with a $out stage. We may encounter it here as an
+ // intermediate stage before a final $group with a $sort, which would make sense. Should we
+ // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort
+ // would generally require merging the streams before producing output.
+ return false;
+}
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index ac860c5b7a5..a104c3f5713 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -83,6 +83,8 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final;
MergingLogic mergingLogic() final;
+ bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
/**
* Write out a Document whose contents are the sort key pattern.
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index 2979179c255..9372e683071 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_test_optimizations.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -78,5 +79,244 @@ TEST_F(DocumentSourceTruncateSort, TruncateSortDedupsSortCorrectly) {
ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U);
}
+class RenamesAToB : public DocumentSourceTestOptimizations {
+public:
+ RenamesAToB() : DocumentSourceTestOptimizations() {}
+ GetModPathsReturn getModifiedPaths() const final {
+ // Pretend this stage simply renames the "a" field to be "b", leaving the value of "a" the
+ // same. This would be the equivalent of an {$addFields: {b: "$a"}}.
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, DoesReturnSimpleRenameFromFiniteSetRename) {
+ RenamesAToB renamesAToB;
+ auto renames = renamesAToB.renamedPaths({"b"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["b"], "a");
+}
+
+TEST(DocumentSourceRenamedPaths, ReturnsSimpleMapForUnaffectedFieldsFromFiniteSetRename) {
+ RenamesAToB renamesAToB;
+ {
+ auto renames = renamesAToB.renamedPaths({"c"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["c"], "c");
+ }
+
+ {
+ auto renames = renamesAToB.renamedPaths({"a"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["a"], "a");
+ }
+
+ {
+ auto renames = renamesAToB.renamedPaths({"e", "f", "g"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 3UL);
+ ASSERT_EQ(map["e"], "e");
+ ASSERT_EQ(map["f"], "f");
+ ASSERT_EQ(map["g"], "g");
+ }
+}
+
+class RenameCToDPreserveEFG : public DocumentSourceTestOptimizations {
+public:
+ RenameCToDPreserveEFG() : DocumentSourceTestOptimizations() {}
+
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kAllExcept,
+ std::set<std::string>{"e", "f", "g"},
+ {{"d", "c"}}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, DoesReturnSimpleRenameFromAllExceptRename) {
+ RenameCToDPreserveEFG renameCToDPreserveEFG;
+ auto renames = renameCToDPreserveEFG.renamedPaths({"d"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["d"], "c");
+}
+
+TEST(DocumentSourceRenamedPaths, ReturnsSimpleMapForUnaffectedFieldsFromAllExceptRename) {
+ RenameCToDPreserveEFG renameCToDPreserveEFG;
+ {
+ auto renames = renameCToDPreserveEFG.renamedPaths({"e"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["e"], "e");
+ }
+
+ {
+ auto renames = renameCToDPreserveEFG.renamedPaths({"f", "g"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 2UL);
+ ASSERT_EQ(map["f"], "f");
+ ASSERT_EQ(map["g"], "g");
+ }
+}
+
+class RenameCDotDToEPreserveFDotG : public DocumentSourceTestOptimizations {
+public:
+ RenameCDotDToEPreserveFDotG() : DocumentSourceTestOptimizations() {}
+
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kAllExcept, std::set<std::string>{"f.g"}, {{"e", "c.d"}}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, DoesReturnRenameToDottedFieldFromAllExceptRename) {
+ RenameCDotDToEPreserveFDotG renameCDotDToEPreserveFDotG;
+ {
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"e"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["e"], "c.d");
+ }
+ {
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"e.x", "e.y"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 2UL);
+ ASSERT_EQ(map["e.x"], "c.d.x");
+ ASSERT_EQ(map["e.y"], "c.d.y");
+ }
+}
+
+TEST(DocumentSourceRenamedPaths, DoesNotTreatPrefixAsUnmodifiedWhenSuffixIsModifiedFromAllExcept) {
+ RenameCDotDToEPreserveFDotG renameCDotDToEPreserveFDotG;
+ {
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ // This is the exception, the only path that is not modified.
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.g"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["f.g"], "f.g");
+ }
+ {
+ // We know "f.g" is preserved, so it follows that a subpath of that path is also preserved.
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.g.x", "f.g.xyz.foobarbaz"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 2UL);
+ ASSERT_EQ(map["f.g.x"], "f.g.x");
+ ASSERT_EQ(map["f.g.xyz.foobarbaz"], "f.g.xyz.foobarbaz");
+ }
+
+ {
+ // This shares a prefix with the unmodified path, but should not be reported as unmodified.
+ auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.x"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+class RenameAToXDotYModifyCDotD : public DocumentSourceTestOptimizations {
+public:
+ RenameAToXDotYModifyCDotD() : DocumentSourceTestOptimizations() {}
+
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{"c.d"}, {{"x.y", "a"}}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, DoesReturnRenameToDottedFieldFromFiniteSetRename) {
+ RenameAToXDotYModifyCDotD renameAToXDotYModifyCDotD;
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"x.y"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 1UL);
+ ASSERT_EQ(map["x.y"], "a");
+ }
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"x.y.z", "x.y.a.b.c"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 2UL);
+ ASSERT_EQ(map["x.y.z"], "a.z");
+ ASSERT_EQ(map["x.y.a.b.c"], "a.a.b.c");
+ }
+}
+
+TEST(DocumentSourceRenamedPaths, DoesNotTreatPrefixAsUnmodifiedWhenSuffixIsPartOfModifiedSet) {
+ RenameAToXDotYModifyCDotD renameAToXDotYModifyCDotD;
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.d"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.d.e"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.not_d", "c.decoy"});
+ ASSERT(static_cast<bool>(renames));
+ auto map = *renames;
+ ASSERT_EQ(map.size(), 2UL);
+ ASSERT_EQ(map["c.not_d"], "c.not_d");
+ ASSERT_EQ(map["c.decoy"], "c.decoy");
+ }
+}
+
+class ModifiesAllPaths : public DocumentSourceTestOptimizations {
+public:
+ ModifiesAllPaths() : DocumentSourceTestOptimizations() {}
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, ReturnsNoneWhenAllPathsAreModified) {
+ ModifiesAllPaths modifiesAllPaths;
+ {
+ auto renames = modifiesAllPaths.renamedPaths({"a"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = modifiesAllPaths.renamedPaths({"a", "b", "c.d"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+class ModificationsUnknown : public DocumentSourceTestOptimizations {
+public:
+ ModificationsUnknown() : DocumentSourceTestOptimizations() {}
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}};
+ }
+};
+
+TEST(DocumentSourceRenamedPaths, ReturnsNoneWhenModificationsAreNotKnown) {
+ ModificationsUnknown modificationsUnknown;
+ {
+ auto renames = modificationsUnknown.renamedPaths({"a"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = modificationsUnknown.renamedPaths({"a", "b", "c.d"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h
new file mode 100644
index 00000000000..e7141835457
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_test_optimizations.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+
+namespace mongo {
+/**
+ * A dummy class for other tests to inherit from to customize the behavior of any of the virtual
+ * methods from DocumentSource without having to implement all of them.
+ */
+class DocumentSourceTestOptimizations : public DocumentSource {
+public:
+ DocumentSourceTestOptimizations() : DocumentSource(new ExpressionContextForTest()) {}
+ virtual ~DocumentSourceTestOptimizations() = default;
+ virtual GetNextResult getNext() override {
+ MONGO_UNREACHABLE;
+ }
+ virtual StageConstraints constraints(Pipeline::SplitState) const override {
+ // Return the default constraints so that this can be used in test pipelines. Constructing a
+ // pipeline needs to do some validation that depends on this.
+ return StageConstraints{StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kNotAllowed};
+ }
+
+ virtual GetModPathsReturn getModifiedPaths() const override {
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ virtual Value serialize(boost::optional<ExplainOptions::Verbosity>) const override {
+ MONGO_UNREACHABLE;
+ }
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp
index 200792ac734..4e8f911087c 100644
--- a/src/mongo/db/pipeline/expression.cpp
+++ b/src/mongo/db/pipeline/expression.cpp
@@ -1970,6 +1970,16 @@ intrusive_ptr<Expression> ExpressionFieldPath::optimize() {
return intrusive_ptr<Expression>(this);
}
+bool ExpressionFieldPath::representsPath(const std::string& dottedPath) const {
+ if (_variable != Variables::kRootId || _fieldPath.getPathLength() == 1) {
+ // This variable refers to the entire document, or refers to a sub-field of something
+ // besides the root document. Either way we can't prove that it represents the path given by
+ // 'dottedPath'.
+ return false;
+ }
+ return _fieldPath.tail().fullPath() == dottedPath;
+}
+
void ExpressionFieldPath::_doAddDependencies(DepsTracker* deps) const {
if (_variable == Variables::kRootId) { // includes CURRENT when it is equivalent to ROOT.
if (_fieldPath.getPathLength() == 1) {
diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h
index 6cfd4603bac..bc39e271575 100644
--- a/src/mongo/db/pipeline/expression.h
+++ b/src/mongo/db/pipeline/expression.h
@@ -1108,6 +1108,12 @@ public:
const std::string& raw,
const VariablesParseState& vps);
+ /**
+ * Returns true if this expression logically represents the path 'dottedPath'. For example, if
+ * 'dottedPath' is 'a.b' and this FieldPath is '$$CURRENT.a.b', returns true.
+ */
+ bool representsPath(const std::string& dottedPath) const;
+
const FieldPath& getFieldPath() const {
return _fieldPath;
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 1a82bf1569a..3c84881323f 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -478,6 +478,48 @@ void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) {
_sources.push_back(source);
}
+boost::optional<StringMap<std::string>> Pipeline::renamedPaths(
+ SourceContainer::const_reverse_iterator rstart,
+ SourceContainer::const_reverse_iterator rend,
+ std::set<std::string> pathsOfInterest) {
+ // Use a vector to give a path id to each input path. A path's id is its index in the vector.
+ const std::vector<string> inputPaths(pathsOfInterest.begin(), pathsOfInterest.end());
+ std::vector<string> currentPaths(pathsOfInterest.begin(), pathsOfInterest.end());
+
+ // Loop backwards over the stages. We will re-use 'pathsOfInterest', modifying that set each
+ // time to be the current set of field's we're interested in. At the same time, we will maintain
+ // 'currentPaths'. 'pathsOfInterest' is used to compute the renames, while 'currentPaths' is
+ // used to tie a path back to its id.
+ //
+ // Interestingly, 'currentPaths' may contain duplicates. For example, if a stage like
+ // {$addFields: {a: "$b"}} duplicates the value of "a" and both paths are of interest, then
+ // 'currentPaths' may begin as ["a", "b"] representing the paths after the $addFields stage, but
+ // becomes ["a", "a"] via the rename.
+ for (auto it = rstart; it != rend; ++it) {
+ boost::optional<StringMap<string>> renamed = (*it)->renamedPaths(pathsOfInterest);
+ if (!renamed) {
+ return boost::none;
+ }
+ pathsOfInterest.clear();
+ for (std::size_t pathId = 0; pathId < inputPaths.size(); ++pathId) {
+ currentPaths[pathId] = (*renamed)[currentPaths[pathId]];
+ pathsOfInterest.insert(currentPaths[pathId]);
+ }
+ }
+
+ // We got all the way through the pipeline via renames! Construct the mapping from path at the
+ // end of the pipeline to path at the beginning.
+ StringMap<string> renameMap;
+ for (std::size_t pathId = 0; pathId < currentPaths.size(); ++pathId) {
+ renameMap[inputPaths[pathId]] = currentPaths[pathId];
+ }
+ return renameMap;
+}
+
+boost::optional<StringMap<string>> Pipeline::renamedPaths(std::set<string> pathsOfInterest) const {
+ return renamedPaths(_sources.rbegin(), _sources.rend(), std::move(pathsOfInterest));
+}
+
DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const {
DepsTracker deps(metadataAvailable);
const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index d3e99daae31..4874126b398 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -132,6 +132,19 @@ public:
*/
static bool aggSupportsWriteConcern(const BSONObj& cmd);
+ /**
+ * Given 'pathsOfInterest' which describes a set of paths which the caller is interested in,
+ * returns boost::none if any of those paths are modified by the section of a pipeline
+ * described by 'rstart' and 'rend', or a mapping from their name at the end of the pipeline to
+ * their name at the beginning of the pipeline if they are preserved but possibly renamed by
+ * this pipeline. Note that the analysis proceeds backwards, so the iterators must be reverse
+ * iterators.
+ */
+ static boost::optional<StringMap<std::string>> renamedPaths(
+ SourceContainer::const_reverse_iterator rstart,
+ SourceContainer::const_reverse_iterator rend,
+ std::set<std::string> pathsOfInterest);
+
const boost::intrusive_ptr<ExpressionContext>& getContext() const {
return pCtx;
}
@@ -250,6 +263,15 @@ public:
*/
DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const;
+ /**
+ * Given 'pathsOfInterest' which describes a set of paths which the caller is interested in,
+ * returns boost::none if any of those paths are modified by this pipeline, or a mapping from
+ * their name at the end of the pipeline to their name at the beginning of the pipeline if they
+ * are preserved but possibly renamed by this pipeline.
+ */
+ boost::optional<StringMap<std::string>> renamedPaths(
+ std::set<std::string> pathsOfInterest) const;
+
const SourceContainer& getSources() const {
return _sources;
}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 65d3c4fecc0..280473848cc 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -45,10 +45,12 @@
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_test_optimizations.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
@@ -2246,11 +2248,24 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest,
splitPipeline.shardsPipeline->requiredToRunOnMongos();
}
+/**
+ * For the purpsoses of this test, assume every collection is unsharded. Stages may ask this during
+ * setup. For example, to compute its constraints, the $out stage needs to know if the output
+ * collection is sharded.
+ */
+class FakeMongoProcessInterface : public StubMongoProcessInterface {
+public:
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
+ return false;
+ }
+};
+
TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStagePresent) {
auto expCtx = getExpCtx();
expCtx->allowDiskUse = true;
expCtx->inMongos = true;
+ expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>();
auto match = DocumentSourceMatch::create(fromjson("{x: 5}"), expCtx);
auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
@@ -2678,6 +2693,201 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite
} // namespace Dependencies
+namespace {
+TEST(PipelineRenameTracking, ReportsIdentityMapWhenEmpty) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create()}, expCtx));
+ auto renames = pipeline->renamedPaths({"a", "b", "c.d"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 3UL);
+ ASSERT_EQ(nameMap["a"], "a");
+ ASSERT_EQ(nameMap["b"], "b");
+ ASSERT_EQ(nameMap["c.d"], "c.d");
+}
+
+class NoModifications : public DocumentSourceTestOptimizations {
+public:
+ NoModifications() : DocumentSourceTestOptimizations() {}
+ static boost::intrusive_ptr<NoModifications> create() {
+ return new NoModifications();
+ }
+
+ /**
+ * Returns a description which communicate that this stage modifies nothing.
+ */
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>(), {}};
+ }
+};
+
+TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ {
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(), NoModifications::create()}, expCtx));
+ auto renames = pipeline->renamedPaths({"a", "b", "c.d"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 3UL);
+ ASSERT_EQ(nameMap["a"], "a");
+ ASSERT_EQ(nameMap["b"], "b");
+ ASSERT_EQ(nameMap["c.d"], "c.d");
+ }
+ {
+ auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create(),
+ NoModifications::create(),
+ NoModifications::create(),
+ NoModifications::create()},
+ expCtx));
+ auto renames = pipeline->renamedPaths({"a", "b", "c.d"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 3UL);
+ ASSERT_EQ(nameMap["a"], "a");
+ ASSERT_EQ(nameMap["b"], "b");
+ ASSERT_EQ(nameMap["c.d"], "c.d");
+ }
+}
+
+class NotSupported : public DocumentSourceTestOptimizations {
+public:
+ NotSupported() : DocumentSourceTestOptimizations() {}
+ static boost::intrusive_ptr<NotSupported> create() {
+ return new NotSupported();
+ }
+
+ /**
+ * Returns a description which communicate that this stage modifies nothing.
+ */
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>(), {}};
+ }
+};
+
+TEST(PipelineRenameTracking, DoesNotReportRenamesIfAStageDoesNotSupportTrackingThem) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create(),
+ NoModifications::create(),
+ NotSupported::create(),
+ NoModifications::create()},
+ expCtx));
+ ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"a"})));
+ ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"a", "b"})));
+ ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"x", "yahoo", "c.d"})));
+}
+
+class RenamesAToB : public DocumentSourceTestOptimizations {
+public:
+ RenamesAToB() : DocumentSourceTestOptimizations() {}
+ static boost::intrusive_ptr<RenamesAToB> create() {
+ return new RenamesAToB();
+ }
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}};
+ }
+};
+
+TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(), RenamesAToB::create()}, expCtx));
+ {
+ auto renames = pipeline->renamedPaths({"b"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 1UL);
+ ASSERT_EQ(nameMap["b"], "a");
+ }
+ {
+ auto renames = pipeline->renamedPaths({"b", "c.d"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 2UL);
+ ASSERT_EQ(nameMap["b"], "a");
+ ASSERT_EQ(nameMap["c.d"], "c.d");
+ }
+ {
+ // This is strange; the mock stage reports to essentially duplicate the "a" field into "b".
+ // Because of this, both "b" and "a" should map to "a".
+ auto renames = pipeline->renamedPaths({"b", "a"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 2UL);
+ ASSERT_EQ(nameMap["b"], "a");
+ ASSERT_EQ(nameMap["a"], "a");
+ }
+}
+
+TEST(PipelineRenameTracking, ReportsIdentityMapWhenGivenEmptyIteratorRange) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(), RenamesAToB::create()}, expCtx));
+ {
+ auto renames = Pipeline::renamedPaths(
+ pipeline->getSources().rbegin(), pipeline->getSources().rbegin(), {"b"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 1UL);
+ ASSERT_EQ(nameMap["b"], "b");
+ }
+ {
+ auto renames = Pipeline::renamedPaths(
+ pipeline->getSources().rbegin(), pipeline->getSources().rbegin(), {"b", "c.d"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 2UL);
+ ASSERT_EQ(nameMap["b"], "b");
+ ASSERT_EQ(nameMap["c.d"], "c.d");
+ }
+}
+
+class RenamesBToC : public DocumentSourceTestOptimizations {
+public:
+ RenamesBToC() : DocumentSourceTestOptimizations() {}
+ static boost::intrusive_ptr<RenamesBToC> create() {
+ return new RenamesBToC();
+ }
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"c", "b"}}};
+ }
+};
+
+TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(Pipeline::create(
+ {DocumentSourceMock::create(), RenamesAToB::create(), RenamesBToC::create()}, expCtx));
+ auto renames = pipeline->renamedPaths({"c"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 1UL);
+ ASSERT_EQ(nameMap["c"], "a");
+}
+
+class RenamesBToA : public DocumentSourceTestOptimizations {
+public:
+ RenamesBToA() : DocumentSourceTestOptimizations() {}
+ static boost::intrusive_ptr<RenamesBToA> create() {
+ return new RenamesBToA();
+ }
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}};
+ }
+};
+
+TEST(PipelineRenameTracking, CanHandleBackAndForthRename) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(Pipeline::create(
+ {DocumentSourceMock::create(), RenamesAToB::create(), RenamesBToA::create()}, expCtx));
+ auto renames = pipeline->renamedPaths({"a"});
+ ASSERT(static_cast<bool>(renames));
+ auto nameMap = *renames;
+ ASSERT_EQ(nameMap.size(), 1UL);
+ ASSERT_EQ(nameMap["a"], "a");
+}
+
+} // namespace
+
class All : public Suite {
public:
All() : Suite("PipelineOptimizations") {}
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 2796119cc9e..522419420c8 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -360,7 +360,6 @@ env.CppUnitTest(
'shard_key_pattern_test.cpp',
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/auth/authmocks",
'$BUILD_DIR/mongo/db/serveronly',
'catalog_cache_test_fixture',
]
@@ -392,6 +391,7 @@ env.Library(
'catalog_cache_test_fixture.cpp',
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/db/auth/authmocks",
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'coreshard',
'sharding_router_test_fixture',
diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp
index 835aedebf2f..c7df14e96b1 100644
--- a/src/mongo/s/catalog_cache_test_fixture.cpp
+++ b/src/mongo/s/catalog_cache_test_fixture.cpp
@@ -160,9 +160,9 @@ std::shared_ptr<ChunkManager> CatalogCacheTestFixture::makeChunkManager(
return routingInfo->cm();
}
-void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss) {
+void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss, std::string shardId) {
expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
- DatabaseType db(nss.db().toString(), {"0"}, true, databaseVersion::makeNew());
+ DatabaseType db(nss.db().toString(), {shardId}, true, databaseVersion::makeNew());
return std::vector<BSONObj>{db.toBSON()};
}());
}
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index ef155fa1c8a..c7cacad7eec 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -84,7 +84,7 @@ protected:
/**
* Mocks network responses for loading a sharded database and collection from the config server.
*/
- void expectGetDatabase(NamespaceString nss);
+ void expectGetDatabase(NamespaceString nss, std::string primaryShard = "0");
void expectGetCollection(NamespaceString nss,
OID epoch,
const ShardKeyPattern& shardKeyPattern);
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 10d8ad623fb..3c6fe218bfd 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -153,7 +153,6 @@ env.CppUnitTest(
LIBDEPS=[
'cluster_commands',
'cluster_command_test_fixture',
- '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/auth/saslauth',
],
)
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 02da388532b..ab61f840282 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -119,6 +119,10 @@ public:
return _catalogClient.get();
}
+ /**
+ * Can return nullptr. For example, if this is a mongod that is not a shard server. This is
+ * always present on mongos after startup.
+ */
CatalogCache* catalogCache() const {
return _catalogCache.get();
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 9882356dd90..6dd7d6605c6 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -41,6 +41,7 @@ env.CppUnitTest(
target="cluster_aggregate_test",
source=[
"cluster_aggregate_test.cpp",
+ "cluster_aggregation_planner_test.cpp",
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
@@ -57,10 +58,9 @@ env.CppUnitTest(
],
LIBDEPS=[
'cluster_query',
- '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/keys_collection_client_sharded',
- '$BUILD_DIR/mongo/s/commands/cluster_commands',
'$BUILD_DIR/mongo/s/catalog_cache_test_fixture',
+ '$BUILD_DIR/mongo/s/commands/cluster_commands',
],
)
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index cd110ca31ff..288dcd48448 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -30,21 +30,25 @@
#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/s/shard_id.h"
+#include "mongo/s/shard_key_pattern.h"
namespace mongo {
namespace cluster_aggregation_planner {
@@ -186,6 +190,136 @@ ClusterClientCursorGuard convertPipelineToRouterStages(
opCtx, std::move(root), Document::allMetadataFieldNames),
std::move(cursorParams));
}
+
+bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage,
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) {
+ if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
+ return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
+ } else {
+ // This stage is fine to execute in parallel on each stream. For example, a $match can be
+ // applied to each stream in parallel.
+ return true;
+ }
+}
+
+std::string mapToString(const StringMap<std::string>& map) {
+ StringBuilder sb;
+ sb << "{";
+ for (auto&& entry : map) {
+ if (sb.len() != 1) {
+ sb << ", ";
+ }
+ sb << entry.first << ": " << entry.second;
+ }
+ sb << "}";
+ return sb.str();
+}
+
+BSONObj buildNewKeyPattern(const ShardKeyPattern& shardKey, StringMap<std::string> renames) {
+ BSONObjBuilder newPattern;
+ for (auto&& elem : shardKey.getKeyPattern().toBSON()) {
+ auto it = renames.find(elem.fieldNameStringData());
+ invariant(it != renames.end(),
+ str::stream() << "Could not find new name of shard key field \""
+ << elem.fieldName()
+ << "\": rename map was "
+ << mapToString(renames));
+ newPattern.appendAs(elem, it->second);
+ }
+ return newPattern.obj();
+}
+
+StringMap<std::string> computeShardKeyRenameMap(const Pipeline* mergePipeline,
+ std::set<std::string>&& pathsOfShardKey) {
+ auto traversalStart = mergePipeline->getSources().crbegin();
+ auto traversalEnd = mergePipeline->getSources().crend();
+ const auto leadingGroup =
+ dynamic_cast<DocumentSourceGroup*>(mergePipeline->getSources().front().get());
+ if (leadingGroup && leadingGroup->doingMerge()) {
+ // A leading $group stage will not report to preserve any fields, since it blows away the
+ // _id and replaces it with something new. It possibly renames some fields, but when
+ // computing the new shard key we are interested in the name of the shard key *in the middle
+ // of the $group*. The $exchange will be inserted between the shard-local groups and the
+ // global groups. Thus we want to exclude this stage from our rename tracking.
+ traversalEnd = std::prev(traversalEnd);
+ }
+ auto renameMap = Pipeline::renamedPaths(traversalStart, traversalEnd, pathsOfShardKey);
+ invariant(renameMap,
+ str::stream()
+ << "Analyzed pipeline was thought to preserve the shard key fields, but did not: "
+ << Value(mergePipeline->serialize()).toString());
+ return *renameMap;
+}
+
+/**
+ * Returns true if any stage in the pipeline would modify any of the fields in 'shardKeyPaths', or
+ * if there is any stage in the pipeline requires a unified stream to do its computation like a
+ * $limit would.
+ *
+ * Purposefully takes 'shardKeyPaths' by value so that it can be modified throughout.
+ */
+bool anyStageModifiesShardKeyOrNeedsMerge(std::set<std::string> shardKeyPaths,
+ const Pipeline* mergePipeline) {
+ const auto& stages = mergePipeline->getSources();
+ for (auto it = stages.crbegin(); it != stages.crend(); ++it) {
+ const auto& stage = *it;
+ auto renames = stage->renamedPaths(std::move(shardKeyPaths));
+ if (!renames) {
+ return true;
+ }
+ shardKeyPaths.clear();
+ for (auto&& rename : *renames) {
+ shardKeyPaths.insert(rename.second);
+ }
+ if (!stageCanRunInParallel(stage, shardKeyPaths)) {
+ // In order for this stage to work it needs a single input stream which it wouldn't get
+ // if we inserted an exchange before it.
+ return true;
+ }
+ }
+ return false;
+}
+
+boost::optional<ShardedExchangePolicy> walkPipelineBackwardsTrackingShardKey(
+ OperationContext* opCtx,
+ const boost::intrusive_ptr<const DocumentSourceOut>& outStage,
+ const Pipeline* mergePipeline,
+ const ChunkManager& chunkManager) {
+
+ const ShardKeyPattern& shardKey = chunkManager.getShardKeyPattern();
+ std::set<std::string> shardKeyPaths;
+ for (auto&& path : shardKey.getKeyPatternFields()) {
+ shardKeyPaths.emplace(path->dottedField().toString());
+ }
+ if (anyStageModifiesShardKeyOrNeedsMerge(shardKeyPaths, mergePipeline)) {
+ return boost::none;
+ }
+
+ // All the fields of the shard key are preserved by the pipeline, but they might be renamed. To
+ // set up the $exchange, we need to build a fake shard key pattern which uses the names of the
+ // shard key fields as they are at the split point of the pipeline.
+ auto renames = computeShardKeyRenameMap(mergePipeline, std::move(shardKeyPaths));
+ ShardKeyPattern newShardKey(buildNewKeyPattern(shardKey, renames));
+
+ // Given the new shard key fields, build the distribution map.
+ StringMap<std::vector<ChunkRange>> distribution;
+ for (auto&& chunk : chunkManager.chunks()) {
+ // Append the boundaries with the new names from the new shard key.
+ auto translateBoundary = [&renames](const BSONObj& oldBoundary) {
+ BSONObjBuilder bob;
+ for (auto&& elem : oldBoundary) {
+ bob.appendAs(elem, renames[elem.fieldNameStringData()]);
+ }
+ return bob.obj();
+ };
+ distribution[chunk.getShardId().toString()].emplace_back(translateBoundary(chunk.getMin()),
+ translateBoundary(chunk.getMax()));
+ }
+ return ShardedExchangePolicy{
+ ExchangePolicyEnum::kRange,
+ ShardDistributionInfo{ShardKeyPattern{std::move(newShardKey)}, std::move(distribution)}};
+}
+
} // namespace
SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
@@ -254,5 +388,36 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx,
opCtx, std::make_unique<RouterStagePipeline>(std::move(pipeline)), std::move(cursorParams));
}
+boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx,
+ const Pipeline* mergePipeline) {
+ const auto grid = Grid::get(opCtx);
+ invariant(grid);
+
+ const auto outStage =
+ dynamic_cast<DocumentSourceOut*>(mergePipeline->getSources().back().get());
+ if (!outStage || outStage->getMode() == WriteModeEnum::kModeReplaceCollection) {
+ // If there's no $out stage we won't try to do an $exchange. If the $out stage is using mode
+ // "replaceCollection", then there's no point doing an $exchange because all the writes will
+ // go to a single node, so we should just perform the merge on that host.
+ return boost::none;
+ }
+
+ const auto routingInfo = uassertStatusOK(
+ grid->catalogCache()->getCollectionRoutingInfo(opCtx, outStage->getOutputNs()));
+ if (!routingInfo.cm()) {
+ return boost::none;
+ }
+
+ // The collection is sharded and we have an $out stage! Here we assume the $out stage has
+ // already verified that the shard key pattern is compatible with the unique key being used.
+ // Assuming this, we just have to make sure the shard key is preserved (though possibly renamed)
+ // all the way to the front of the merge pipeline. If this is the case then for any document
+ // entering the merging pipeline we can predict which shard it will need to end up being
+ // inserted on. With this ability we can insert an exchange on the shards to partition the
+ // documents based on which shard will end up owning them. Then each shard can perform a merge
+ // of only those documents which belong to it (optimistically, barring chunk migrations).
+ return walkPipelineBackwardsTrackingShardKey(opCtx, outStage, mergePipeline, *routingInfo.cm());
+}
+
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 3b3aaa63df4..8a5f494cff2 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -28,9 +28,13 @@
#pragma once
+#include "mongo/db/pipeline/document_source_exchange_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
namespace cluster_aggregation_planner {
@@ -89,5 +93,29 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
ClusterClientCursorParams&&);
+struct ShardDistributionInfo {
+ // If we want to send data to the shards which would own the data, 'logicalShardKeyAtSplitPoint'
+ // describes which of the fields to use to determine what the final shard key will be. For
+ // example, if the merging pipeline renames "x" to "out_shard_key" and then uses $out to output
+ // to a collection sharded by {out_shard_key: 1}, 'logicalShardKeyAtSplitPoint' will be {x: 1}.
+ ShardKeyPattern logicalShardKeyAtSplitPoint;
+
+ // This map describes which shard is going to receive which range. The keys are the shard ids.
+ StringMap<std::vector<ChunkRange>> partitions;
+};
+
+struct ShardedExchangePolicy {
+ ExchangePolicyEnum policy;
+
+ // Only set if the policy is ranged.
+ boost::optional<ShardDistributionInfo> shardDistributionInfo;
+};
+
+/**
+ * If the merging pipeline is eligible for an $exchange merge optimization, returns the information
+ * required to set that up.
+ */
+boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx,
+ const Pipeline* mergePipeline);
} // namespace cluster_aggregation_planner
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
new file mode 100644
index 00000000000..10782443e35
--- /dev/null
+++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
@@ -0,0 +1,545 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+
+#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_out_gen.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache_test_fixture.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+namespace {
+
+const NamespaceString kTestAggregateNss = NamespaceString{"unittests", "cluster_exchange"};
+const NamespaceString kTestOutNss = NamespaceString{"unittests", "out_ns"};
+
+/**
+ * For the purposes of this test, assume every collection is sharded. Stages may ask this during
+ * setup. For example, to compute its constraints, the $out stage needs to know if the output
+ * collection is sharded.
+ */
+class FakeMongoProcessInterface : public StubMongoProcessInterface {
+public:
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
+ return true;
+ }
+};
+
+class ClusterExchangeTest : public CatalogCacheTestFixture {
+public:
+ void setUp() {
+ CatalogCacheTestFixture::setUp();
+ _expCtx = new ExpressionContextForTest(operationContext(),
+ AggregationRequest{kTestAggregateNss, {}});
+ _expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>();
+ }
+
+ boost::intrusive_ptr<ExpressionContext> expCtx() {
+ return _expCtx;
+ }
+
+ boost::intrusive_ptr<DocumentSource> parse(const std::string& json) {
+ auto stages = DocumentSource::parse(_expCtx, fromjson(json));
+ ASSERT_EQ(stages.size(), 1UL);
+ return stages.front();
+ }
+
+ std::vector<ChunkType> makeChunks(const NamespaceString& nss,
+ const OID epoch,
+ std::vector<std::pair<ChunkRange, ShardId>> chunkInfos) {
+ ChunkVersion version(1, 0, epoch);
+ std::vector<ChunkType> chunks;
+ for (auto&& pair : chunkInfos) {
+ chunks.emplace_back(nss, pair.first, version, pair.second);
+ version.incMinor();
+ }
+ return chunks;
+ }
+
+ void loadRoutingTable(NamespaceString nss,
+ const OID epoch,
+ const ShardKeyPattern& shardKey,
+ const std::vector<ChunkType>& chunkDistribution) {
+ auto future = scheduleRoutingInfoRefresh(nss);
+
+ // Mock the expected config server queries.
+ expectGetDatabase(nss);
+ expectGetCollection(nss, epoch, shardKey);
+ expectGetCollection(nss, epoch, shardKey);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ std::vector<BSONObj> response;
+ for (auto&& chunk : chunkDistribution) {
+ response.push_back(chunk.toConfigBSON());
+ }
+ return response;
+ }());
+
+ future.timed_get(kFutureTimeout).get();
+ }
+
+private:
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
+};
+
+TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineDoesNotEndWithOut) {
+ setupNShards(2);
+ auto mergePipe =
+ unittest::assertGet(Pipeline::create({DocumentSourceLimit::create(expCtx(), 1)}, expCtx()));
+ ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
+ mergePipe.get()));
+ mergePipe = unittest::assertGet(
+ Pipeline::create({DocumentSourceMatch::create(BSONObj(), expCtx())}, expCtx()));
+ ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
+ mergePipe.get()));
+}
+
+TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineEndsWithReplaceCollectionOut) {
+ setupNShards(2);
+
+ // For this test pretend 'kTestOutNss' is not sharded so that we can use a "replaceCollection"
+ // $out.
+ const auto originalMongoProcessInterface = expCtx()->mongoProcessInterface;
+ expCtx()->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>();
+ ON_BLOCK_EXIT([&]() { expCtx()->mongoProcessInterface = originalMongoProcessInterface; });
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeReplaceCollection)},
+ expCtx()));
+ ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
+ mergePipe.get()));
+}
+
+TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputDatabaseDoesNotExist) {
+ setupNShards(2);
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ ASSERT_THROWS_CODE(cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get()),
+ AssertionException,
+ ErrorCodes::NamespaceNotFound);
+ });
+
+ // Mock out a response as if the database doesn't exist.
+ expectFindSendBSONObjVector(kConfigHostAndPort, []() { return std::vector<BSONObj>{}; }());
+ expectFindSendBSONObjVector(kConfigHostAndPort, []() { return std::vector<BSONObj>{}; }());
+
+ future.timed_get(kFutureTimeout);
+}
+
+// If the output collection doesn't exist, we don't know how to distribute the output documents so
+// cannot insert an $exchange. The $out stage should later create a new, unsharded collection.
+TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputCollectionDoesNotExist) {
+ setupNShards(2);
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
+ mergePipe.get()));
+ });
+
+ expectGetDatabase(kTestOutNss);
+ // Pretend there are no collections in this database.
+ expectFindSendBSONObjVector(kConfigHostAndPort, std::vector<BSONObj>());
+
+ future.timed_get(kFutureTimeout);
+}
+
+// A $limit stage requires a single merger.
+TEST_F(ClusterExchangeTest, LimitFollowedByOutStageIsNotEligibleForExchange) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {DocumentSourceLimit::create(expCtx(), 6),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(),
+ mergePipe.get()));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {_id: '$x', $doingMerge: true}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
+
+ auto shard0Ranges = partitions.find("0");
+ ASSERT(shard0Ranges != partitions.end());
+ ASSERT_EQ(shard0Ranges->second.size(), 1UL);
+ auto shard0Range = shard0Ranges->second[0];
+ ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {_id: '$x', $doingMerge: true}}"),
+ parse("{$project: {temporarily_renamed: '$_id'}}"),
+ parse("{$project: {_id: '$temporarily_renamed'}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
+
+ auto shard0Ranges = partitions.find("0");
+ ASSERT(shard0Ranges != partitions.end());
+ ASSERT_EQ(shard0Ranges->second.size(), 1UL);
+ auto shard0Range = shard0Ranges->second[0];
+ ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
+
+ auto shard1Ranges = partitions.find("1");
+ ASSERT(shard1Ranges != partitions.end());
+ ASSERT_EQ(shard1Ranges->second.size(), 1UL);
+ auto shard1Range = shard1Ranges->second[0];
+ ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY)));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ // This would be the merging half of the pipeline if the original pipeline was
+ // [{$sort: {x: 1}},
+ // {$group: {_id: "$x"}},
+ // {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}].
+ // No $sort stage appears in the merging half since we'd expect that to be absorbed by the
+ // $mergeCursors and AsyncResultsMerger.
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {_id: '$x'}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
+ BSON("x" << 1));
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
+
+ auto shard0Ranges = partitions.find("0");
+ ASSERT(shard0Ranges != partitions.end());
+ ASSERT_EQ(shard0Ranges->second.size(), 1UL);
+ auto shard0Range = shard0Ranges->second[0];
+ ASSERT(shard0Range == ChunkRange(BSON("x" << MINKEY), BSON("x" << 0)));
+
+ auto shard1Ranges = partitions.find("1");
+ ASSERT(shard1Ranges != partitions.end());
+ ASSERT_EQ(shard1Ranges->second.size(), 1UL);
+ auto shard1Range = shard1Ranges->second[0];
+ ASSERT(shard1Range == ChunkRange(BSON("x" << 0), BSON("x" << MAXKEY)));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, ProjectThroughDottedFieldDoesNotPreserveShardKey) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {"
+ " _id: {region: '$region', country: '$country'},"
+ " population: {$sum: '$population'},"
+ " cities: {$push: {name: '$city', population: '$population'}}"
+ "}}"),
+ parse(
+ "{$project: {_id: '$_id.country', region: '$_id.region', population: 1, cities: 1}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ // Because '_id' is populated from '$_id.country', we cannot prove that '_id' is a simple
+ // rename. We cannot prove that '_id' is not an array, and thus the $project could do more
+ // than a rename.
+ ASSERT_FALSE(exchangeSpec);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, WordCountUseCaseExample) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss);
+
+ // As an example of a pipeline that might replace a map reduce, imagine that we are performing a
+ // word count, and the shards part of the pipeline tokenized some text field of each document
+ // into {word: <token>, count: 1}. Then this is the merging half of the pipeline:
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {"
+ " _id: '$word',"
+ " count: {$sum: 1},"
+ " $doingMerge: true"
+ "}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
+
+ auto shard0Ranges = partitions.find("0");
+ ASSERT(shard0Ranges != partitions.end());
+ ASSERT_EQ(shard0Ranges->second.size(), 1UL);
+ auto shard0Range = shard0Ranges->second[0];
+ ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
+
+ auto shard1Ranges = partitions.find("1");
+ ASSERT(shard1Ranges != partitions.end());
+ ASSERT_EQ(shard1Ranges->second.size(), 1UL);
+ auto shard1Range = shard1Ranges->second[0];
+ ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY)));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) {
+ setupNShards(2);
+ const OID epoch = OID::gen();
+ ShardKeyPattern shardKey(BSON("word" << 1));
+ loadRoutingTable(kTestOutNss,
+ epoch,
+ shardKey,
+ makeChunks(kTestOutNss,
+ epoch,
+ {{ChunkRange{BSON("word" << MINKEY),
+ BSON("word"
+ << "hello")},
+ ShardId("0")},
+ {ChunkRange{BSON("word"
+ << "hello"),
+ BSON("word"
+ << "world")},
+ ShardId("1")},
+ {ChunkRange{BSON("word"
+ << "world"),
+ BSON("word" << MAXKEY)},
+ ShardId("1")}}));
+
+ // As an example of a pipeline that might replace a map reduce, imagine that we are performing a
+ // word count, and the shards part of the pipeline tokenized some text field of each document
+ // into {word: <token>, count: 1}. Then this is the merging half of the pipeline:
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {"
+ " _id: '$word',"
+ " count: {$sum: 1},"
+ " $doingMerge: true"
+ "}}"),
+ parse("{$project: {word: '$_id', count: 1}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
+ BSON("_id" << 1));
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
+
+ auto shard0Ranges = partitions.find("0");
+ ASSERT(shard0Ranges != partitions.end());
+ ASSERT_EQ(shard0Ranges->second.size(), 1UL);
+ auto firstRangeOnShard0 = shard0Ranges->second[0];
+ ASSERT(firstRangeOnShard0 == ChunkRange(BSON("_id" << MINKEY),
+ BSON("_id"
+ << "hello")));
+
+ auto shard1Ranges = partitions.find("1");
+ ASSERT(shard1Ranges != partitions.end());
+ ASSERT_EQ(shard1Ranges->second.size(), 2UL);
+ auto firstRangeOnShard1 = shard1Ranges->second[0];
+ ASSERT(firstRangeOnShard1 == ChunkRange(BSON("_id"
+ << "hello"),
+ BSON("_id"
+ << "world")));
+ auto secondRangeOnShard1 = shard1Ranges->second[1];
+ ASSERT(secondRangeOnShard1 == ChunkRange(BSON("_id"
+ << "world"),
+ BSON("_id" << MAXKEY)));
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+// We'd like to test that a compound shard key pattern can be used. Strangely, the only case we can
+// actually perform an exchange today on a compound shard key is when the shard key contains fields
+// which are all duplicates. This is due to the limitations of tracking renames through dots, see
+// SERVER-36787 for an example.
+TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) {
+ const OID epoch = OID::gen();
+ ShardKeyPattern shardKey(BSON("x" << 1 << "y" << 1));
+
+ setupNShards(3);
+ const std::vector<std::string> xBoundaries = {"a", "g", "m", "r", "u"};
+ auto chunks = [&]() {
+ std::vector<ChunkType> chunks;
+ ChunkVersion version(1, 0, epoch);
+ chunks.emplace_back(kTestOutNss,
+ ChunkRange{BSON("x" << MINKEY << "y" << MINKEY),
+ BSON("x" << xBoundaries[0] << "y" << MINKEY)},
+ version,
+ ShardId("0"));
+ for (std::size_t i = 0; i < xBoundaries.size() - 1; ++i) {
+ chunks.emplace_back(kTestOutNss,
+ ChunkRange{BSON("x" << xBoundaries[i] << "y" << MINKEY),
+ BSON("x" << xBoundaries[i + 1] << "y" << MINKEY)},
+ version,
+ ShardId(str::stream() << i % 3));
+ }
+ chunks.emplace_back(kTestOutNss,
+ ChunkRange{BSON("x" << xBoundaries.back() << "y" << MINKEY),
+ BSON("x" << MAXKEY << "y" << MAXKEY)},
+ version,
+ ShardId(str::stream() << "1"));
+ return chunks;
+ }();
+
+ loadRoutingTable(kTestOutNss, epoch, shardKey, chunks);
+
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {"
+ " _id: '$x',"
+ " $doingMerge: true"
+ "}}"),
+ parse("{$project: {x: '$_id', y: '$_id'}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
+ ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
+ BSON("_id" << 1 << "_id" << 1));
+ const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
+ ASSERT_EQ(partitions.size(), 3UL); // One for each shard.
+
+ // Make sure each shard has the same chunks that it started with, just with the names of the
+ // boundary fields translated. For each chunk that we created to begin with, make sure its
+ // corresponding/translated chunk is present on the same shard in the same order.
+ StringMap<std::size_t> numChunksExaminedOnShard = {{"0", 0}, {"1", 0}, {"2", 0}};
+ for (auto&& chunk : chunks) {
+ auto shardId = chunk.getShard().toString();
+ auto shardRanges = partitions.find(shardId);
+ ASSERT(shardRanges != partitions.end());
+ auto nextChunkOnShard = numChunksExaminedOnShard[shardId]++;
+ ASSERT_LTE(nextChunkOnShard, shardRanges->second.size());
+ auto outputChunk = shardRanges->second[nextChunkOnShard];
+
+ auto expectedChunkMin = [&]() {
+ ASSERT_EQ(chunk.getMin().nFields(), 2);
+ return BSON("_id" << chunk.getMin()["x"] << "_id" << chunk.getMin()["y"]);
+ }();
+ ASSERT_BSONOBJ_EQ(outputChunk.getMin(), expectedChunkMin);
+
+ auto expectedChunkMax = [&]() {
+ ASSERT_EQ(chunk.getMax().nFields(), 2);
+ return BSON("_id" << chunk.getMax()["x"] << "_id" << chunk.getMax()["y"]);
+ }();
+ ASSERT_BSONOBJ_EQ(outputChunk.getMax(), expectedChunkMax);
+ }
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+} // namespace
+} // namespace mongo