summaryrefslogtreecommitdiff
path: root/src/mongo/db/views/view_graph.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/views/view_graph.cpp')
-rw-r--r--src/mongo/db/views/view_graph.cpp174
1 files changed, 102 insertions, 72 deletions
diff --git a/src/mongo/db/views/view_graph.cpp b/src/mongo/db/views/view_graph.cpp
index 05037db7b03..fc40fc4720a 100644
--- a/src/mongo/db/views/view_graph.cpp
+++ b/src/mongo/db/views/view_graph.cpp
@@ -30,7 +30,9 @@
#include "mongo/db/views/view_graph.h"
+#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/views/view.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -46,50 +48,33 @@ void ViewGraph::clear() {
_namespaceIds.clear();
}
-Status ViewGraph::insertAndValidate(const NamespaceString& viewNss,
+size_t ViewGraph::size() const {
+ return _graph.size();
+}
+
+Status ViewGraph::insertAndValidate(const ViewDefinition& view,
const std::vector<NamespaceString>& refs,
- const int pipelineSize) {
- insertWithoutValidating(viewNss, refs, pipelineSize);
+ int pipelineSize) {
+ insertWithoutValidating(view, refs, pipelineSize);
// Perform validation on this newly inserted view. Note, if the graph was put in an invalid
// state through unvalidated inserts (e.g. if the user manually edits system.views)
// this may not necessarily be detected. We only check for errors introduced by this view.
+ const auto& viewNss = view.name();
uint64_t nodeId = _getNodeId(viewNss);
- auto viewDepthLimitExceeded = [this, &viewNss]() -> Status {
- this->remove(viewNss);
- return Status(ErrorCodes::ViewDepthLimitExceeded,
- str::stream() << "View depth limit exceeded; maximum depth is "
- << kMaxViewDepth);
- };
-
- auto viewPipelineMaxSizeExceeded = [this, &viewNss]() -> Status {
- this->remove(viewNss);
- return Status(ErrorCodes::ViewPipelineMaxSizeExceeded,
- str::stream() << "View pipeline exceeds maximum size; maximum size is "
- << ViewGraph::kMaxViewPipelineSizeBytes);
- };
+ // If the graph fails validation for any reason, the insert is automatically rolled back on
+ // exiting this method.
+ auto rollBackInsert = [&]() -> void { remove(viewNss); };
+ auto guard = MakeGuard(rollBackInsert);
// Check for cycles and get the height of the children.
StatsMap statsMap;
std::vector<uint64_t> cycleVertices;
- ErrorCodes::Error childRes =
- _getChildrenStatsAndCheckCycle(nodeId, nodeId, 0, &statsMap, &cycleVertices);
- if (childRes == ErrorCodes::ViewDepthLimitExceeded) {
- return viewDepthLimitExceeded();
- } else if (childRes == ErrorCodes::ViewPipelineMaxSizeExceeded) {
- return viewPipelineMaxSizeExceeded();
- } else if (childRes == ErrorCodes::GraphContainsCycle) {
- // Make the error message with the namespaces of the cycle and remove the node.
- str::stream ss;
- ss << "View cycle detected: ";
- for (auto cycleIter = cycleVertices.rbegin(); cycleIter != cycleVertices.rend();
- cycleIter++) {
- ss << _graph[*cycleIter].ns << " => ";
- }
- ss << viewNss.ns();
- remove(viewNss);
- return Status(ErrorCodes::GraphContainsCycle, ss);
+ cycleVertices.reserve(kMaxViewDepth);
+ auto childRes = _validateChildren(nodeId, nodeId, 0, &statsMap, &cycleVertices);
+ if (!childRes.isOK()) {
+ return childRes;
}
// Subtract one since the child height includes the non-view leaf node(s).
@@ -100,11 +85,9 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss,
// Get the height of the parents to obtain the diameter through this node, as well as the size
// of the pipeline to check if if the combined pipeline exceeds the max size.
statsMap.clear();
- ErrorCodes::Error parentRes = _getParentsStats(nodeId, 0, &statsMap);
- if (parentRes == ErrorCodes::ViewDepthLimitExceeded) {
- return viewDepthLimitExceeded();
- } else if (parentRes == ErrorCodes::ViewPipelineMaxSizeExceeded) {
- return viewPipelineMaxSizeExceeded();
+ auto parentRes = _validateParents(nodeId, 0, &statsMap);
+ if (!parentRes.isOK()) {
+ return parentRes;
}
// Check the combined heights of the children and parents.
@@ -113,7 +96,8 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss,
int diameter = parentsHeight + childrenHeight - 1;
if (diameter > kMaxViewDepth) {
- return viewDepthLimitExceeded();
+ return {ErrorCodes::ViewDepthLimitExceeded,
+ str::stream() << "View depth limit exceeded; maximum depth is " << kMaxViewDepth};
}
// Check the combined sizes of the children and parents.
@@ -124,22 +108,30 @@ Status ViewGraph::insertAndValidate(const NamespaceString& viewNss,
int pipelineTotalSize = parentsSize + childrenSize - currentNode.size;
if (pipelineTotalSize > kMaxViewPipelineSizeBytes) {
- return viewPipelineMaxSizeExceeded();
+ return {ErrorCodes::ViewPipelineMaxSizeExceeded,
+ str::stream() << "Operation would result in a resolved view pipeline that exceeds "
+ "the maximum size of "
+ << kMaxViewPipelineSizeBytes
+ << " bytes"};
}
+ guard.Dismiss();
return Status::OK();
}
-void ViewGraph::insertWithoutValidating(const NamespaceString& viewNss,
+void ViewGraph::insertWithoutValidating(const ViewDefinition& view,
const std::vector<NamespaceString>& refs,
- const int pipelineSize) {
- uint64_t nodeId = _getNodeId(viewNss);
+ int pipelineSize) {
+ uint64_t nodeId = _getNodeId(view.name());
// Note, the parent pointers of this node are set when the parents are inserted.
// This sets the children pointers of the node for this view, as well as the parent
// pointers for its children.
Node* node = &(_graph[nodeId]);
- node->size = pipelineSize;
invariant(node->children.empty());
+ invariant(!static_cast<bool>(node->collator));
+
+ node->size = pipelineSize;
+ node->collator = view.defaultCollator();
for (const NamespaceString& childNss : refs) {
uint64_t childId = _getNodeId(childNss);
@@ -172,8 +164,10 @@ void ViewGraph::remove(const NamespaceString& viewNss) {
}
}
- // Remove all child pointers since this view no longer references anything.
+ // This node no longer represents a view, so its children must be cleared and its collator
+ // unset.
node->children.clear();
+ node->collator = boost::none;
// Only remove node if there are no remaining references to this node.
if (node->parents.size() == 0) {
@@ -182,9 +176,7 @@ void ViewGraph::remove(const NamespaceString& viewNss) {
}
}
-ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId,
- int currentDepth,
- StatsMap* statsMap) {
+Status ViewGraph::_validateParents(uint64_t currentId, int currentDepth, StatsMap* statsMap) {
const Node& currentNode = _graph[currentId];
int maxHeightOfParents = 0;
int maxSizeOfParents = 0;
@@ -192,13 +184,25 @@ ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId,
// Return early if we've already exceeded the maximum depth. This will also be triggered if
// we're traversing a cycle introduced through unvalidated inserts.
if (currentDepth > kMaxViewDepth) {
- return ErrorCodes::ViewDepthLimitExceeded;
+ return {ErrorCodes::ViewDepthLimitExceeded,
+ str::stream() << "View depth limit exceeded; maximum depth is "
+ << ViewGraph::kMaxViewDepth};
}
for (uint64_t parentId : currentNode.parents) {
+ const auto& parentNode = _graph[parentId];
+ if (parentNode.isView() &&
+ !CollatorInterface::collatorsMatch(currentNode.collator.get(),
+ parentNode.collator.get())) {
+ return {ErrorCodes::OptionNotSupportedOnView,
+ str::stream() << "View " << currentNode.ns
+ << " has a collation that does not match the collation of view "
+ << parentNode.ns};
+ }
+
if (!(*statsMap)[parentId].checked) {
- auto res = _getParentsStats(parentId, currentDepth + 1, statsMap);
- if (res != ErrorCodes::OK) {
+ auto res = _validateParents(parentId, currentDepth + 1, statsMap);
+ if (!res.isOK()) {
return res;
}
}
@@ -210,51 +214,77 @@ ErrorCodes::Error ViewGraph::_getParentsStats(uint64_t currentId,
(*statsMap)[currentId].height = maxHeightOfParents + 1;
(*statsMap)[currentId].cumulativeSize += maxSizeOfParents + currentNode.size;
- if ((*statsMap)[currentId].cumulativeSize > kMaxViewPipelineSizeBytes) {
- return ErrorCodes::ViewPipelineMaxSizeExceeded;
+ const auto size = (*statsMap)[currentId].cumulativeSize;
+ if (size > kMaxViewPipelineSizeBytes) {
+ return {ErrorCodes::ViewPipelineMaxSizeExceeded,
+ str::stream() << "View pipeline is too large and exceeds the maximum size of "
+ << ViewGraph::kMaxViewPipelineSizeBytes
+ << " bytes"};
}
- return ErrorCodes::OK;
+ return Status::OK();
}
-ErrorCodes::Error ViewGraph::_getChildrenStatsAndCheckCycle(uint64_t startingId,
- uint64_t currentId,
- int currentDepth,
- StatsMap* statsMap,
- std::vector<uint64_t>* cycleIds) {
- // Check children of current node.
+Status ViewGraph::_validateChildren(uint64_t startingId,
+ uint64_t currentId,
+ int currentDepth,
+ StatsMap* statsMap,
+ std::vector<uint64_t>* traversalIds) {
const Node& currentNode = _graph[currentId];
+ traversalIds->push_back(currentId);
+
+ // If we've encountered the id of the starting node, we've found a cycle in the graph.
if (currentDepth > 0 && currentId == startingId) {
- return ErrorCodes::GraphContainsCycle;
+ auto iterator = traversalIds->rbegin();
+ StringBuilder errmsg;
+
+ errmsg << "View cycle detected: ";
+ errmsg << _graph[*iterator].ns;
+ for (; iterator != traversalIds->rend(); ++iterator) {
+ errmsg << " => " << _graph[*iterator].ns;
+ }
+ return {ErrorCodes::GraphContainsCycle, errmsg.str()};
}
// Return early if we've already exceeded the maximum depth. This will also be triggered if
// we're traversing a cycle introduced through unvalidated inserts.
if (currentDepth > kMaxViewDepth) {
- return ErrorCodes::ViewDepthLimitExceeded;
+ return {ErrorCodes::ViewDepthLimitExceeded,
+ str::stream() << "View depth limit exceeded; maximum depth is "
+ << ViewGraph::kMaxViewDepth};
}
int maxHeightOfChildren = 0;
int maxSizeOfChildren = 0;
for (uint64_t childId : currentNode.children) {
- if (!(*statsMap)[childId].checked) {
- auto res = _getChildrenStatsAndCheckCycle(
- startingId, childId, currentDepth + 1, statsMap, cycleIds);
- if (res == ErrorCodes::GraphContainsCycle) {
- cycleIds->push_back(currentId);
- return res;
- } else if (res != ErrorCodes::OK) {
- return res;
- }
+ if ((*statsMap)[childId].checked) {
+ continue;
}
+
+ const auto& childNode = _graph[childId];
+ if (childNode.isView() &&
+ !CollatorInterface::collatorsMatch(currentNode.collator.get(),
+ childNode.collator.get())) {
+ return {ErrorCodes::OptionNotSupportedOnView,
+ str::stream() << "View " << currentNode.ns
+ << " has a collation that does not match the collation of view "
+ << childNode.ns};
+ }
+
+ auto res = _validateChildren(startingId, childId, currentDepth + 1, statsMap, traversalIds);
+ if (!res.isOK()) {
+ return res;
+ }
+
maxHeightOfChildren = std::max(maxHeightOfChildren, (*statsMap)[childId].height);
maxSizeOfChildren = std::max(maxSizeOfChildren, (*statsMap)[childId].cumulativeSize);
}
+ traversalIds->pop_back();
(*statsMap)[currentId].checked = true;
(*statsMap)[currentId].height = maxHeightOfChildren + 1;
(*statsMap)[currentId].cumulativeSize += maxSizeOfChildren + currentNode.size;
- return ErrorCodes::OK;
+ return Status::OK();
}
uint64_t ViewGraph::_getNodeId(const NamespaceString& nss) {