from __future__ import annotations from typing import TYPE_CHECKING, Iterable, Iterator, List, Optional from rdflib.namespace import RDF from rdflib.term import BNode, Node if TYPE_CHECKING: from rdflib.graph import Graph __all__ = ["Collection"] class Collection(object): __doc__ = """ See "Emulating container types": >>> from rdflib.term import Literal >>> from rdflib.graph import Graph >>> from pprint import pprint >>> listname = BNode() >>> g = Graph('Memory') >>> listItem1 = BNode() >>> listItem2 = BNode() >>> g.add((listname, RDF.first, Literal(1))) # doctest: +ELLIPSIS )> >>> g.add((listname,, listItem1)) # doctest: +ELLIPSIS )> >>> g.add((listItem1, RDF.first, Literal(2))) # doctest: +ELLIPSIS )> >>> g.add((listItem1,, listItem2)) # doctest: +ELLIPSIS )> >>> g.add((listItem2,, RDF.nil)) # doctest: +ELLIPSIS )> >>> g.add((listItem2, RDF.first, Literal(3))) # doctest: +ELLIPSIS )> >>> c = Collection(g,listname) >>> pprint([term.n3() for term in c]) [u'"1"^^', u'"2"^^', u'"3"^^'] >>> Literal(1) in c True >>> len(c) 3 >>> c._get_container(1) == listItem1 True >>> c.index(Literal(2)) == 1 True """ def __init__(self, graph: Graph, uri: Node, seq: List[Node] = []): self.graph = graph self.uri = uri or BNode() self += seq def n3(self) -> str: """ >>> from rdflib.term import Literal >>> from rdflib.graph import Graph >>> listname = BNode() >>> g = Graph('Memory') >>> listItem1 = BNode() >>> listItem2 = BNode() >>> g.add((listname, RDF.first, Literal(1))) # doctest: +ELLIPSIS )> >>> g.add((listname,, listItem1)) # doctest: +ELLIPSIS )> >>> g.add((listItem1, RDF.first, Literal(2))) # doctest: +ELLIPSIS )> >>> g.add((listItem1,, listItem2)) # doctest: +ELLIPSIS )> >>> g.add((listItem2,, RDF.nil)) # doctest: +ELLIPSIS )> >>> g.add((listItem2, RDF.first, Literal(3))) # doctest: +ELLIPSIS )> >>> c = Collection(g, listname) >>> print(c.n3()) #doctest: +NORMALIZE_WHITESPACE ( "1"^^ "2"^^ "3"^^ ) """ # type error: "Node" has no attribute "n3" return "( %s )" % (" ".join([i.n3() for i in self])) # type: ignore[attr-defined] def _get_container(self, index: int) -> Optional[Node]: """Gets the first, rest holding node at index.""" assert isinstance(index, int) graph = self.graph container: Optional[Node] = self.uri i = 0 while i < index: i += 1 container = graph.value(container, if container is None: break return container def __len__(self) -> int: """length of items in collection.""" return len(list(self.graph.items(self.uri))) def index(self, item: Node) -> int: """ Returns the 0-based numerical index of the item in the list """ listname = self.uri index = 0 while True: if (listname, RDF.first, item) in self.graph: return index else: newlink = list(self.graph.objects(listname, index += 1 if newlink == [RDF.nil]: raise ValueError("%s is not in %s" % (item, self.uri)) elif not newlink: raise Exception("Malformed RDF Collection: %s" % self.uri) else: assert len(newlink) == 1, "Malformed RDF Collection: %s" % self.uri listname = newlink[0] def __getitem__(self, key: int) -> Node: """TODO""" c = self._get_container(key) if c: v = self.graph.value(c, RDF.first) if v: return v else: raise KeyError(key) else: raise IndexError(key) def __setitem__(self, key: int, value: Node) -> None: """TODO""" c = self._get_container(key) if c: self.graph.set((c, RDF.first, value)) else: raise IndexError(key) def __delitem__(self, key: int) -> None: """ >>> from rdflib.namespace import RDF, RDFS >>> from rdflib import Graph >>> from pprint import pformat >>> g = Graph() >>> a = BNode('foo') >>> b = BNode('bar') >>> c = BNode('baz') >>> g.add((a, RDF.first, RDF.type)) # doctest: +ELLIPSIS )> >>> g.add((a,, b)) # doctest: +ELLIPSIS )> >>> g.add((b, RDF.first, RDFS.label)) # doctest: +ELLIPSIS )> >>> g.add((b,, c)) # doctest: +ELLIPSIS )> >>> g.add((c, RDF.first, RDFS.comment)) # doctest: +ELLIPSIS )> >>> g.add((c,, RDF.nil)) # doctest: +ELLIPSIS )> >>> len(g) 6 >>> def listAncestry(node, graph): ... for i in graph.subjects(, node): ... yield i >>> [str(node.n3()) ... for node in g.transitiveClosure(listAncestry, RDF.nil)] ['_:baz', '_:bar', '_:foo'] >>> lst = Collection(g, a) >>> len(lst) 3 >>> b == lst._get_container(1) True >>> c == lst._get_container(2) True >>> del lst[1] >>> len(lst) 2 >>> len(g) 4 """ self[key] # to raise any potential key exceptions graph = self.graph current = self._get_container(key) assert current if len(self) == 1 and key > 0: pass elif key == len(self) - 1: # the tail priorlink = self._get_container(key - 1) # type error: Argument 1 to "set" of "Graph" has incompatible type "Tuple[Optional[Node], URIRef, URIRef]"; expected "Tuple[Node, Node, Any]" self.graph.set((priorlink,, RDF.nil)) # type: ignore[arg-type] graph.remove((current, None, None)) else: next = self._get_container(key + 1) prior = self._get_container(key - 1) assert next and prior graph.remove((current, None, None)) graph.set((prior,, next)) def __iter__(self) -> Iterator[Node]: """Iterator over items in Collections""" return self.graph.items(self.uri) def _end(self) -> Node: # find end of list container = self.uri while True: rest = self.graph.value(container, if rest is None or rest == RDF.nil: return container else: container = rest def append(self, item: Node) -> Collection: """ >>> from rdflib.term import Literal >>> from rdflib.graph import Graph >>> listname = BNode() >>> g = Graph() >>> c = Collection(g,listname,[Literal(1),Literal(2)]) >>> links = [ ... list(g.subjects(object=i, predicate=RDF.first))[0] for i in c] >>> len([i for i in links if (i,, RDF.nil) in g]) 1 """ end = self._end() if (end, RDF.first, None) in self.graph: # append new node to the end of the linked list node = BNode() self.graph.set((end,, node)) end = node self.graph.add((end, RDF.first, item)) self.graph.add((end,, RDF.nil)) return self def __iadd__(self, other: Iterable[Node]): end = self._end() self.graph.remove((end,, None)) for item in other: if (end, RDF.first, None) in self.graph: nxt = BNode() self.graph.add((end,, nxt)) end = nxt self.graph.add((end, RDF.first, item)) self.graph.add((end,, RDF.nil)) return self def clear(self): container: Optional[Node] = self.uri graph = self.graph while container: rest = graph.value(container, graph.remove((container, RDF.first, None)) graph.remove((container,, None)) container = rest return self