summaryrefslogtreecommitdiff
path: root/Lib/packaging/depgraph.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/packaging/depgraph.py')
-rw-r--r--Lib/packaging/depgraph.py273
1 files changed, 273 insertions, 0 deletions
diff --git a/Lib/packaging/depgraph.py b/Lib/packaging/depgraph.py
new file mode 100644
index 0000000000..b3c555a8a2
--- /dev/null
+++ b/Lib/packaging/depgraph.py
@@ -0,0 +1,273 @@
+"""Class and functions dealing with dependencies between distributions.
+
+This module provides a DependencyGraph class to represent the
+dependencies between distributions. Auxiliary functions can generate a
+graph, find reverse dependencies, and print a graph in DOT format.
+"""
+
+import sys
+
+from io import StringIO
+from packaging.errors import PackagingError
+from packaging.version import VersionPredicate, IrrationalVersionError
+
+__all__ = ['DependencyGraph', 'generate_graph', 'dependent_dists',
+ 'graph_to_dot']
+
+
+class DependencyGraph:
+ """
+ Represents a dependency graph between distributions.
+
+ The dependency relationships are stored in an ``adjacency_list`` that maps
+ distributions to a list of ``(other, label)`` tuples where ``other``
+ is a distribution and the edge is labeled with ``label`` (i.e. the version
+ specifier, if such was provided). Also, for more efficient traversal, for
+ every distribution ``x``, a list of predecessors is kept in
+ ``reverse_list[x]``. An edge from distribution ``a`` to
+ distribution ``b`` means that ``a`` depends on ``b``. If any missing
+ dependencies are found, they are stored in ``missing``, which is a
+ dictionary that maps distributions to a list of requirements that were not
+ provided by any other distributions.
+ """
+
+ def __init__(self):
+ self.adjacency_list = {}
+ self.reverse_list = {}
+ self.missing = {}
+
+ def add_distribution(self, distribution):
+ """Add the *distribution* to the graph.
+
+ :type distribution: :class:`packaging.database.Distribution` or
+ :class:`packaging.database.EggInfoDistribution`
+ """
+ self.adjacency_list[distribution] = []
+ self.reverse_list[distribution] = []
+ self.missing[distribution] = []
+
+ def add_edge(self, x, y, label=None):
+ """Add an edge from distribution *x* to distribution *y* with the given
+ *label*.
+
+ :type x: :class:`packaging.database.Distribution` or
+ :class:`packaging.database.EggInfoDistribution`
+ :type y: :class:`packaging.database.Distribution` or
+ :class:`packaging.database.EggInfoDistribution`
+ :type label: ``str`` or ``None``
+ """
+ self.adjacency_list[x].append((y, label))
+ # multiple edges are allowed, so be careful
+ if x not in self.reverse_list[y]:
+ self.reverse_list[y].append(x)
+
+ def add_missing(self, distribution, requirement):
+ """
+ Add a missing *requirement* for the given *distribution*.
+
+ :type distribution: :class:`packaging.database.Distribution` or
+ :class:`packaging.database.EggInfoDistribution`
+ :type requirement: ``str``
+ """
+ self.missing[distribution].append(requirement)
+
+ def _repr_dist(self, dist):
+ return '%r %s' % (dist.name, dist.metadata['Version'])
+
+ def repr_node(self, dist, level=1):
+ """Prints only a subgraph"""
+ output = []
+ output.append(self._repr_dist(dist))
+ for other, label in self.adjacency_list[dist]:
+ dist = self._repr_dist(other)
+ if label is not None:
+ dist = '%s [%s]' % (dist, label)
+ output.append(' ' * level + str(dist))
+ suboutput = self.repr_node(other, level + 1)
+ subs = suboutput.split('\n')
+ output.extend(subs[1:])
+ return '\n'.join(output)
+
+ def __repr__(self):
+ """Representation of the graph"""
+ output = []
+ for dist, adjs in self.adjacency_list.items():
+ output.append(self.repr_node(dist))
+ return '\n'.join(output)
+
+
+def graph_to_dot(graph, f, skip_disconnected=True):
+ """Writes a DOT output for the graph to the provided file *f*.
+
+ If *skip_disconnected* is set to ``True``, then all distributions
+ that are not dependent on any other distribution are skipped.
+
+ :type f: has to support ``file``-like operations
+ :type skip_disconnected: ``bool``
+ """
+ disconnected = []
+
+ f.write("digraph dependencies {\n")
+ for dist, adjs in graph.adjacency_list.items():
+ if len(adjs) == 0 and not skip_disconnected:
+ disconnected.append(dist)
+ for other, label in adjs:
+ if not label is None:
+ f.write('"%s" -> "%s" [label="%s"]\n' %
+ (dist.name, other.name, label))
+ else:
+ f.write('"%s" -> "%s"\n' % (dist.name, other.name))
+ if not skip_disconnected and len(disconnected) > 0:
+ f.write('subgraph disconnected {\n')
+ f.write('label = "Disconnected"\n')
+ f.write('bgcolor = red\n')
+
+ for dist in disconnected:
+ f.write('"%s"' % dist.name)
+ f.write('\n')
+ f.write('}\n')
+ f.write('}\n')
+
+
+def generate_graph(dists):
+ """Generates a dependency graph from the given distributions.
+
+ :parameter dists: a list of distributions
+ :type dists: list of :class:`packaging.database.Distribution` and
+ :class:`packaging.database.EggInfoDistribution` instances
+ :rtype: a :class:`DependencyGraph` instance
+ """
+ graph = DependencyGraph()
+ provided = {} # maps names to lists of (version, dist) tuples
+
+ # first, build the graph and find out the provides
+ for dist in dists:
+ graph.add_distribution(dist)
+ provides = (dist.metadata['Provides-Dist'] +
+ dist.metadata['Provides'] +
+ ['%s (%s)' % (dist.name, dist.metadata['Version'])])
+
+ for p in provides:
+ comps = p.strip().rsplit(" ", 1)
+ name = comps[0]
+ version = None
+ if len(comps) == 2:
+ version = comps[1]
+ if len(version) < 3 or version[0] != '(' or version[-1] != ')':
+ raise PackagingError('distribution %r has ill-formed'
+ 'provides field: %r' % (dist.name, p))
+ version = version[1:-1] # trim off parenthesis
+ if name not in provided:
+ provided[name] = []
+ provided[name].append((version, dist))
+
+ # now make the edges
+ for dist in dists:
+ requires = dist.metadata['Requires-Dist'] + dist.metadata['Requires']
+ for req in requires:
+ try:
+ predicate = VersionPredicate(req)
+ except IrrationalVersionError:
+ # XXX compat-mode if cannot read the version
+ name = req.split()[0]
+ predicate = VersionPredicate(name)
+
+ name = predicate.name
+
+ if name not in provided:
+ graph.add_missing(dist, req)
+ else:
+ matched = False
+ for version, provider in provided[name]:
+ try:
+ match = predicate.match(version)
+ except IrrationalVersionError:
+ # XXX small compat-mode
+ if version.split(' ') == 1:
+ match = True
+ else:
+ match = False
+
+ if match:
+ graph.add_edge(dist, provider, req)
+ matched = True
+ break
+ if not matched:
+ graph.add_missing(dist, req)
+ return graph
+
+
+def dependent_dists(dists, dist):
+ """Recursively generate a list of distributions from *dists* that are
+ dependent on *dist*.
+
+ :param dists: a list of distributions
+ :param dist: a distribution, member of *dists* for which we are interested
+ """
+ if dist not in dists:
+ raise ValueError('given distribution %r is not a member of the list' %
+ dist.name)
+ graph = generate_graph(dists)
+
+ dep = [dist] # dependent distributions
+ fringe = graph.reverse_list[dist] # list of nodes we should inspect
+
+ while not len(fringe) == 0:
+ node = fringe.pop()
+ dep.append(node)
+ for prev in graph.reverse_list[node]:
+ if prev not in dep:
+ fringe.append(prev)
+
+ dep.pop(0) # remove dist from dep, was there to prevent infinite loops
+ return dep
+
+
+def main():
+ from packaging.database import get_distributions
+ tempout = StringIO()
+ try:
+ old = sys.stderr
+ sys.stderr = tempout
+ try:
+ dists = list(get_distributions(use_egg_info=True))
+ graph = generate_graph(dists)
+ finally:
+ sys.stderr = old
+ except Exception as e:
+ tempout.seek(0)
+ tempout = tempout.read()
+ print('Could not generate the graph')
+ print(tempout)
+ print(e)
+ sys.exit(1)
+
+ for dist, reqs in graph.missing.items():
+ if len(reqs) > 0:
+ print("Warning: Missing dependencies for %r:" % dist.name,
+ ", ".join(reqs))
+ # XXX replace with argparse
+ if len(sys.argv) == 1:
+ print('Dependency graph:')
+ print(' ', repr(graph).replace('\n', '\n '))
+ sys.exit(0)
+ elif len(sys.argv) > 1 and sys.argv[1] in ('-d', '--dot'):
+ if len(sys.argv) > 2:
+ filename = sys.argv[2]
+ else:
+ filename = 'depgraph.dot'
+
+ with open(filename, 'w') as f:
+ graph_to_dot(graph, f, True)
+ tempout.seek(0)
+ tempout = tempout.read()
+ print(tempout)
+ print('Dot file written at %r' % filename)
+ sys.exit(0)
+ else:
+ print('Supported option: -d [filename]')
+ sys.exit(1)
+
+
+if __name__ == '__main__':
+ main()