diff options
Diffstat (limited to 'src/mongo/db/views/view_graph.cpp')
-rw-r--r-- | src/mongo/db/views/view_graph.cpp | 174 |
1 files changed, 102 insertions, 72 deletions
diff --git a/src/mongo/db/views/view_graph.cpp b/src/mongo/db/views/view_graph.cpp index 05037db7b03..fc40fc4720a 100644 --- a/src/mongo/db/views/view_graph.cpp +++ b/src/mongo/db/views/view_graph.cpp @@ -30,7 +30,9 @@ #include "mongo/db/views/view_graph.h" +#include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/views/view.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -46,50 +48,33 @@ void ViewGraph::clear() { _namespaceIds.clear(); } -Status ViewGraph::insertAndValidate(const NamespaceString& viewNss, +size_t ViewGraph::size() const { + return _graph.size(); +} + +Status ViewGraph::insertAndValidate(const ViewDefinition& view, const std::vector<NamespaceString>& refs, - const int pipelineSize) { - insertWithoutValidating(viewNss, refs, pipelineSize); + int pipelineSize) { + insertWithoutValidating(view, refs, pipelineSize); // Perform validation on this newly inserted view. Note, if the graph was put in an invalid // state through unvalidated inserts (e.g. if the user manually edits system.views) // this may not necessarily be detected. We only check for errors introduced by this view. + const auto& viewNss = view.name(); uint64_t nodeId = _getNodeId(viewNss); - auto viewDepthLimitExceeded = [this, &viewNss]() -> Status { - this->remove(viewNss); - return Status(ErrorCodes::ViewDepthLimitExceeded, - str::stream() << "View depth limit exceeded; maximum depth is " - << kMaxViewDepth); - }; - - auto viewPipelineMaxSizeExceeded = [this, &viewNss]() -> Status { - this->remove(viewNss); - return Status(ErrorCodes::ViewPipelineMaxSizeExceeded, - str::stream() << "View pipeline exceeds maximum size; maximum size is " - << ViewGraph::kMaxViewPipelineSizeBytes); - }; + // If the graph fails validation for any reason, the insert is automatically rolled back on + // exiting this method. + auto rollBackInsert = [&]() -> void { remove(viewNss); }; + auto guard = MakeGuard(rollBackInsert); // Check for cycles and get the height of the children. StatsMap statsMap; std::vector<uint64_t> cycleVertices; - ErrorCodes::Error childRes = - _getChildrenStatsAndCheckCycle(nodeId, nodeId, 0, &statsMap, &cycleVertices); - if (childRes == ErrorCodes::ViewDepthLimitExceeded) { - return viewDepthLimitExceeded(); - } else if (childRes == ErrorCodes::ViewPipelineMaxSizeExceeded) { - return viewPipelineMaxSizeExceeded(); - } else if (childRes == ErrorCodes::GraphContainsCycle) { - // Make the error message with the namespaces of the cycle and remove the node. - str::stream ss; - ss << "View cycle detected: "; - for (auto cycleIter = cycleVertices.rbegin(); cycleIter != cycleVertices.rend(); - cycleIter++) { - ss << _graph[*cycleIter].ns << " => "; - } - ss << viewNss.ns(); - remove(viewNss); - return Status(ErrorCodes::GraphContainsCycle, ss); + cycleVertices.reserve(kMaxViewDepth); + auto childRes = _validateChildren(nodeId, nodeId, 0, &statsMap, &cycleVertices); + if (!childRes.isOK()) { + return childRes; } // Subtract one since the child height includes the non-view leaf node(s). @@ -100,11 +85,9 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss, // Get the height of the parents to obtain the diameter through this node, as well as the size // of the pipeline to check if if the combined pipeline exceeds the max size. statsMap.clear(); - ErrorCodes::Error parentRes = _getParentsStats(nodeId, 0, &statsMap); - if (parentRes == ErrorCodes::ViewDepthLimitExceeded) { - return viewDepthLimitExceeded(); - } else if (parentRes == ErrorCodes::ViewPipelineMaxSizeExceeded) { - return viewPipelineMaxSizeExceeded(); + auto parentRes = _validateParents(nodeId, 0, &statsMap); + if (!parentRes.isOK()) { + return parentRes; } // Check the combined heights of the children and parents. @@ -113,7 +96,8 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss, int diameter = parentsHeight + childrenHeight - 1; if (diameter > kMaxViewDepth) { - return viewDepthLimitExceeded(); + return {ErrorCodes::ViewDepthLimitExceeded, + str::stream() << "View depth limit exceeded; maximum depth is " << kMaxViewDepth}; } // Check the combined sizes of the children and parents. @@ -124,22 +108,30 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss, int pipelineTotalSize = parentsSize + childrenSize - currentNode.size; if (pipelineTotalSize > kMaxViewPipelineSizeBytes) { - return viewPipelineMaxSizeExceeded(); + return {ErrorCodes::ViewPipelineMaxSizeExceeded, + str::stream() << "Operation would result in a resolved view pipeline that exceeds " + "the maximum size of " + << kMaxViewPipelineSizeBytes + << " bytes"}; } + guard.Dismiss(); return Status::OK(); } -void ViewGraph::insertWithoutValidating(const NamespaceString& viewNss, +void ViewGraph::insertWithoutValidating(const ViewDefinition& view, const std::vector<NamespaceString>& refs, - const int pipelineSize) { - uint64_t nodeId = _getNodeId(viewNss); + int pipelineSize) { + uint64_t nodeId = _getNodeId(view.name()); // Note, the parent pointers of this node are set when the parents are inserted. // This sets the children pointers of the node for this view, as well as the parent // pointers for its children. Node* node = &(_graph[nodeId]); - node->size = pipelineSize; invariant(node->children.empty()); + invariant(!static_cast<bool>(node->collator)); + + node->size = pipelineSize; + node->collator = view.defaultCollator(); for (const NamespaceString& childNss : refs) { uint64_t childId = _getNodeId(childNss); @@ -172,8 +164,10 @@ void ViewGraph::remove(const NamespaceString& viewNss) { } } - // Remove all child pointers since this view no longer references anything. + // This node no longer represents a view, so its children must be cleared and its collator + // unset. node->children.clear(); + node->collator = boost::none; // Only remove node if there are no remaining references to this node. if (node->parents.size() == 0) { @@ -182,9 +176,7 @@ void ViewGraph::remove(const NamespaceString& viewNss) { } } -ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId, - int currentDepth, - StatsMap* statsMap) { +Status ViewGraph::_validateParents(uint64_t currentId, int currentDepth, StatsMap* statsMap) { const Node& currentNode = _graph[currentId]; int maxHeightOfParents = 0; int maxSizeOfParents = 0; @@ -192,13 +184,25 @@ ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId, // Return early if we've already exceeded the maximum depth. This will also be triggered if // we're traversing a cycle introduced through unvalidated inserts. if (currentDepth > kMaxViewDepth) { - return ErrorCodes::ViewDepthLimitExceeded; + return {ErrorCodes::ViewDepthLimitExceeded, + str::stream() << "View depth limit exceeded; maximum depth is " + << ViewGraph::kMaxViewDepth}; } for (uint64_t parentId : currentNode.parents) { + const auto& parentNode = _graph[parentId]; + if (parentNode.isView() && + !CollatorInterface::collatorsMatch(currentNode.collator.get(), + parentNode.collator.get())) { + return {ErrorCodes::OptionNotSupportedOnView, + str::stream() << "View " << currentNode.ns + << " has a collation that does not match the collation of view " + << parentNode.ns}; + } + if (!(*statsMap)[parentId].checked) { - auto res = _getParentsStats(parentId, currentDepth + 1, statsMap); - if (res != ErrorCodes::OK) { + auto res = _validateParents(parentId, currentDepth + 1, statsMap); + if (!res.isOK()) { return res; } } @@ -210,51 +214,77 @@ ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId, (*statsMap)[currentId].height = maxHeightOfParents + 1; (*statsMap)[currentId].cumulativeSize += maxSizeOfParents + currentNode.size; - if ((*statsMap)[currentId].cumulativeSize > kMaxViewPipelineSizeBytes) { - return ErrorCodes::ViewPipelineMaxSizeExceeded; + const auto size = (*statsMap)[currentId].cumulativeSize; + if (size > kMaxViewPipelineSizeBytes) { + return {ErrorCodes::ViewPipelineMaxSizeExceeded, + str::stream() << "View pipeline is too large and exceeds the maximum size of " + << ViewGraph::kMaxViewPipelineSizeBytes + << " bytes"}; } - return ErrorCodes::OK; + return Status::OK(); } -ErrorCodes::Error ViewGraph::_getChildrenStatsAndCheckCycle(uint64_t startingId, - uint64_t currentId, - int currentDepth, - StatsMap* statsMap, - std::vector<uint64_t>* cycleIds) { - // Check children of current node. +Status ViewGraph::_validateChildren(uint64_t startingId, + uint64_t currentId, + int currentDepth, + StatsMap* statsMap, + std::vector<uint64_t>* traversalIds) { const Node& currentNode = _graph[currentId]; + traversalIds->push_back(currentId); + + // If we've encountered the id of the starting node, we've found a cycle in the graph. if (currentDepth > 0 && currentId == startingId) { - return ErrorCodes::GraphContainsCycle; + auto iterator = traversalIds->rbegin(); + StringBuilder errmsg; + + errmsg << "View cycle detected: "; + errmsg << _graph[*iterator].ns; + for (; iterator != traversalIds->rend(); ++iterator) { + errmsg << " => " << _graph[*iterator].ns; + } + return {ErrorCodes::GraphContainsCycle, errmsg.str()}; } // Return early if we've already exceeded the maximum depth. This will also be triggered if // we're traversing a cycle introduced through unvalidated inserts. if (currentDepth > kMaxViewDepth) { - return ErrorCodes::ViewDepthLimitExceeded; + return {ErrorCodes::ViewDepthLimitExceeded, + str::stream() << "View depth limit exceeded; maximum depth is " + << ViewGraph::kMaxViewDepth}; } int maxHeightOfChildren = 0; int maxSizeOfChildren = 0; for (uint64_t childId : currentNode.children) { - if (!(*statsMap)[childId].checked) { - auto res = _getChildrenStatsAndCheckCycle( - startingId, childId, currentDepth + 1, statsMap, cycleIds); - if (res == ErrorCodes::GraphContainsCycle) { - cycleIds->push_back(currentId); - return res; - } else if (res != ErrorCodes::OK) { - return res; - } + if ((*statsMap)[childId].checked) { + continue; } + + const auto& childNode = _graph[childId]; + if (childNode.isView() && + !CollatorInterface::collatorsMatch(currentNode.collator.get(), + childNode.collator.get())) { + return {ErrorCodes::OptionNotSupportedOnView, + str::stream() << "View " << currentNode.ns + << " has a collation that does not match the collation of view " + << childNode.ns}; + } + + auto res = _validateChildren(startingId, childId, currentDepth + 1, statsMap, traversalIds); + if (!res.isOK()) { + return res; + } + maxHeightOfChildren = std::max(maxHeightOfChildren, (*statsMap)[childId].height); maxSizeOfChildren = std::max(maxSizeOfChildren, (*statsMap)[childId].cumulativeSize); } + traversalIds->pop_back(); (*statsMap)[currentId].checked = true; (*statsMap)[currentId].height = maxHeightOfChildren + 1; (*statsMap)[currentId].cumulativeSize += maxSizeOfChildren + currentNode.size; - return ErrorCodes::OK; + return Status::OK(); } uint64_t ViewGraph::_getNodeId(const NamespaceString& nss) { |