diff options
26 files changed, 1725 insertions, 106 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index d51ad84ab67..32edc1704d5 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2011 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2011 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include "mongo/platform/basic.h" @@ -160,6 +160,52 @@ splitMatchByModifiedFields(const boost::intrusive_ptr<DocumentSourceMatch>& matc return match->splitSourceBy(modifiedPaths, modifiedPathsRet.renames); } +/** + * If 'pathOfInterest' or some path prefix of 'pathOfInterest' is renamed, returns the new name for + * 'pathOfInterest', otherwise returns boost::none. + * For example, if 'renamedPaths' is {"c.d", "c"}, and 'pathOfInterest' is "c.d.f", returns "c.f". + */ +boost::optional<std::string> findNewName(const StringMap<std::string>& renamedPaths, + std::string pathOfInterest) { + FieldPath fullPathOfInterest(pathOfInterest); + StringBuilder toLookup; + std::size_t pathIndex = 0; + while (pathIndex < fullPathOfInterest.getPathLength()) { + if (pathIndex != 0) { + toLookup << "."; + } + toLookup << fullPathOfInterest.getFieldName(pathIndex++); + + auto it = renamedPaths.find(toLookup.stringData()); + if (it != renamedPaths.end()) { + const auto& newPathOfPrefix = it->second; + // We found a rename! Note this might be a rename of the prefix of the path, so we have + // to add back on the suffix that was unchanged. + StringBuilder renamedPath; + renamedPath << newPathOfPrefix; + while (pathIndex < fullPathOfInterest.getPathLength()) { + renamedPath << "." << fullPathOfInterest.getFieldName(pathIndex++); + } + return {renamedPath.str()}; + } + } + return boost::none; +} + +StringMap<std::string> computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified( + const StringMap<std::string>& renamedPaths, const std::set<std::string>& pathsOfInterest) { + StringMap<std::string> renameOut; + for (auto&& ofInterest : pathsOfInterest) { + if (auto newName = findNewName(renamedPaths, ofInterest)) { + renameOut[ofInterest] = *newName; + } else { + // This path was not renamed, assume it was unchanged and map it to itself. + renameOut[ofInterest] = ofInterest; + } + } + return renameOut; +} + } // namespace Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( @@ -202,6 +248,53 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( return doOptimizeAt(itr, container); } +boost::optional<StringMap<std::string>> DocumentSource::renamedPaths( + const std::set<std::string>& pathsOfInterest) const { + auto modifiedPathsRet = this->getModifiedPaths(); + switch (modifiedPathsRet.type) { + case DocumentSource::GetModPathsReturn::Type::kNotSupported: + case DocumentSource::GetModPathsReturn::Type::kAllPaths: + return boost::none; + case DocumentSource::GetModPathsReturn::Type::kFiniteSet: { + for (auto&& modified : modifiedPathsRet.paths) { + for (auto&& ofInterest : pathsOfInterest) { + // Any overlap of the path means the path of interest is not preserved. For + // example, if the path of interest is "a.b", then a modified path of "a", + // "a.b", or "a.b.c" would all signal that "a.b" is not preserved. + if (ofInterest == modified || + expression::isPathPrefixOf(ofInterest, modified) || + expression::isPathPrefixOf(modified, ofInterest)) { + // This stage modifies at least one of the fields which the caller is + // interested in, bail out. + return boost::none; + } + } + } + + // None of the paths of interest were modified, construct the result map, mapping + // the names after this stage to the names before this stage. + return computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified(modifiedPathsRet.renames, + pathsOfInterest); + } + case DocumentSource::GetModPathsReturn::Type::kAllExcept: { + auto preservedPaths = modifiedPathsRet.paths; + for (auto&& rename : modifiedPathsRet.renames) { + // For the purposes of checking which paths are modified, consider renames to + // preserve the path. We'll circle back later to figure out the new name if + // appropriate. + preservedPaths.insert(rename.first); + } + auto modifiedPaths = extractModifiedDependencies(pathsOfInterest, preservedPaths); + if (modifiedPaths.empty()) { + return computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified( + modifiedPathsRet.renames, pathsOfInterest); + } + return boost::none; + } + } + MONGO_UNREACHABLE; +} + void DocumentSource::serializeToArray(vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Value entry = serialize(explain); @@ -258,4 +351,4 @@ BSONObjSet DocumentSource::truncateSortSet(const BSONObjSet& sorts, return out; } -} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 682b03b76e1..e9bf60dba55 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -628,6 +628,14 @@ public: } /** + * Given 'currentNames' which describes a set of paths which the caller is interested in, + * returns boost::none if any of those paths are modified by this stage, or a mapping from + * their old name to their new name if they are preserved but possibly renamed by this stage. + */ + boost::optional<StringMap<std::string>> renamedPaths( + const std::set<std::string>& currentNames) const; + + /** * Get the dependencies this operation needs to do its job. If overridden, subclasses must add * all paths needed to apply their transformation to 'deps->fields', and call * 'deps->setNeedsMetadata()' to indicate what metadata (e.g. text score), if any, is required. @@ -731,6 +739,16 @@ public: */ virtual MergingLogic mergingLogic() = 0; + /** + * Returns true if it would be correct to execute this stage in parallel across the shards in + * cases where the final stage is an $out. For example, a $group stage which is just merging the + * groups from the shards can be run in parallel since it will preserve the shard key. + */ + virtual bool canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { + return false; + } + protected: // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer. virtual ~NeedsMergerDocumentSource() {} diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 1563c6ba14b..de3dcd084ec 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2011 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2011 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include "mongo/platform/basic.h" @@ -43,8 +43,8 @@ namespace mongo { using boost::intrusive_ptr; -using std::shared_ptr; using std::pair; +using std::shared_ptr; using std::vector; REGISTER_DOCUMENT_SOURCE(group, @@ -241,6 +241,25 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const return DepsTracker::State::EXHAUSTIVE_ALL; } +DocumentSource::GetModPathsReturn DocumentSourceGroup::getModifiedPaths() const { + // We preserve none of the fields, but any fields referenced as part of the group key are + // logically just renamed. + StringMap<std::string> renames; + for (std::size_t i = 0; i < _idExpressions.size(); ++i) { + auto idExp = _idExpressions[i]; + auto pathToPutResultOfExpression = + _idFieldNames.empty() ? "_id" : "_id." + _idFieldNames[i]; + auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression); + for (auto&& rename : computedPaths.renames) { + renames[rename.first] = rename.second; + } + } + + return {DocumentSource::GetModPathsReturn::Type::kAllExcept, + std::set<std::string>{}, // No fields are preserved. + std::move(renames)}; +} + intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create( const intrusive_ptr<ExpressionContext>& pExpCtx, const boost::intrusive_ptr<Expression>& groupByExpression, @@ -860,6 +879,42 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() { return {mergingGroup}; } + +bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const { + return std::any_of( + _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) { + if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) { + if (fieldExp->representsPath(dottedPath)) { + return true; + } + } + return false; + }); +} + +bool DocumentSourceGroup::canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { + if (_doingMerge) { + return true; // This is fine. + } + + // Certain $group stages are allowed to execute on each exchange consumer. In order to + // guarantee each consumer will only group together data from its own shard, the $group must + // group on a superset of the shard key. + for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) { + if (!pathIncludedInGroupKeys(currentPathOfShardKey)) { + // This requires an exact path match, but as a future optimization certain path + // prefixes should be okay. For example, if the shard key path is "a.b", and we're + // grouping by "a", then each group of "a" is strictly more specific than "a.b", so + // we can deduce that grouping by "a" will not need to group together documents + // across different values of the shard key field "a.b", and thus as long as any + // other shard key fields are similarly preserved will not need to consume a merged + // stream to perform the group. + return false; + } + } + return true; +} } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 421c244641e..7a79db09a56 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -43,13 +43,13 @@ public: using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>; using GroupsMap = ValueUnorderedMap<Accumulators>; - // Virtuals from DocumentSource. boost::intrusive_ptr<DocumentSource> optimize() final; DepsTracker::State getDependencies(DepsTracker* deps) const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetNextResult getNext() final; const char* getSourceName() const final; BSONObjSet getOutputSorts() final; + GetModPathsReturn getModifiedPaths() const final; /** * Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none, @@ -88,6 +88,14 @@ public: void setIdExpression(const boost::intrusive_ptr<Expression> idExpression); /** + * Returns true if this $group stage represents a 'global' $group which is merging together + * results from earlier partial groups. + */ + bool doingMerge() const { + return _doingMerge; + } + + /** * Tell this source if it is doing a merge from shards. Defaults to false. */ void setDoingMerge(bool doingMerge) { @@ -98,15 +106,17 @@ public: return _streaming; } - // Virtuals for NeedsMergerDocumentSource. - boost::intrusive_ptr<DocumentSource> getShardSource() final; - MergingLogic mergingLogic() final; - /** * Returns true if this $group stage used disk during execution and false otherwise. */ bool usedDisk() final; + // Virtuals for NeedsMergerDocumentSource. + boost::intrusive_ptr<DocumentSource> getShardSource() final; + MergingLogic mergingLogic() final; + bool canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; + protected: void doDispose() final; @@ -161,9 +171,14 @@ private: */ Value expandId(const Value& val); - bool _usedDisk; // Keeps track of whether this $group spilled to disk. + /** + * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'. + */ + bool pathIncludedInGroupKeys(const std::string& dottedPath) const; + std::vector<AccumulationStatement> _accumulatedFields; + bool _usedDisk; // Keeps track of whether this $group spilled to disk. bool _doingMerge; size_t _memoryUsageBytes = 0; size_t _maxMemoryUsageBytes; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 3a8cecedf54..b612b9c7512 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -189,6 +189,44 @@ TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { ASSERT_THROWS_CODE(group->getNext(), AssertionException, 16945); } +TEST_F(DocumentSourceGroupTest, ShouldReportSingleFieldGroupKeyAsARename) { + auto expCtx = getExpCtx(); + VariablesParseState vps = expCtx->variablesParseState; + auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$x", vps); + auto group = DocumentSourceGroup::create(expCtx, groupByExpression, {}); + auto modifiedPathsRet = group->getModifiedPaths(); + ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept); + ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL); + ASSERT_EQ(modifiedPathsRet.renames.size(), 1UL); + ASSERT_EQ(modifiedPathsRet.renames["_id"], "x"); +} + +TEST_F(DocumentSourceGroupTest, ShouldReportMultipleFieldGroupKeysAsARename) { + auto expCtx = getExpCtx(); + VariablesParseState vps = expCtx->variablesParseState; + auto x = ExpressionFieldPath::parse(expCtx, "$x", vps); + auto y = ExpressionFieldPath::parse(expCtx, "$y", vps); + auto groupByExpression = ExpressionObject::create(expCtx, {{"x", x}, {"y", y}}); + auto group = DocumentSourceGroup::create(expCtx, groupByExpression, {}); + auto modifiedPathsRet = group->getModifiedPaths(); + ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept); + ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL); + ASSERT_EQ(modifiedPathsRet.renames.size(), 2UL); + ASSERT_EQ(modifiedPathsRet.renames["_id.x"], "x"); + ASSERT_EQ(modifiedPathsRet.renames["_id.y"], "y"); +} + +TEST_F(DocumentSourceGroupTest, ShouldNotReportDottedGroupKeyAsARename) { + auto expCtx = getExpCtx(); + VariablesParseState vps = expCtx->variablesParseState; + auto xDotY = ExpressionFieldPath::parse(expCtx, "$x.y", vps); + auto group = DocumentSourceGroup::create(expCtx, xDotY, {}); + auto modifiedPathsRet = group->getModifiedPaths(); + ASSERT(modifiedPathsRet.type == DocumentSource::GetModPathsReturn::Type::kAllExcept); + ASSERT_EQ(modifiedPathsRet.paths.size(), 0UL); + ASSERT_EQ(modifiedPathsRet.renames.size(), 0UL); +} + BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { vector<Value> arr; source->serializeToArray(arr); diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index cc81d533fb2..0a0ad30ca41 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -89,6 +89,13 @@ public: return this; } + /** + * This stage does not modify anything. + */ + GetModPathsReturn getModifiedPaths() const override { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; + } + // Return documents from front of queue. std::deque<GetNextResult> queue; diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index e5ccdc038bc..9f59cc099b0 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -165,18 +165,11 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { MONGO_UNREACHABLE; } -DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - WriteModeEnum mode, - std::set<FieldPath> uniqueKey) - : DocumentSource(expCtx), - _done(false), - _outputNs(outputNs), - _mode(mode), - _uniqueKeyFields(std::move(uniqueKey)) {} - -intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( - BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { +intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create( + NamespaceString outputNs, + const intrusive_ptr<ExpressionContext>& expCtx, + WriteModeEnum mode, + std::set<FieldPath> uniqueKey) { uassert(ErrorCodes::OperationNotSupportedInTransaction, "$out cannot be used in a transaction", @@ -187,6 +180,45 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( "$out cannot be used with a 'majority' read concern level", readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern); + // Although we perform a check for "replaceCollection" mode with a sharded output collection + // during lite parsing, we need to do it here as well in case mongos is stale or the command is + // sent directly to the shard. + uassert(17017, + str::stream() << "$out with mode " << WriteMode_serializer(mode) + << " is not supported to an existing *sharded* output collection.", + !(mode == WriteModeEnum::kModeReplaceCollection && + expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs))); + + uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial()); + + switch (mode) { + case WriteModeEnum::kModeReplaceCollection: + return new DocumentSourceOutReplaceColl( + std::move(outputNs), expCtx, mode, std::move(uniqueKey)); + case WriteModeEnum::kModeInsertDocuments: + return new DocumentSourceOutInPlace( + std::move(outputNs), expCtx, mode, std::move(uniqueKey)); + case WriteModeEnum::kModeReplaceDocuments: + return new DocumentSourceOutInPlaceReplace( + std::move(outputNs), expCtx, mode, std::move(uniqueKey)); + default: + MONGO_UNREACHABLE; + } +} + +DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs, + const intrusive_ptr<ExpressionContext>& expCtx, + WriteModeEnum mode, + std::set<FieldPath> uniqueKey) + : DocumentSource(expCtx), + _done(false), + _outputNs(std::move(outputNs)), + _mode(mode), + _uniqueKeyFields(std::move(uniqueKey)) {} + +intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + auto mode = WriteModeEnum::kModeReplaceCollection; std::set<FieldPath> uniqueKey; NamespaceString outputNs; @@ -221,27 +253,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( << typeName(elem.type())); } - // Although we perform a check for "replaceCollection" mode with a sharded output collection - // during lite parsing, we need to do it here as well in case mongos is stale or the command is - // sent directly to the shard. - uassert(17017, - str::stream() << "$out with mode " << WriteMode_serializer(mode) - << " is not supported to an existing *sharded* output collection.", - !(mode == WriteModeEnum::kModeReplaceCollection && - expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs))); - - uassert(17385, "Can't $out to special collection: " + outputNs.coll(), !outputNs.isSpecial()); - - switch (mode) { - case WriteModeEnum::kModeReplaceCollection: - return new DocumentSourceOutReplaceColl(outputNs, expCtx, mode, uniqueKey); - case WriteModeEnum::kModeInsertDocuments: - return new DocumentSourceOutInPlace(outputNs, expCtx, mode, uniqueKey); - case WriteModeEnum::kModeReplaceDocuments: - return new DocumentSourceOutInPlaceReplace(outputNs, expCtx, mode, uniqueKey); - default: - MONGO_UNREACHABLE; - } + return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey)); } Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 4ae2c9af5e8..45a631cabd4 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -59,7 +59,7 @@ public: bool _allowShardedOutNss; }; - DocumentSourceOut(const NamespaceString& outputNs, + DocumentSourceOut(NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, WriteModeEnum mode, std::set<FieldPath> uniqueKey); @@ -70,28 +70,46 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; + /** + * For purposes of tracking which fields come from where, this stage does not modify any fields. + */ + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; + } StageConstraints constraints(Pipeline::SplitState pipeState) const final { return {StreamType::kStreaming, PositionRequirement::kLast, + // A $out to an unsharded collection should merge on the primary shard to perform + // local writes. A $out to a sharded collection has no requirement, since each shard + // can perform its own portion of the write. HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed}; } - // Virtuals for NeedsMergerDocumentSource + const NamespaceString& getOutputNs() const { + return _outputNs; + } + + WriteModeEnum getMode() const { + return _mode; + } + boost::intrusive_ptr<DocumentSource> getShardSource() final { - return NULL; + return nullptr; } MergingLogic mergingLogic() final { return {this}; } - - const NamespaceString& getOutputNs() const { - return _outputNs; + virtual bool canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final { + // If someone is asking the question, this must be the $out stage in question, so yes! + return true; } + /** * Retrieves the namespace to direct each batch to, which may be a temporary namespace or the * final output namespace. @@ -146,6 +164,18 @@ public: */ virtual void finalize() = 0; + /** + * Creates a new $out stage from the given arguments. + */ + static boost::intrusive_ptr<DocumentSourceOut> create( + NamespaceString outputNs, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + WriteModeEnum, + std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"}); + + /** + * Parses a $out stage from the user-supplied BSON. + */ static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index cb753b400f9..ca092927f56 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -513,6 +513,17 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() { return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr, sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()}; } + +bool DocumentSourceSort::canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { + // This is an interesting special case. If there are no further stages which require merging the + // streams into one, a $sort should not require it. This is only the case because the sort order + // doesn't matter for a pipeline ending with a $out stage. We may encounter it here as an + // intermediate stage before a final $group with a $sort, which would make sense. Should we + // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort + // would generally require merging the streams before producing output. + return false; +} } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index ac860c5b7a5..a104c3f5713 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -83,6 +83,8 @@ public: boost::intrusive_ptr<DocumentSource> getShardSource() final; MergingLogic mergingLogic() final; + bool canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; /** * Write out a Document whose contents are the sort key pattern. diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index 2979179c255..9372e683071 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_test_optimizations.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/unittest/unittest.h" @@ -78,5 +79,244 @@ TEST_F(DocumentSourceTruncateSort, TruncateSortDedupsSortCorrectly) { ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); } +class RenamesAToB : public DocumentSourceTestOptimizations { +public: + RenamesAToB() : DocumentSourceTestOptimizations() {} + GetModPathsReturn getModifiedPaths() const final { + // Pretend this stage simply renames the "a" field to be "b", leaving the value of "a" the + // same. This would be the equivalent of an {$addFields: {b: "$a"}}. + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}}; + } +}; + +TEST(DocumentSourceRenamedPaths, DoesReturnSimpleRenameFromFiniteSetRename) { + RenamesAToB renamesAToB; + auto renames = renamesAToB.renamedPaths({"b"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["b"], "a"); +} + +TEST(DocumentSourceRenamedPaths, ReturnsSimpleMapForUnaffectedFieldsFromFiniteSetRename) { + RenamesAToB renamesAToB; + { + auto renames = renamesAToB.renamedPaths({"c"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["c"], "c"); + } + + { + auto renames = renamesAToB.renamedPaths({"a"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["a"], "a"); + } + + { + auto renames = renamesAToB.renamedPaths({"e", "f", "g"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 3UL); + ASSERT_EQ(map["e"], "e"); + ASSERT_EQ(map["f"], "f"); + ASSERT_EQ(map["g"], "g"); + } +} + +class RenameCToDPreserveEFG : public DocumentSourceTestOptimizations { +public: + RenameCToDPreserveEFG() : DocumentSourceTestOptimizations() {} + + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kAllExcept, + std::set<std::string>{"e", "f", "g"}, + {{"d", "c"}}}; + } +}; + +TEST(DocumentSourceRenamedPaths, DoesReturnSimpleRenameFromAllExceptRename) { + RenameCToDPreserveEFG renameCToDPreserveEFG; + auto renames = renameCToDPreserveEFG.renamedPaths({"d"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["d"], "c"); +} + +TEST(DocumentSourceRenamedPaths, ReturnsSimpleMapForUnaffectedFieldsFromAllExceptRename) { + RenameCToDPreserveEFG renameCToDPreserveEFG; + { + auto renames = renameCToDPreserveEFG.renamedPaths({"e"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["e"], "e"); + } + + { + auto renames = renameCToDPreserveEFG.renamedPaths({"f", "g"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 2UL); + ASSERT_EQ(map["f"], "f"); + ASSERT_EQ(map["g"], "g"); + } +} + +class RenameCDotDToEPreserveFDotG : public DocumentSourceTestOptimizations { +public: + RenameCDotDToEPreserveFDotG() : DocumentSourceTestOptimizations() {} + + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kAllExcept, std::set<std::string>{"f.g"}, {{"e", "c.d"}}}; + } +}; + +TEST(DocumentSourceRenamedPaths, DoesReturnRenameToDottedFieldFromAllExceptRename) { + RenameCDotDToEPreserveFDotG renameCDotDToEPreserveFDotG; + { + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"e"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["e"], "c.d"); + } + { + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"e.x", "e.y"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 2UL); + ASSERT_EQ(map["e.x"], "c.d.x"); + ASSERT_EQ(map["e.y"], "c.d.y"); + } +} + +TEST(DocumentSourceRenamedPaths, DoesNotTreatPrefixAsUnmodifiedWhenSuffixIsModifiedFromAllExcept) { + RenameCDotDToEPreserveFDotG renameCDotDToEPreserveFDotG; + { + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + // This is the exception, the only path that is not modified. + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.g"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["f.g"], "f.g"); + } + { + // We know "f.g" is preserved, so it follows that a subpath of that path is also preserved. + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.g.x", "f.g.xyz.foobarbaz"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 2UL); + ASSERT_EQ(map["f.g.x"], "f.g.x"); + ASSERT_EQ(map["f.g.xyz.foobarbaz"], "f.g.xyz.foobarbaz"); + } + + { + // This shares a prefix with the unmodified path, but should not be reported as unmodified. + auto renames = renameCDotDToEPreserveFDotG.renamedPaths({"f.x"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } +} + +class RenameAToXDotYModifyCDotD : public DocumentSourceTestOptimizations { +public: + RenameAToXDotYModifyCDotD() : DocumentSourceTestOptimizations() {} + + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{"c.d"}, {{"x.y", "a"}}}; + } +}; + +TEST(DocumentSourceRenamedPaths, DoesReturnRenameToDottedFieldFromFiniteSetRename) { + RenameAToXDotYModifyCDotD renameAToXDotYModifyCDotD; + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"x.y"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 1UL); + ASSERT_EQ(map["x.y"], "a"); + } + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"x.y.z", "x.y.a.b.c"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 2UL); + ASSERT_EQ(map["x.y.z"], "a.z"); + ASSERT_EQ(map["x.y.a.b.c"], "a.a.b.c"); + } +} + +TEST(DocumentSourceRenamedPaths, DoesNotTreatPrefixAsUnmodifiedWhenSuffixIsPartOfModifiedSet) { + RenameAToXDotYModifyCDotD renameAToXDotYModifyCDotD; + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.d"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.d.e"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + auto renames = renameAToXDotYModifyCDotD.renamedPaths({"c.not_d", "c.decoy"}); + ASSERT(static_cast<bool>(renames)); + auto map = *renames; + ASSERT_EQ(map.size(), 2UL); + ASSERT_EQ(map["c.not_d"], "c.not_d"); + ASSERT_EQ(map["c.decoy"], "c.decoy"); + } +} + +class ModifiesAllPaths : public DocumentSourceTestOptimizations { +public: + ModifiesAllPaths() : DocumentSourceTestOptimizations() {} + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}}; + } +}; + +TEST(DocumentSourceRenamedPaths, ReturnsNoneWhenAllPathsAreModified) { + ModifiesAllPaths modifiesAllPaths; + { + auto renames = modifiesAllPaths.renamedPaths({"a"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + auto renames = modifiesAllPaths.renamedPaths({"a", "b", "c.d"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } +} + +class ModificationsUnknown : public DocumentSourceTestOptimizations { +public: + ModificationsUnknown() : DocumentSourceTestOptimizations() {} + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}}; + } +}; + +TEST(DocumentSourceRenamedPaths, ReturnsNoneWhenModificationsAreNotKnown) { + ModificationsUnknown modificationsUnknown; + { + auto renames = modificationsUnknown.renamedPaths({"a"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } + { + auto renames = modificationsUnknown.renamedPaths({"a", "b", "c.d"}); + ASSERT_FALSE(static_cast<bool>(renames)); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h new file mode 100644 index 00000000000..e7141835457 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression_context_for_test.h" + +namespace mongo { +/** + * A dummy class for other tests to inherit from to customize the behavior of any of the virtual + * methods from DocumentSource without having to implement all of them. + */ +class DocumentSourceTestOptimizations : public DocumentSource { +public: + DocumentSourceTestOptimizations() : DocumentSource(new ExpressionContextForTest()) {} + virtual ~DocumentSourceTestOptimizations() = default; + virtual GetNextResult getNext() override { + MONGO_UNREACHABLE; + } + virtual StageConstraints constraints(Pipeline::SplitState) const override { + // Return the default constraints so that this can be used in test pipelines. Constructing a + // pipeline needs to do some validation that depends on this. + return StageConstraints{StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kNotAllowed}; + } + + virtual GetModPathsReturn getModifiedPaths() const override { + MONGO_UNREACHABLE; + } + +private: + virtual Value serialize(boost::optional<ExplainOptions::Verbosity>) const override { + MONGO_UNREACHABLE; + } +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp index 200792ac734..4e8f911087c 100644 --- a/src/mongo/db/pipeline/expression.cpp +++ b/src/mongo/db/pipeline/expression.cpp @@ -1970,6 +1970,16 @@ intrusive_ptr<Expression> ExpressionFieldPath::optimize() { return intrusive_ptr<Expression>(this); } +bool ExpressionFieldPath::representsPath(const std::string& dottedPath) const { + if (_variable != Variables::kRootId || _fieldPath.getPathLength() == 1) { + // This variable refers to the entire document, or refers to a sub-field of something + // besides the root document. Either way we can't prove that it represents the path given by + // 'dottedPath'. + return false; + } + return _fieldPath.tail().fullPath() == dottedPath; +} + void ExpressionFieldPath::_doAddDependencies(DepsTracker* deps) const { if (_variable == Variables::kRootId) { // includes CURRENT when it is equivalent to ROOT. if (_fieldPath.getPathLength() == 1) { diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h index 6cfd4603bac..bc39e271575 100644 --- a/src/mongo/db/pipeline/expression.h +++ b/src/mongo/db/pipeline/expression.h @@ -1108,6 +1108,12 @@ public: const std::string& raw, const VariablesParseState& vps); + /** + * Returns true if this expression logically represents the path 'dottedPath'. For example, if + * 'dottedPath' is 'a.b' and this FieldPath is '$$CURRENT.a.b', returns true. + */ + bool representsPath(const std::string& dottedPath) const; + const FieldPath& getFieldPath() const { return _fieldPath; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 1a82bf1569a..3c84881323f 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -478,6 +478,48 @@ void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) { _sources.push_back(source); } +boost::optional<StringMap<std::string>> Pipeline::renamedPaths( + SourceContainer::const_reverse_iterator rstart, + SourceContainer::const_reverse_iterator rend, + std::set<std::string> pathsOfInterest) { + // Use a vector to give a path id to each input path. A path's id is its index in the vector. + const std::vector<string> inputPaths(pathsOfInterest.begin(), pathsOfInterest.end()); + std::vector<string> currentPaths(pathsOfInterest.begin(), pathsOfInterest.end()); + + // Loop backwards over the stages. We will re-use 'pathsOfInterest', modifying that set each + // time to be the current set of field's we're interested in. At the same time, we will maintain + // 'currentPaths'. 'pathsOfInterest' is used to compute the renames, while 'currentPaths' is + // used to tie a path back to its id. + // + // Interestingly, 'currentPaths' may contain duplicates. For example, if a stage like + // {$addFields: {a: "$b"}} duplicates the value of "a" and both paths are of interest, then + // 'currentPaths' may begin as ["a", "b"] representing the paths after the $addFields stage, but + // becomes ["a", "a"] via the rename. + for (auto it = rstart; it != rend; ++it) { + boost::optional<StringMap<string>> renamed = (*it)->renamedPaths(pathsOfInterest); + if (!renamed) { + return boost::none; + } + pathsOfInterest.clear(); + for (std::size_t pathId = 0; pathId < inputPaths.size(); ++pathId) { + currentPaths[pathId] = (*renamed)[currentPaths[pathId]]; + pathsOfInterest.insert(currentPaths[pathId]); + } + } + + // We got all the way through the pipeline via renames! Construct the mapping from path at the + // end of the pipeline to path at the beginning. + StringMap<string> renameMap; + for (std::size_t pathId = 0; pathId < currentPaths.size(); ++pathId) { + renameMap[inputPaths[pathId]] = currentPaths[pathId]; + } + return renameMap; +} + +boost::optional<StringMap<string>> Pipeline::renamedPaths(std::set<string> pathsOfInterest) const { + return renamedPaths(_sources.rbegin(), _sources.rend(), std::move(pathsOfInterest)); +} + DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const { DepsTracker deps(metadataAvailable); const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables(); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index d3e99daae31..4874126b398 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -132,6 +132,19 @@ public: */ static bool aggSupportsWriteConcern(const BSONObj& cmd); + /** + * Given 'pathsOfInterest' which describes a set of paths which the caller is interested in, + * returns boost::none if any of those paths are modified by the section of a pipeline + * described by 'rstart' and 'rend', or a mapping from their name at the end of the pipeline to + * their name at the beginning of the pipeline if they are preserved but possibly renamed by + * this pipeline. Note that the analysis proceeds backwards, so the iterators must be reverse + * iterators. + */ + static boost::optional<StringMap<std::string>> renamedPaths( + SourceContainer::const_reverse_iterator rstart, + SourceContainer::const_reverse_iterator rend, + std::set<std::string> pathsOfInterest); + const boost::intrusive_ptr<ExpressionContext>& getContext() const { return pCtx; } @@ -250,6 +263,15 @@ public: */ DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const; + /** + * Given 'pathsOfInterest' which describes a set of paths which the caller is interested in, + * returns boost::none if any of those paths are modified by this pipeline, or a mapping from + * their name at the end of the pipeline to their name at the beginning of the pipeline if they + * are preserved but possibly renamed by this pipeline. + */ + boost::optional<StringMap<std::string>> renamedPaths( + std::set<std::string> pathsOfInterest) const; + const SourceContainer& getSources() const { return _sources; } diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 65d3c4fecc0..280473848cc 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -45,10 +45,12 @@ #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_test_optimizations.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/db/repl/replication_coordinator_mock.h" @@ -2246,11 +2248,24 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest, splitPipeline.shardsPipeline->requiredToRunOnMongos(); } +/** + * For the purpsoses of this test, assume every collection is unsharded. Stages may ask this during + * setup. For example, to compute its constraints, the $out stage needs to know if the output + * collection is sharded. + */ +class FakeMongoProcessInterface : public StubMongoProcessInterface { +public: + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { + return false; + } +}; + TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStagePresent) { auto expCtx = getExpCtx(); expCtx->allowDiskUse = true; expCtx->inMongos = true; + expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>(); auto match = DocumentSourceMatch::create(fromjson("{x: 5}"), expCtx); auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); @@ -2678,6 +2693,201 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite } // namespace Dependencies +namespace { +TEST(PipelineRenameTracking, ReportsIdentityMapWhenEmpty) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create()}, expCtx)); + auto renames = pipeline->renamedPaths({"a", "b", "c.d"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 3UL); + ASSERT_EQ(nameMap["a"], "a"); + ASSERT_EQ(nameMap["b"], "b"); + ASSERT_EQ(nameMap["c.d"], "c.d"); +} + +class NoModifications : public DocumentSourceTestOptimizations { +public: + NoModifications() : DocumentSourceTestOptimizations() {} + static boost::intrusive_ptr<NoModifications> create() { + return new NoModifications(); + } + + /** + * Returns a description which communicate that this stage modifies nothing. + */ + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>(), {}}; + } +}; + +TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + { + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), NoModifications::create()}, expCtx)); + auto renames = pipeline->renamedPaths({"a", "b", "c.d"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 3UL); + ASSERT_EQ(nameMap["a"], "a"); + ASSERT_EQ(nameMap["b"], "b"); + ASSERT_EQ(nameMap["c.d"], "c.d"); + } + { + auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create(), + NoModifications::create(), + NoModifications::create(), + NoModifications::create()}, + expCtx)); + auto renames = pipeline->renamedPaths({"a", "b", "c.d"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 3UL); + ASSERT_EQ(nameMap["a"], "a"); + ASSERT_EQ(nameMap["b"], "b"); + ASSERT_EQ(nameMap["c.d"], "c.d"); + } +} + +class NotSupported : public DocumentSourceTestOptimizations { +public: + NotSupported() : DocumentSourceTestOptimizations() {} + static boost::intrusive_ptr<NotSupported> create() { + return new NotSupported(); + } + + /** + * Returns a description which communicate that this stage modifies nothing. + */ + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>(), {}}; + } +}; + +TEST(PipelineRenameTracking, DoesNotReportRenamesIfAStageDoesNotSupportTrackingThem) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::create(), + NoModifications::create(), + NotSupported::create(), + NoModifications::create()}, + expCtx)); + ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"a"}))); + ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"a", "b"}))); + ASSERT_FALSE(static_cast<bool>(pipeline->renamedPaths({"x", "yahoo", "c.d"}))); +} + +class RenamesAToB : public DocumentSourceTestOptimizations { +public: + RenamesAToB() : DocumentSourceTestOptimizations() {} + static boost::intrusive_ptr<RenamesAToB> create() { + return new RenamesAToB(); + } + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}}; + } +}; + +TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), RenamesAToB::create()}, expCtx)); + { + auto renames = pipeline->renamedPaths({"b"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 1UL); + ASSERT_EQ(nameMap["b"], "a"); + } + { + auto renames = pipeline->renamedPaths({"b", "c.d"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 2UL); + ASSERT_EQ(nameMap["b"], "a"); + ASSERT_EQ(nameMap["c.d"], "c.d"); + } + { + // This is strange; the mock stage reports to essentially duplicate the "a" field into "b". + // Because of this, both "b" and "a" should map to "a". + auto renames = pipeline->renamedPaths({"b", "a"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 2UL); + ASSERT_EQ(nameMap["b"], "a"); + ASSERT_EQ(nameMap["a"], "a"); + } +} + +TEST(PipelineRenameTracking, ReportsIdentityMapWhenGivenEmptyIteratorRange) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), RenamesAToB::create()}, expCtx)); + { + auto renames = Pipeline::renamedPaths( + pipeline->getSources().rbegin(), pipeline->getSources().rbegin(), {"b"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 1UL); + ASSERT_EQ(nameMap["b"], "b"); + } + { + auto renames = Pipeline::renamedPaths( + pipeline->getSources().rbegin(), pipeline->getSources().rbegin(), {"b", "c.d"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 2UL); + ASSERT_EQ(nameMap["b"], "b"); + ASSERT_EQ(nameMap["c.d"], "c.d"); + } +} + +class RenamesBToC : public DocumentSourceTestOptimizations { +public: + RenamesBToC() : DocumentSourceTestOptimizations() {} + static boost::intrusive_ptr<RenamesBToC> create() { + return new RenamesBToC(); + } + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"c", "b"}}}; + } +}; + +TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet(Pipeline::create( + {DocumentSourceMock::create(), RenamesAToB::create(), RenamesBToC::create()}, expCtx)); + auto renames = pipeline->renamedPaths({"c"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 1UL); + ASSERT_EQ(nameMap["c"], "a"); +} + +class RenamesBToA : public DocumentSourceTestOptimizations { +public: + RenamesBToA() : DocumentSourceTestOptimizations() {} + static boost::intrusive_ptr<RenamesBToA> create() { + return new RenamesBToA(); + } + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {{"b", "a"}}}; + } +}; + +TEST(PipelineRenameTracking, CanHandleBackAndForthRename) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet(Pipeline::create( + {DocumentSourceMock::create(), RenamesAToB::create(), RenamesBToA::create()}, expCtx)); + auto renames = pipeline->renamedPaths({"a"}); + ASSERT(static_cast<bool>(renames)); + auto nameMap = *renames; + ASSERT_EQ(nameMap.size(), 1UL); + ASSERT_EQ(nameMap["a"], "a"); +} + +} // namespace + class All : public Suite { public: All() : Suite("PipelineOptimizations") {} diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 2796119cc9e..522419420c8 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -360,7 +360,6 @@ env.CppUnitTest( 'shard_key_pattern_test.cpp', ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/auth/authmocks", '$BUILD_DIR/mongo/db/serveronly', 'catalog_cache_test_fixture', ] @@ -392,6 +391,7 @@ env.Library( 'catalog_cache_test_fixture.cpp', ], LIBDEPS=[ + "$BUILD_DIR/mongo/db/auth/authmocks", '$BUILD_DIR/mongo/db/query/query_test_service_context', 'coreshard', 'sharding_router_test_fixture', diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index 835aedebf2f..c7df14e96b1 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -160,9 +160,9 @@ std::shared_ptr<ChunkManager> CatalogCacheTestFixture::makeChunkManager( return routingInfo->cm(); } -void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss) { +void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss, std::string shardId) { expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { - DatabaseType db(nss.db().toString(), {"0"}, true, databaseVersion::makeNew()); + DatabaseType db(nss.db().toString(), {shardId}, true, databaseVersion::makeNew()); return std::vector<BSONObj>{db.toBSON()}; }()); } diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h index ef155fa1c8a..c7cacad7eec 100644 --- a/src/mongo/s/catalog_cache_test_fixture.h +++ b/src/mongo/s/catalog_cache_test_fixture.h @@ -84,7 +84,7 @@ protected: /** * Mocks network responses for loading a sharded database and collection from the config server. */ - void expectGetDatabase(NamespaceString nss); + void expectGetDatabase(NamespaceString nss, std::string primaryShard = "0"); void expectGetCollection(NamespaceString nss, OID epoch, const ShardKeyPattern& shardKeyPattern); diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 10d8ad623fb..3c6fe218bfd 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -153,7 +153,6 @@ env.CppUnitTest( LIBDEPS=[ 'cluster_commands', 'cluster_command_test_fixture', - '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/auth/saslauth', ], ) diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 02da388532b..ab61f840282 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -119,6 +119,10 @@ public: return _catalogClient.get(); } + /** + * Can return nullptr. For example, if this is a mongod that is not a shard server. This is + * always present on mongos after startup. + */ CatalogCache* catalogCache() const { return _catalogCache.get(); } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 9882356dd90..6dd7d6605c6 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -41,6 +41,7 @@ env.CppUnitTest( target="cluster_aggregate_test", source=[ "cluster_aggregate_test.cpp", + "cluster_aggregation_planner_test.cpp", ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', @@ -57,10 +58,9 @@ env.CppUnitTest( ], LIBDEPS=[ 'cluster_query', - '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/keys_collection_client_sharded', - '$BUILD_DIR/mongo/s/commands/cluster_commands', '$BUILD_DIR/mongo/s/catalog_cache_test_fixture', + '$BUILD_DIR/mongo/s/commands/cluster_commands', ], ) diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index cd110ca31ff..288dcd48448 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -30,21 +30,25 @@ #include "mongo/s/query/cluster_aggregation_planner.h" +#include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/executor/task_executor_pool.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/s/shard_id.h" +#include "mongo/s/shard_key_pattern.h" namespace mongo { namespace cluster_aggregation_planner { @@ -186,6 +190,136 @@ ClusterClientCursorGuard convertPipelineToRouterStages( opCtx, std::move(root), Document::allMetadataFieldNames), std::move(cursorParams)); } + +bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage, + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) { + if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { + return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); + } else { + // This stage is fine to execute in parallel on each stream. For example, a $match can be + // applied to each stream in parallel. + return true; + } +} + +std::string mapToString(const StringMap<std::string>& map) { + StringBuilder sb; + sb << "{"; + for (auto&& entry : map) { + if (sb.len() != 1) { + sb << ", "; + } + sb << entry.first << ": " << entry.second; + } + sb << "}"; + return sb.str(); +} + +BSONObj buildNewKeyPattern(const ShardKeyPattern& shardKey, StringMap<std::string> renames) { + BSONObjBuilder newPattern; + for (auto&& elem : shardKey.getKeyPattern().toBSON()) { + auto it = renames.find(elem.fieldNameStringData()); + invariant(it != renames.end(), + str::stream() << "Could not find new name of shard key field \"" + << elem.fieldName() + << "\": rename map was " + << mapToString(renames)); + newPattern.appendAs(elem, it->second); + } + return newPattern.obj(); +} + +StringMap<std::string> computeShardKeyRenameMap(const Pipeline* mergePipeline, + std::set<std::string>&& pathsOfShardKey) { + auto traversalStart = mergePipeline->getSources().crbegin(); + auto traversalEnd = mergePipeline->getSources().crend(); + const auto leadingGroup = + dynamic_cast<DocumentSourceGroup*>(mergePipeline->getSources().front().get()); + if (leadingGroup && leadingGroup->doingMerge()) { + // A leading $group stage will not report to preserve any fields, since it blows away the + // _id and replaces it with something new. It possibly renames some fields, but when + // computing the new shard key we are interested in the name of the shard key *in the middle + // of the $group*. The $exchange will be inserted between the shard-local groups and the + // global groups. Thus we want to exclude this stage from our rename tracking. + traversalEnd = std::prev(traversalEnd); + } + auto renameMap = Pipeline::renamedPaths(traversalStart, traversalEnd, pathsOfShardKey); + invariant(renameMap, + str::stream() + << "Analyzed pipeline was thought to preserve the shard key fields, but did not: " + << Value(mergePipeline->serialize()).toString()); + return *renameMap; +} + +/** + * Returns true if any stage in the pipeline would modify any of the fields in 'shardKeyPaths', or + * if there is any stage in the pipeline requires a unified stream to do its computation like a + * $limit would. + * + * Purposefully takes 'shardKeyPaths' by value so that it can be modified throughout. + */ +bool anyStageModifiesShardKeyOrNeedsMerge(std::set<std::string> shardKeyPaths, + const Pipeline* mergePipeline) { + const auto& stages = mergePipeline->getSources(); + for (auto it = stages.crbegin(); it != stages.crend(); ++it) { + const auto& stage = *it; + auto renames = stage->renamedPaths(std::move(shardKeyPaths)); + if (!renames) { + return true; + } + shardKeyPaths.clear(); + for (auto&& rename : *renames) { + shardKeyPaths.insert(rename.second); + } + if (!stageCanRunInParallel(stage, shardKeyPaths)) { + // In order for this stage to work it needs a single input stream which it wouldn't get + // if we inserted an exchange before it. + return true; + } + } + return false; +} + +boost::optional<ShardedExchangePolicy> walkPipelineBackwardsTrackingShardKey( + OperationContext* opCtx, + const boost::intrusive_ptr<const DocumentSourceOut>& outStage, + const Pipeline* mergePipeline, + const ChunkManager& chunkManager) { + + const ShardKeyPattern& shardKey = chunkManager.getShardKeyPattern(); + std::set<std::string> shardKeyPaths; + for (auto&& path : shardKey.getKeyPatternFields()) { + shardKeyPaths.emplace(path->dottedField().toString()); + } + if (anyStageModifiesShardKeyOrNeedsMerge(shardKeyPaths, mergePipeline)) { + return boost::none; + } + + // All the fields of the shard key are preserved by the pipeline, but they might be renamed. To + // set up the $exchange, we need to build a fake shard key pattern which uses the names of the + // shard key fields as they are at the split point of the pipeline. + auto renames = computeShardKeyRenameMap(mergePipeline, std::move(shardKeyPaths)); + ShardKeyPattern newShardKey(buildNewKeyPattern(shardKey, renames)); + + // Given the new shard key fields, build the distribution map. + StringMap<std::vector<ChunkRange>> distribution; + for (auto&& chunk : chunkManager.chunks()) { + // Append the boundaries with the new names from the new shard key. + auto translateBoundary = [&renames](const BSONObj& oldBoundary) { + BSONObjBuilder bob; + for (auto&& elem : oldBoundary) { + bob.appendAs(elem, renames[elem.fieldNameStringData()]); + } + return bob.obj(); + }; + distribution[chunk.getShardId().toString()].emplace_back(translateBoundary(chunk.getMin()), + translateBoundary(chunk.getMax())); + } + return ShardedExchangePolicy{ + ExchangePolicyEnum::kRange, + ShardDistributionInfo{ShardKeyPattern{std::move(newShardKey)}, std::move(distribution)}}; +} + } // namespace SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { @@ -254,5 +388,36 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, opCtx, std::make_unique<RouterStagePipeline>(std::move(pipeline)), std::move(cursorParams)); } +boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx, + const Pipeline* mergePipeline) { + const auto grid = Grid::get(opCtx); + invariant(grid); + + const auto outStage = + dynamic_cast<DocumentSourceOut*>(mergePipeline->getSources().back().get()); + if (!outStage || outStage->getMode() == WriteModeEnum::kModeReplaceCollection) { + // If there's no $out stage we won't try to do an $exchange. If the $out stage is using mode + // "replaceCollection", then there's no point doing an $exchange because all the writes will + // go to a single node, so we should just perform the merge on that host. + return boost::none; + } + + const auto routingInfo = uassertStatusOK( + grid->catalogCache()->getCollectionRoutingInfo(opCtx, outStage->getOutputNs())); + if (!routingInfo.cm()) { + return boost::none; + } + + // The collection is sharded and we have an $out stage! Here we assume the $out stage has + // already verified that the shard key pattern is compatible with the unique key being used. + // Assuming this, we just have to make sure the shard key is preserved (though possibly renamed) + // all the way to the front of the merge pipeline. If this is the case then for any document + // entering the merging pipeline we can predict which shard it will need to end up being + // inserted on. With this ability we can insert an exchange on the shards to partition the + // documents based on which shard will end up owning them. Then each shard can perform a merge + // of only those documents which belong to it (optimistically, barring chunk migrations). + return walkPipelineBackwardsTrackingShardKey(opCtx, outStage, mergePipeline, *routingInfo.cm()); +} + } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 3b3aaa63df4..8a5f494cff2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -28,9 +28,13 @@ #pragma once +#include "mongo/db/pipeline/document_source_exchange_gen.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/query/cluster_client_cursor_impl.h" +#include "mongo/s/shard_id.h" namespace mongo { namespace cluster_aggregation_planner { @@ -89,5 +93,29 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, ClusterClientCursorParams&&); +struct ShardDistributionInfo { + // If we want to send data to the shards which would own the data, 'logicalShardKeyAtSplitPoint' + // describes which of the fields to use to determine what the final shard key will be. For + // example, if the merging pipeline renames "x" to "out_shard_key" and then uses $out to output + // to a collection sharded by {out_shard_key: 1}, 'logicalShardKeyAtSplitPoint' will be {x: 1}. + ShardKeyPattern logicalShardKeyAtSplitPoint; + + // This map describes which shard is going to receive which range. The keys are the shard ids. + StringMap<std::vector<ChunkRange>> partitions; +}; + +struct ShardedExchangePolicy { + ExchangePolicyEnum policy; + + // Only set if the policy is ranged. + boost::optional<ShardDistributionInfo> shardDistributionInfo; +}; + +/** + * If the merging pipeline is eligible for an $exchange merge optimization, returns the information + * required to set that up. + */ +boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx, + const Pipeline* mergePipeline); } // namespace cluster_aggregation_planner } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp new file mode 100644 index 00000000000..10782443e35 --- /dev/null +++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp @@ -0,0 +1,545 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + + +#include "mongo/client/remote_command_targeter_factory_mock.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_out_gen.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/expression_context_for_test.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache_test_fixture.h" +#include "mongo/s/query/cluster_aggregation_planner.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +namespace { + +const NamespaceString kTestAggregateNss = NamespaceString{"unittests", "cluster_exchange"}; +const NamespaceString kTestOutNss = NamespaceString{"unittests", "out_ns"}; + +/** + * For the purposes of this test, assume every collection is sharded. Stages may ask this during + * setup. For example, to compute its constraints, the $out stage needs to know if the output + * collection is sharded. + */ +class FakeMongoProcessInterface : public StubMongoProcessInterface { +public: + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { + return true; + } +}; + +class ClusterExchangeTest : public CatalogCacheTestFixture { +public: + void setUp() { + CatalogCacheTestFixture::setUp(); + _expCtx = new ExpressionContextForTest(operationContext(), + AggregationRequest{kTestAggregateNss, {}}); + _expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>(); + } + + boost::intrusive_ptr<ExpressionContext> expCtx() { + return _expCtx; + } + + boost::intrusive_ptr<DocumentSource> parse(const std::string& json) { + auto stages = DocumentSource::parse(_expCtx, fromjson(json)); + ASSERT_EQ(stages.size(), 1UL); + return stages.front(); + } + + std::vector<ChunkType> makeChunks(const NamespaceString& nss, + const OID epoch, + std::vector<std::pair<ChunkRange, ShardId>> chunkInfos) { + ChunkVersion version(1, 0, epoch); + std::vector<ChunkType> chunks; + for (auto&& pair : chunkInfos) { + chunks.emplace_back(nss, pair.first, version, pair.second); + version.incMinor(); + } + return chunks; + } + + void loadRoutingTable(NamespaceString nss, + const OID epoch, + const ShardKeyPattern& shardKey, + const std::vector<ChunkType>& chunkDistribution) { + auto future = scheduleRoutingInfoRefresh(nss); + + // Mock the expected config server queries. + expectGetDatabase(nss); + expectGetCollection(nss, epoch, shardKey); + expectGetCollection(nss, epoch, shardKey); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + std::vector<BSONObj> response; + for (auto&& chunk : chunkDistribution) { + response.push_back(chunk.toConfigBSON()); + } + return response; + }()); + + future.timed_get(kFutureTimeout).get(); + } + +private: + boost::intrusive_ptr<ExpressionContext> _expCtx; +}; + +TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineDoesNotEndWithOut) { + setupNShards(2); + auto mergePipe = + unittest::assertGet(Pipeline::create({DocumentSourceLimit::create(expCtx(), 1)}, expCtx())); + ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(), + mergePipe.get())); + mergePipe = unittest::assertGet( + Pipeline::create({DocumentSourceMatch::create(BSONObj(), expCtx())}, expCtx())); + ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(), + mergePipe.get())); +} + +TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineEndsWithReplaceCollectionOut) { + setupNShards(2); + + // For this test pretend 'kTestOutNss' is not sharded so that we can use a "replaceCollection" + // $out. + const auto originalMongoProcessInterface = expCtx()->mongoProcessInterface; + expCtx()->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>(); + ON_BLOCK_EXIT([&]() { expCtx()->mongoProcessInterface = originalMongoProcessInterface; }); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeReplaceCollection)}, + expCtx())); + ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(), + mergePipe.get())); +} + +TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputDatabaseDoesNotExist) { + setupNShards(2); + auto mergePipe = unittest::assertGet(Pipeline::create( + {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + ASSERT_THROWS_CODE(cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()), + AssertionException, + ErrorCodes::NamespaceNotFound); + }); + + // Mock out a response as if the database doesn't exist. + expectFindSendBSONObjVector(kConfigHostAndPort, []() { return std::vector<BSONObj>{}; }()); + expectFindSendBSONObjVector(kConfigHostAndPort, []() { return std::vector<BSONObj>{}; }()); + + future.timed_get(kFutureTimeout); +} + +// If the output collection doesn't exist, we don't know how to distribute the output documents so +// cannot insert an $exchange. The $out stage should later create a new, unsharded collection. +TEST_F(ClusterExchangeTest, SingleOutStageNotEligibleForExchangeIfOutputCollectionDoesNotExist) { + setupNShards(2); + auto mergePipe = unittest::assertGet(Pipeline::create( + {DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(), + mergePipe.get())); + }); + + expectGetDatabase(kTestOutNss); + // Pretend there are no collections in this database. + expectFindSendBSONObjVector(kConfigHostAndPort, std::vector<BSONObj>()); + + future.timed_get(kFutureTimeout); +} + +// A $limit stage requires a single merger. +TEST_F(ClusterExchangeTest, LimitFollowedByOutStageIsNotEligibleForExchange) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {DocumentSourceLimit::create(expCtx(), 6), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + ASSERT_FALSE(cluster_aggregation_planner::checkIfEligibleForExchange(operationContext(), + mergePipe.get())); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {_id: '$x', $doingMerge: true}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 2UL); // One for each shard. + + auto shard0Ranges = partitions.find("0"); + ASSERT(shard0Ranges != partitions.end()); + ASSERT_EQ(shard0Ranges->second.size(), 1UL); + auto shard0Range = shard0Ranges->second[0]; + ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {_id: '$x', $doingMerge: true}}"), + parse("{$project: {temporarily_renamed: '$_id'}}"), + parse("{$project: {_id: '$temporarily_renamed'}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 2UL); // One for each shard. + + auto shard0Ranges = partitions.find("0"); + ASSERT(shard0Ranges != partitions.end()); + ASSERT_EQ(shard0Ranges->second.size(), 1UL); + auto shard0Range = shard0Ranges->second[0]; + ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); + + auto shard1Ranges = partitions.find("1"); + ASSERT(shard1Ranges != partitions.end()); + ASSERT_EQ(shard1Ranges->second.size(), 1UL); + auto shard1Range = shard1Ranges->second[0]; + ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY))); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + // This would be the merging half of the pipeline if the original pipeline was + // [{$sort: {x: 1}}, + // {$group: {_id: "$x"}}, + // {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}]. + // No $sort stage appears in the merging half since we'd expect that to be absorbed by the + // $mergeCursors and AsyncResultsMerger. + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {_id: '$x'}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), + BSON("x" << 1)); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 2UL); // One for each shard. + + auto shard0Ranges = partitions.find("0"); + ASSERT(shard0Ranges != partitions.end()); + ASSERT_EQ(shard0Ranges->second.size(), 1UL); + auto shard0Range = shard0Ranges->second[0]; + ASSERT(shard0Range == ChunkRange(BSON("x" << MINKEY), BSON("x" << 0))); + + auto shard1Ranges = partitions.find("1"); + ASSERT(shard1Ranges != partitions.end()); + ASSERT_EQ(shard1Ranges->second.size(), 1UL); + auto shard1Range = shard1Ranges->second[0]; + ASSERT(shard1Range == ChunkRange(BSON("x" << 0), BSON("x" << MAXKEY))); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, ProjectThroughDottedFieldDoesNotPreserveShardKey) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {" + " _id: {region: '$region', country: '$country'}," + " population: {$sum: '$population'}," + " cities: {$push: {name: '$city', population: '$population'}}" + "}}"), + parse( + "{$project: {_id: '$_id.country', region: '$_id.region', population: 1, cities: 1}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + // Because '_id' is populated from '$_id.country', we cannot prove that '_id' is a simple + // rename. We cannot prove that '_id' is not an array, and thus the $project could do more + // than a rename. + ASSERT_FALSE(exchangeSpec); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, WordCountUseCaseExample) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); + + // As an example of a pipeline that might replace a map reduce, imagine that we are performing a + // word count, and the shards part of the pipeline tokenized some text field of each document + // into {word: <token>, count: 1}. Then this is the merging half of the pipeline: + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {" + " _id: '$word'," + " count: {$sum: 1}," + " $doingMerge: true" + "}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 2UL); // One for each shard. + + auto shard0Ranges = partitions.find("0"); + ASSERT(shard0Ranges != partitions.end()); + ASSERT_EQ(shard0Ranges->second.size(), 1UL); + auto shard0Range = shard0Ranges->second[0]; + ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); + + auto shard1Ranges = partitions.find("1"); + ASSERT(shard1Ranges != partitions.end()); + ASSERT_EQ(shard1Ranges->second.size(), 1UL); + auto shard1Range = shard1Ranges->second[0]; + ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY))); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) { + setupNShards(2); + const OID epoch = OID::gen(); + ShardKeyPattern shardKey(BSON("word" << 1)); + loadRoutingTable(kTestOutNss, + epoch, + shardKey, + makeChunks(kTestOutNss, + epoch, + {{ChunkRange{BSON("word" << MINKEY), + BSON("word" + << "hello")}, + ShardId("0")}, + {ChunkRange{BSON("word" + << "hello"), + BSON("word" + << "world")}, + ShardId("1")}, + {ChunkRange{BSON("word" + << "world"), + BSON("word" << MAXKEY)}, + ShardId("1")}})); + + // As an example of a pipeline that might replace a map reduce, imagine that we are performing a + // word count, and the shards part of the pipeline tokenized some text field of each document + // into {word: <token>, count: 1}. Then this is the merging half of the pipeline: + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {" + " _id: '$word'," + " count: {$sum: 1}," + " $doingMerge: true" + "}}"), + parse("{$project: {word: '$_id', count: 1}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), + BSON("_id" << 1)); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 2UL); // One for each shard. + + auto shard0Ranges = partitions.find("0"); + ASSERT(shard0Ranges != partitions.end()); + ASSERT_EQ(shard0Ranges->second.size(), 1UL); + auto firstRangeOnShard0 = shard0Ranges->second[0]; + ASSERT(firstRangeOnShard0 == ChunkRange(BSON("_id" << MINKEY), + BSON("_id" + << "hello"))); + + auto shard1Ranges = partitions.find("1"); + ASSERT(shard1Ranges != partitions.end()); + ASSERT_EQ(shard1Ranges->second.size(), 2UL); + auto firstRangeOnShard1 = shard1Ranges->second[0]; + ASSERT(firstRangeOnShard1 == ChunkRange(BSON("_id" + << "hello"), + BSON("_id" + << "world"))); + auto secondRangeOnShard1 = shard1Ranges->second[1]; + ASSERT(secondRangeOnShard1 == ChunkRange(BSON("_id" + << "world"), + BSON("_id" << MAXKEY))); + }); + + future.timed_get(kFutureTimeout); +} + +// We'd like to test that a compound shard key pattern can be used. Strangely, the only case we can +// actually perform an exchange today on a compound shard key is when the shard key contains fields +// which are all duplicates. This is due to the limitations of tracking renames through dots, see +// SERVER-36787 for an example. +TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { + const OID epoch = OID::gen(); + ShardKeyPattern shardKey(BSON("x" << 1 << "y" << 1)); + + setupNShards(3); + const std::vector<std::string> xBoundaries = {"a", "g", "m", "r", "u"}; + auto chunks = [&]() { + std::vector<ChunkType> chunks; + ChunkVersion version(1, 0, epoch); + chunks.emplace_back(kTestOutNss, + ChunkRange{BSON("x" << MINKEY << "y" << MINKEY), + BSON("x" << xBoundaries[0] << "y" << MINKEY)}, + version, + ShardId("0")); + for (std::size_t i = 0; i < xBoundaries.size() - 1; ++i) { + chunks.emplace_back(kTestOutNss, + ChunkRange{BSON("x" << xBoundaries[i] << "y" << MINKEY), + BSON("x" << xBoundaries[i + 1] << "y" << MINKEY)}, + version, + ShardId(str::stream() << i % 3)); + } + chunks.emplace_back(kTestOutNss, + ChunkRange{BSON("x" << xBoundaries.back() << "y" << MINKEY), + BSON("x" << MAXKEY << "y" << MAXKEY)}, + version, + ShardId(str::stream() << "1")); + return chunks; + }(); + + loadRoutingTable(kTestOutNss, epoch, shardKey, chunks); + + auto mergePipe = unittest::assertGet(Pipeline::create( + {parse("{$group: {" + " _id: '$x'," + " $doingMerge: true" + "}}"), + parse("{$project: {x: '$_id', y: '$_id'}}"), + DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)}, + expCtx())); + + auto future = launchAsync([&] { + auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + operationContext(), mergePipe.get()); + ASSERT_TRUE(exchangeSpec); + ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); + ASSERT_TRUE(exchangeSpec->shardDistributionInfo); + ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), + BSON("_id" << 1 << "_id" << 1)); + const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; + ASSERT_EQ(partitions.size(), 3UL); // One for each shard. + + // Make sure each shard has the same chunks that it started with, just with the names of the + // boundary fields translated. For each chunk that we created to begin with, make sure its + // corresponding/translated chunk is present on the same shard in the same order. + StringMap<std::size_t> numChunksExaminedOnShard = {{"0", 0}, {"1", 0}, {"2", 0}}; + for (auto&& chunk : chunks) { + auto shardId = chunk.getShard().toString(); + auto shardRanges = partitions.find(shardId); + ASSERT(shardRanges != partitions.end()); + auto nextChunkOnShard = numChunksExaminedOnShard[shardId]++; + ASSERT_LTE(nextChunkOnShard, shardRanges->second.size()); + auto outputChunk = shardRanges->second[nextChunkOnShard]; + + auto expectedChunkMin = [&]() { + ASSERT_EQ(chunk.getMin().nFields(), 2); + return BSON("_id" << chunk.getMin()["x"] << "_id" << chunk.getMin()["y"]); + }(); + ASSERT_BSONOBJ_EQ(outputChunk.getMin(), expectedChunkMin); + + auto expectedChunkMax = [&]() { + ASSERT_EQ(chunk.getMax().nFields(), 2); + return BSON("_id" << chunk.getMax()["x"] << "_id" << chunk.getMax()["y"]); + }(); + ASSERT_BSONOBJ_EQ(outputChunk.getMax(), expectedChunkMax); + } + }); + + future.timed_get(kFutureTimeout); +} +} // namespace +} // namespace mongo |