summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIwan Aucamp <aucampia@gmail.com>2023-03-05 21:48:40 +0100
committerGitHub <noreply@github.com>2023-03-05 21:48:40 +0100
commit123edf52a0fc9c5d76efb2caa5f532956b3437ce (patch)
tree85f7a86a09d65546a58a73f0026a9a93b09d91de
parentd2c9edc7a3db57e1b447c2c4ba06e1838b7ada0c (diff)
downloadrdflib-123edf52a0fc9c5d76efb2caa5f532956b3437ce.tar.gz
feat: add parser type hints (#2232)
Add type hints to: - `rdflib/parser.py` - `rdflib/plugins/parser/*.py` - some JSON-LD utils - `rdflib/exceptions.py`. This is mainly because the work I'm doing to fix <https://github.com/RDFLib/rdflib/issues/1844> is touching some of this parser stuff and the type hints are useful to avoid mistakes. No runtime changes are included in this PR.
-rw-r--r--docs/conf.py1
-rw-r--r--pyproject.toml5
-rw-r--r--rdflib/exceptions.py14
-rw-r--r--rdflib/parser.py47
-rw-r--r--rdflib/plugins/parsers/hext.py31
-rw-r--r--rdflib/plugins/parsers/jsonld.py137
-rwxr-xr-xrdflib/plugins/parsers/notation3.py267
-rw-r--r--rdflib/plugins/parsers/nquads.py32
-rw-r--r--rdflib/plugins/parsers/ntriples.py67
-rw-r--r--rdflib/plugins/parsers/rdfxml.py162
-rw-r--r--rdflib/plugins/parsers/trig.py27
-rw-r--r--rdflib/plugins/parsers/trix.py61
-rw-r--r--rdflib/plugins/shared/jsonld/context.py170
-rw-r--r--rdflib/plugins/shared/jsonld/util.py37
14 files changed, 698 insertions, 360 deletions
diff --git a/docs/conf.py b/docs/conf.py
index 050364f6..3068cc12 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -279,6 +279,7 @@ if sphinx_version < (5,):
nitpicky = False
nitpick_ignore = [
+ ("py:class", "urllib.response.addinfourl"),
("py:data", "typing.Literal"),
("py:class", "typing.IO[bytes]"),
("py:class", "http.client.HTTPMessage"),
diff --git a/pyproject.toml b/pyproject.toml
index 88ade542..a187f950 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -105,7 +105,10 @@ pep8-naming = ["-N802", "-N806", "-N815"]
pycodestyle = ["-E402"]
[tool.flakeheaven.exceptions."test/utils/namespace/_*"]
pep8-naming = ["-N815"]
-
+[tool.flakeheaven.exceptions."rdflib/plugins/parsers/rdfxml.py"]
+pep8-naming = ["-N802"]
+[tool.flakeheaven.exceptions."rdflib/plugins/parsers/trix.py"]
+pep8-naming = ["-N802"]
[tool.black]
required-version = "22.12.0"
diff --git a/rdflib/exceptions.py b/rdflib/exceptions.py
index 2d71e6e2..708756ef 100644
--- a/rdflib/exceptions.py
+++ b/rdflib/exceptions.py
@@ -5,13 +5,17 @@ TODO:
__all__ = [
"Error",
"ParserError",
+ "UniquenessError",
]
+from typing import Any, Optional
+
+
class Error(Exception):
"""Base class for rdflib exceptions."""
- def __init__(self, msg=None):
+ def __init__(self, msg: Optional[str] = None):
Exception.__init__(self, msg)
self.msg = msg
@@ -19,18 +23,18 @@ class Error(Exception):
class ParserError(Error):
"""RDF Parser error."""
- def __init__(self, msg):
+ def __init__(self, msg: str):
Error.__init__(self, msg)
- self.msg = msg
+ self.msg: str = msg
- def __str__(self):
+ def __str__(self) -> str:
return self.msg
class UniquenessError(Error):
"""A uniqueness assumption was made in the context, and that is not true"""
- def __init__(self, values):
+ def __init__(self, values: Any):
Error.__init__(
self,
"\
diff --git a/rdflib/parser.py b/rdflib/parser.py
index aa6b73d8..4f8505d9 100644
--- a/rdflib/parser.py
+++ b/rdflib/parser.py
@@ -9,6 +9,7 @@ can plugin to rdflib. If you are wanting to invoke a parser you likely
want to do so through the Graph class parse method.
"""
+from __future__ import annotations
import codecs
import os
@@ -37,9 +38,10 @@ from rdflib.namespace import Namespace
from rdflib.term import URIRef
if TYPE_CHECKING:
- from http.client import HTTPMessage, HTTPResponse
+ from email.message import Message
+ from urllib.response import addinfourl
- from rdflib import Graph
+ from rdflib.graph import Graph
__all__ = [
"Parser",
@@ -57,7 +59,7 @@ class Parser(object):
def __init__(self):
pass
- def parse(self, source: "InputSource", sink: "Graph"):
+ def parse(self, source: "InputSource", sink: "Graph") -> None:
pass
@@ -92,7 +94,7 @@ class BytesIOWrapper(BufferedIOBase):
raise NotImplementedError()
-class InputSource(xmlreader.InputSource, object):
+class InputSource(xmlreader.InputSource):
"""
TODO:
"""
@@ -102,7 +104,7 @@ class InputSource(xmlreader.InputSource, object):
self.content_type: Optional[str] = None
self.auto_close = False # see Graph.parse(), true if opened by us
- def close(self):
+ def close(self) -> None:
c = self.getCharacterStream()
if c and hasattr(c, "close"):
try:
@@ -133,26 +135,26 @@ class PythonInputSource(InputSource):
True
"""
- def __init__(self, data, system_id=None):
+ def __init__(self, data: Any, system_id: Optional[str] = None):
self.content_type = None
self.auto_close = False # see Graph.parse(), true if opened by us
- self.public_id = None
- self.system_id = system_id
+ self.public_id: Optional[str] = None
+ self.system_id: Optional[str] = system_id
self.data = data
- def getPublicId(self): # noqa: N802
+ def getPublicId(self) -> Optional[str]: # noqa: N802
return self.public_id
- def setPublicId(self, public_id): # noqa: N802
+ def setPublicId(self, public_id: Optional[str]) -> None: # noqa: N802
self.public_id = public_id
- def getSystemId(self): # noqa: N802
+ def getSystemId(self) -> Optional[str]: # noqa: N802
return self.system_id
- def setSystemId(self, system_id): # noqa: N802
+ def setSystemId(self, system_id: Optional[str]) -> None: # noqa: N802
self.system_id = system_id
- def close(self):
+ def close(self) -> None:
self.data = None
@@ -197,16 +199,16 @@ class URLInputSource(InputSource):
links: List[str]
@classmethod
- def getallmatchingheaders(cls, message: "HTTPMessage", name):
+ def getallmatchingheaders(cls, message: "Message", name) -> List[str]:
# This is reimplemented here, because the method
# getallmatchingheaders from HTTPMessage is broken since Python 3.0
name = name.lower()
return [val for key, val in message.items() if key.lower() == name]
@classmethod
- def get_links(cls, response: "HTTPResponse"):
+ def get_links(cls, response: addinfourl) -> List[str]:
linkslines = cls.getallmatchingheaders(response.headers, "Link")
- retarray = []
+ retarray: List[str] = []
for linksline in linkslines:
links = [linkstr.strip() for linkstr in linksline.split(",")]
for link in links:
@@ -279,7 +281,7 @@ class URLInputSource(InputSource):
else:
raise
- response: HTTPResponse = _urlopen(req)
+ response: addinfourl = _urlopen(req)
self.url = response.geturl() # in case redirections took place
self.links = self.get_links(response)
if format in ("json-ld", "application/ld+json"):
@@ -300,8 +302,9 @@ class URLInputSource(InputSource):
# TODO: self.setEncoding(encoding)
self.response_info = response.info() # a mimetools.Message instance
- def __repr__(self):
- return self.url
+ def __repr__(self) -> str:
+ # type error: Incompatible return value type (got "Optional[str]", expected "str")
+ return self.url # type: ignore[return-value]
class FileInputSource(InputSource):
@@ -325,7 +328,7 @@ class FileInputSource(InputSource):
# We cannot set characterStream here because
# we do not know the Raw Bytes File encoding.
- def __repr__(self):
+ def __repr__(self) -> str:
return repr(self.file)
@@ -336,8 +339,8 @@ def create_input_source(
publicID: Optional[str] = None, # noqa: N803
location: Optional[str] = None,
file: Optional[Union[BinaryIO, TextIO]] = None,
- data: Union[str, bytes, dict] = None,
- format: str = None,
+ data: Optional[Union[str, bytes, dict]] = None,
+ format: Optional[str] = None,
) -> InputSource:
"""
Return an appropriate InputSource instance for the given
diff --git a/rdflib/plugins/parsers/hext.py b/rdflib/plugins/parsers/hext.py
index df4087d5..142c6943 100644
--- a/rdflib/plugins/parsers/hext.py
+++ b/rdflib/plugins/parsers/hext.py
@@ -3,12 +3,15 @@ This is a rdflib plugin for parsing Hextuple files, which are Newline-Delimited
(ndjson) files, into Conjunctive. The store that backs the graph *must* be able to
handle contexts, i.e. multiple graphs.
"""
+from __future__ import annotations
+
import json
import warnings
-from typing import List, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Union
-from rdflib import BNode, ConjunctiveGraph, Literal, URIRef
-from rdflib.parser import Parser
+from rdflib.graph import ConjunctiveGraph, Graph
+from rdflib.parser import FileInputSource, InputSource, Parser
+from rdflib.term import BNode, Literal, URIRef
__all__ = ["HextuplesParser"]
@@ -22,7 +25,7 @@ class HextuplesParser(Parser):
def __init__(self):
pass
- def _load_json_line(self, line: str):
+ def _load_json_line(self, line: str) -> List[Optional[Any]]:
# this complex handing is because the 'value' component is
# allowed to be "" but not None
# all other "" values are treated as None
@@ -32,7 +35,9 @@ class HextuplesParser(Parser):
ret2[2] = ""
return ret2
- def _parse_hextuple(self, cg: ConjunctiveGraph, tup: List[Union[str, None]]):
+ def _parse_hextuple(
+ self, cg: ConjunctiveGraph, tup: List[Union[str, None]]
+ ) -> None:
# all values check
# subject, predicate, value, datatype cannot be None
# language and graph may be None
@@ -71,7 +76,8 @@ class HextuplesParser(Parser):
else:
cg.add((s, p, o))
- def parse(self, source, graph, **kwargs):
+ # type error: Signature of "parse" incompatible with supertype "Parser"
+ def parse(self, source: InputSource, graph: Graph, **kwargs: Any) -> None: # type: ignore[override]
if kwargs.get("encoding") not in [None, "utf-8"]:
warnings.warn(
f"Hextuples files are always utf-8 encoded, "
@@ -88,10 +94,17 @@ class HextuplesParser(Parser):
# handle different source types - only file and string (data) for now
if hasattr(source, "file"):
- with open(source.file.name, encoding="utf-8") as fp:
- for l in fp:
+ if TYPE_CHECKING:
+ assert isinstance(source, FileInputSource)
+ # type error: Item "TextIOBase" of "Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase]" has no attribute "name"
+ # type error: Item "RawIOBase" of "Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase]" has no attribute "name"
+ # type error: Item "BufferedIOBase" of "Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase]" has no attribute "name"
+ with open(source.file.name, encoding="utf-8") as fp: # type: ignore[union-attr]
+ for l in fp: # noqa: E741
self._parse_hextuple(cg, self._load_json_line(l))
elif hasattr(source, "_InputSource__bytefile"):
if hasattr(source._InputSource__bytefile, "wrapped"):
- for l in source._InputSource__bytefile.wrapped.strip().splitlines():
+ for (
+ l # noqa: E741
+ ) in source._InputSource__bytefile.wrapped.strip().splitlines():
self._parse_hextuple(cg, self._load_json_line(l))
diff --git a/rdflib/plugins/parsers/jsonld.py b/rdflib/plugins/parsers/jsonld.py
index 86fdf800..8f7e5b88 100644
--- a/rdflib/plugins/parsers/jsonld.py
+++ b/rdflib/plugins/parsers/jsonld.py
@@ -32,15 +32,16 @@ Example usage::
# NOTE: This code reads the entire JSON object into memory before parsing, but
# we should consider streaming the input to deal with arbitrarily large graphs.
+from __future__ import annotations
import warnings
-from typing import Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import rdflib.parser
-from rdflib.graph import ConjunctiveGraph
+from rdflib.graph import ConjunctiveGraph, Graph
from rdflib.namespace import RDF, XSD
-from rdflib.parser import URLInputSource
-from rdflib.term import BNode, Literal, URIRef
+from rdflib.parser import InputSource, URLInputSource
+from rdflib.term import BNode, IdentifiedNode, Literal, Node, URIRef
from ..shared.jsonld.context import UNDEF, Context, Term
from ..shared.jsonld.keys import (
@@ -78,7 +79,7 @@ class JsonLDParser(rdflib.parser.Parser):
def __init__(self):
super(JsonLDParser, self).__init__()
- def parse(self, source, sink, **kwargs):
+ def parse(self, source: InputSource, sink: Graph, **kwargs: Any) -> None:
# TODO: docstring w. args and return value
encoding = kwargs.get("encoding") or "utf-8"
if encoding not in ("utf-8", "utf-16"):
@@ -93,6 +94,8 @@ class JsonLDParser(rdflib.parser.Parser):
context_data = kwargs.get("context")
if not context_data and hasattr(source, "url") and hasattr(source, "links"):
+ if TYPE_CHECKING:
+ assert isinstance(source, URLInputSource)
context_data = context_from_urlinputsource(source)
try:
@@ -107,6 +110,7 @@ class JsonLDParser(rdflib.parser.Parser):
# NOTE: A ConjunctiveGraph parses into a Graph sink, so no sink will be
# context_aware. Keeping this check in case RDFLib is changed, or
# someone passes something context_aware to this parser directly.
+ conj_sink: Graph
if not sink.context_aware:
conj_sink = ConjunctiveGraph(store=sink.store, identifier=sink.identifier)
else:
@@ -116,13 +120,13 @@ class JsonLDParser(rdflib.parser.Parser):
def to_rdf(
- data,
- dataset,
- base=None,
- context_data=None,
+ data: Any,
+ dataset: Graph,
+ base: Optional[str] = None,
+ context_data: Optional[bool] = None,
version: Optional[float] = None,
- generalized_rdf=False,
- allow_lists_of_lists=None,
+ generalized_rdf: bool = False,
+ allow_lists_of_lists: Optional[bool] = None,
):
# TODO: docstring w. args and return value
context = Context(base=base, version=version)
@@ -135,7 +139,9 @@ def to_rdf(
class Parser(object):
- def __init__(self, generalized_rdf=False, allow_lists_of_lists=None):
+ def __init__(
+ self, generalized_rdf: bool = False, allow_lists_of_lists: Optional[bool] = None
+ ):
self.generalized_rdf = generalized_rdf
self.allow_lists_of_lists = (
allow_lists_of_lists
@@ -143,9 +149,9 @@ class Parser(object):
else ALLOW_LISTS_OF_LISTS
)
- def parse(self, data, context, dataset):
+ def parse(self, data: Any, context: Context, dataset: Graph) -> Graph:
topcontext = False
-
+ resources: Union[Dict[str, Any], List[Any]]
if isinstance(data, list):
resources = data
elif isinstance(data, dict):
@@ -154,7 +160,8 @@ class Parser(object):
context.load(local_context, context.base)
topcontext = True
resources = data
- if not isinstance(resources, list):
+ # type error: Subclass of "Dict[str, Any]" and "List[Any]" cannot exist: would have incompatible method signatures
+ if not isinstance(resources, list): # type: ignore[unreachable]
resources = [resources]
if context.vocab:
@@ -163,16 +170,25 @@ class Parser(object):
if term.id and term.id.endswith(VOCAB_DELIMS):
dataset.bind(name, term.id)
- graph = dataset.default_context if dataset.context_aware else dataset
+ # type error: "Graph" has no attribute "default_context"
+ graph = dataset.default_context if dataset.context_aware else dataset # type: ignore[attr-defined]
for node in resources:
self._add_to_graph(dataset, graph, context, node, topcontext)
return graph
- def _add_to_graph(self, dataset, graph, context, node, topcontext=False):
+ def _add_to_graph(
+ self,
+ dataset: Graph,
+ graph: Graph,
+ context: Context,
+ node: Any,
+ topcontext: bool = False,
+ ) -> Optional[Node]:
if not isinstance(node, dict) or context.get_value(node):
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
if CONTEXT in node and not topcontext:
local_context = node[CONTEXT]
@@ -181,7 +197,8 @@ class Parser(object):
else:
context = Context(base=context.doc_base)
- context = context.get_context_for_type(node)
+ # type error: Incompatible types in assignment (expression has type "Optional[Context]", variable has type "Context")
+ context = context.get_context_for_type(node) # type: ignore[assignment]
id_val = context.get_id(node)
@@ -222,7 +239,8 @@ class Parser(object):
return subj
- def _get_nested_id(self, context, node):
+ # type error: Missing return statement
+ def _get_nested_id(self, context: Context, node: Dict[str, Any]) -> Optional[str]: # type: ignore[return]
for key, obj in node.items():
if context.version >= 1.1 and key in context.get_keys(NEST):
term = context.terms.get(key)
@@ -242,8 +260,16 @@ class Parser(object):
return id_val
def _key_to_graph(
- self, dataset, graph, context, subj, key, obj, reverse=False, no_id=False
- ):
+ self,
+ dataset: Graph,
+ graph: Graph,
+ context: Context,
+ subj: Node,
+ key: str,
+ obj: Any,
+ reverse: bool = False,
+ no_id: bool = False,
+ ) -> None:
if isinstance(obj, list):
obj_nodes = obj
@@ -267,7 +293,10 @@ class Parser(object):
if GRAPH in (key, term_id):
if dataset.context_aware and not no_id:
- subgraph = dataset.get_context(subj)
+ if TYPE_CHECKING:
+ assert isinstance(dataset, ConjunctiveGraph)
+ # type error: Argument 1 to "get_context" of "ConjunctiveGraph" has incompatible type "Node"; expected "Union[IdentifiedNode, str, None]"
+ subgraph = dataset.get_context(subj) # type: ignore[arg-type]
else:
subgraph = graph
for onode in obj_nodes:
@@ -297,7 +326,8 @@ class Parser(object):
if nkey in context.get_keys(ID):
continue
subcontext = context.get_context_for_type(obj)
- self._key_to_graph(dataset, graph, subcontext, subj, nkey, nobj)
+ # type error: Argument 3 to "_key_to_graph" of "Parser" has incompatible type "Optional[Context]"; expected "Context"
+ self._key_to_graph(dataset, graph, subcontext, subj, nkey, nobj) # type: ignore[arg-type]
return
pred_uri = term.id if term else context.expand(key)
@@ -322,6 +352,7 @@ class Parser(object):
if term and term.reverse:
reverse = not reverse
+ pred: IdentifiedNode
bid = self._get_bnodeid(pred_uri)
if bid:
if not self.generalized_rdf:
@@ -339,7 +370,9 @@ class Parser(object):
else:
graph.add((subj, pred, obj))
- def _parse_container(self, context, term, obj):
+ def _parse_container(
+ self, context: Context, term: Term, obj: Dict[str, Any]
+ ) -> List[Any]:
if LANG in term.container:
obj_nodes = []
for lang, values in obj.items():
@@ -412,7 +445,7 @@ class Parser(object):
return [obj]
@staticmethod
- def _add_type(context, o, k):
+ def _add_type(context: Context, o: Dict[str, Any], k: str) -> Dict[str, Any]:
otype = context.get_type(o) or []
if otype and not isinstance(otype, list):
otype = [otype]
@@ -420,20 +453,31 @@ class Parser(object):
o[TYPE] = otype
return o
- def _to_object(self, dataset, graph, context, term, node, inlist=False):
+ def _to_object(
+ self,
+ dataset: Graph,
+ graph: Graph,
+ context: Context,
+ term: Optional[Term],
+ node: Any,
+ inlist: bool = False,
+ ) -> Optional[Node]:
if isinstance(node, tuple):
value, lang = node
if value is None:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
if lang and " " in lang:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
return Literal(value, lang=lang)
if isinstance(node, dict):
node_list = context.get_list(node)
if node_list is not None:
if inlist and not self.allow_lists_of_lists:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
listref = self._add_list(dataset, graph, context, term, node_list)
if listref:
return listref
@@ -443,7 +487,8 @@ class Parser(object):
if term.type == JSON:
node = self._to_typed_json_value(node)
elif node is None:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
elif term.type == ID and isinstance(node, str):
node = {ID: context.resolve(node)}
elif term.type == VOCAB and isinstance(node, str):
@@ -452,7 +497,8 @@ class Parser(object):
node = {TYPE: term.type, VALUE: node}
else:
if node is None:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
if isinstance(node, float):
return Literal(node, datatype=XSD.double)
@@ -465,7 +511,8 @@ class Parser(object):
lang = context.get_language(node)
datatype = not lang and context.get_type(node) or None
value = context.get_value(node)
- if datatype in context.get_keys(JSON):
+ # type error: Unsupported operand types for in ("Optional[Any]" and "Generator[str, None, None]")
+ if datatype in context.get_keys(JSON): # type: ignore[operator]
node = self._to_typed_json_value(value)
datatype = context.get_type(node)
value = context.get_value(node)
@@ -475,7 +522,8 @@ class Parser(object):
return None
if lang:
if " " in lang:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
return Literal(value, lang=lang)
elif datatype:
return Literal(value, datatype=context.expand(datatype))
@@ -484,7 +532,7 @@ class Parser(object):
else:
return self._add_to_graph(dataset, graph, context, node)
- def _to_rdf_id(self, context, id_val):
+ def _to_rdf_id(self, context: Context, id_val: str) -> Optional[IdentifiedNode]:
bid = self._get_bnodeid(id_val)
if bid:
return BNode(bid)
@@ -494,13 +542,21 @@ class Parser(object):
return None
return URIRef(uri)
- def _get_bnodeid(self, ref):
+ def _get_bnodeid(self, ref: str) -> Optional[str]:
if not ref.startswith("_:"):
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
bid = ref.split("_:", 1)[-1]
return bid or None
- def _add_list(self, dataset, graph, context, term, node_list):
+ def _add_list(
+ self,
+ dataset: Graph,
+ graph: Graph,
+ context: Context,
+ term: Optional[Term],
+ node_list: Any,
+ ) -> IdentifiedNode:
if not isinstance(node_list, list):
node_list = [node_list]
@@ -512,7 +568,8 @@ class Parser(object):
continue
if rest:
- graph.add((subj, RDF.rest, rest))
+ # type error: Statement is unreachable
+ graph.add((subj, RDF.rest, rest)) # type: ignore[unreachable]
subj = rest
obj = self._to_object(dataset, graph, context, term, node, inlist=True)
@@ -530,7 +587,7 @@ class Parser(object):
return RDF.nil
@staticmethod
- def _to_typed_json_value(value):
+ def _to_typed_json_value(value: Any) -> Dict[str, str]:
return {
TYPE: URIRef("%sJSON" % str(RDF)),
VALUE: json.dumps(
diff --git a/rdflib/plugins/parsers/notation3.py b/rdflib/plugins/parsers/notation3.py
index d9c0cd7b..513cc953 100755
--- a/rdflib/plugins/parsers/notation3.py
+++ b/rdflib/plugins/parsers/notation3.py
@@ -27,6 +27,8 @@ Modified to work with rdflib by Gunnar Aastrand Grimnes
Copyright 2010, Gunnar A. Grimnes
"""
+from __future__ import annotations
+
import codecs
import os
import re
@@ -35,7 +37,22 @@ import sys
# importing typing for `typing.List` because `List`` is used for something else
import typing
from decimal import Decimal
-from typing import IO, TYPE_CHECKING, Any, Callable, Dict, Optional, TypeVar, Union
+from typing import (
+ IO,
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ Match,
+ MutableSequence,
+ NoReturn,
+ Optional,
+ Pattern,
+ Set,
+ Tuple,
+ TypeVar,
+ Union,
+)
from uuid import uuid4
from rdflib.compat import long_type
@@ -44,6 +61,7 @@ from rdflib.graph import ConjunctiveGraph, Graph, QuotedGraph
from rdflib.term import (
_XSD_PFX,
BNode,
+ IdentifiedNode,
Identifier,
Literal,
Node,
@@ -72,10 +90,10 @@ from rdflib.parser import Parser
if TYPE_CHECKING:
from rdflib.parser import InputSource
-AnyT = TypeVar("AnyT")
+_AnyT = TypeVar("_AnyT")
-def splitFragP(uriref, punct=0):
+def splitFragP(uriref: str, punc: int = 0) -> Tuple[str, str]:
"""split a URI reference before the fragment
Punctuation is kept.
@@ -97,7 +115,10 @@ def splitFragP(uriref, punct=0):
return uriref, ""
-def join(here, there):
+_StrT = TypeVar("_StrT", bound=str)
+
+
+def join(here: str, there: str) -> str:
"""join an absolute URI and URI reference
(non-ascii characters are supported/doctested;
haven't checked the details of the IRI spec though)
@@ -195,7 +216,7 @@ def join(here, there):
return here[: slashr + 1] + path + frag
-def base():
+def base() -> str:
"""The base URI for this process - the Web equiv of cwd
Relative or absolute unix-standard filenames parsed relative to
@@ -208,7 +229,7 @@ def base():
return "file://" + _fixslash(os.getcwd()) + "/"
-def _fixslash(s):
+def _fixslash(s: str) -> str:
"""Fix windowslike filename to unixlike - (#ifdef WINDOWS)"""
s = s.replace("\\", "/")
if s[0] != "/" and s[1] == ":":
@@ -258,7 +279,7 @@ N3_Empty = (SYMBOL, List_NS + "Empty")
runNamespaceValue = None
-def runNamespace():
+def runNamespace() -> str:
"""Returns a URI suitable as a namespace for run-local objects"""
# @@@ include hostname (privacy?) (hash it?)
global runNamespaceValue
@@ -270,7 +291,7 @@ def runNamespace():
nextu = 0
-def uniqueURI():
+def uniqueURI() -> str:
"""A unique URI"""
global nextu
nextu += 1
@@ -283,12 +304,12 @@ chatty_flag = 50
# from why import BecauseOfData, becauseSubexpression
-def BecauseOfData(*args, **kargs):
+def BecauseOfData(*args: Any, **kargs: Any) -> None:
# print args, kargs
pass
-def becauseSubexpression(*args, **kargs):
+def becauseSubexpression(*args: Any, **kargs: Any) -> None:
# print args, kargs
pass
@@ -329,7 +350,7 @@ numberChars = set("0123456789-")
numberCharsPlus = numberChars | {"+", "."}
-def unicodeExpand(m):
+def unicodeExpand(m: Match) -> str:
try:
return chr(int(m.group(1), 16))
except:
@@ -389,10 +410,10 @@ class SinkParser:
self._genPrefix = genPrefix
self.keywords = ["a", "this", "bind", "has", "is", "of", "true", "false"]
self.keywordsSet = 0 # Then only can others be considered qnames
- self._anonymousNodes: Dict[str, Node] = {}
+ self._anonymousNodes: Dict[str, BNode] = {}
# Dict of anon nodes already declared ln: Term
- self._variables: Dict[Identifier, Identifier] = {}
- self._parentVariables: Dict[Identifier, Identifier] = {}
+ self._variables: Dict[str, Variable] = {}
+ self._parentVariables: Dict[str, Variable] = {}
self._reason = why # Why the parser was asked to parse this
self.turtle = turtle # raise exception when encountering N3 extensions
@@ -400,10 +421,11 @@ class SinkParser:
# only allows double quotes.
self.string_delimiters = ('"', "'") if turtle else ('"',)
- self._reason2 = None # Why these triples
+ self._reason2: Optional[Callable[..., None]] = None # Why these triples
# was: diag.tracking
if tracking:
- self._reason2 = BecauseOfData(
+ # type error: "BecauseOfData" does not return a value
+ self._reason2 = BecauseOfData( # type: ignore[func-returns-value]
store.newSymbol(thisDoc), because=self._reason
)
@@ -424,7 +446,7 @@ class SinkParser:
else:
self._genPrefix = uniqueURI()
- self._formula: Formula
+ self._formula: Optional[Formula]
if openFormula is None and not turtle:
if self._thisDoc:
# TODO FIXME: store.newFormula does not take any arguments
@@ -432,9 +454,9 @@ class SinkParser:
else:
self._formula = store.newFormula()
else:
- self._formula = openFormula # type: ignore[assignment]
+ self._formula = openFormula
- self._context = self._formula
+ self._context: Optional[Formula] = self._formula
self._parentContext: Optional[Formula] = None
def here(self, i: int) -> str:
@@ -450,20 +472,20 @@ class SinkParser:
return "%s_L%iC%i" % (self._genPrefix, self.lines, i - self.startOfLine + 1)
- def formula(self):
+ def formula(self) -> Optional[Formula]:
return self._formula
def loadStream(self, stream: Union[IO[str], IO[bytes]]) -> Optional["Formula"]:
return self.loadBuf(stream.read()) # Not ideal
- def loadBuf(self, buf: Union[str, bytes]):
+ def loadBuf(self, buf: Union[str, bytes]) -> Optional[Formula]:
"""Parses a buffer and returns its top level formula"""
self.startDoc()
self.feed(buf)
return self.endDoc() # self._formula
- def feed(self, octets: Union[str, bytes]):
+ def feed(self, octets: Union[str, bytes]) -> None:
"""Feed an octet stream to the parser
if BadSyntax is raised, the string
@@ -515,7 +537,7 @@ class SinkParser:
# @@I18N
# _namechars = string.lowercase + string.uppercase + string.digits + '_-'
- def tok(self, tok: str, argstr: str, i: int, colon: bool = False):
+ def tok(self, tok: str, argstr: str, i: int, colon: bool = False) -> int:
"""Check for keyword. Space must have been stripped on entry and
we must not be at end of file.
@@ -561,7 +583,7 @@ class SinkParser:
j = self.skipSpace(argstr, i)
if j < 0:
return j # eof
- res: typing.List[Any] = []
+ res: typing.List[str] = []
j = self.tok("bind", argstr, i) # implied "#". Obsolete.
if j > 0:
@@ -591,7 +613,8 @@ class SinkParser:
for x in res:
# self._context.declareUniversal(x)
if x not in self._variables or x in self._parentVariables:
- self._variables[x] = self._context.newUniversal(x)
+ # type error: Item "None" of "Optional[Formula]" has no attribute "newUniversal"
+ self._variables[x] = self._context.newUniversal(x) # type: ignore[union-attr]
return i
j = self.tok("forSome", argstr, i)
@@ -603,19 +626,20 @@ class SinkParser:
if i < 0:
self.BadSyntax(argstr, i, "Bad variable list after @forSome")
for x in res:
- self._context.declareExistential(x)
+ # type error: Item "None" of "Optional[Formula]" has no attribute "declareExistential"
+ self._context.declareExistential(x) # type: ignore[union-attr]
return i
j = self.tok("prefix", argstr, i, colon=True) # no implied "#"
if j >= 0:
- t: typing.List[Any] = []
+ t: typing.List[Union[Identifier, Tuple[str, str]]] = []
i = self.qname(argstr, j, t)
if i < 0:
self.BadSyntax(argstr, j, "expected qname after @prefix")
j = self.uri_ref2(argstr, i, t)
if j < 0:
self.BadSyntax(argstr, i, "expected <uriref> after @prefix _qname_")
- ns = self.uriOf(t[1])
+ ns: str = self.uriOf(t[1])
if self._baseURI:
ns = join(self._baseURI, ns)
@@ -655,7 +679,7 @@ class SinkParser:
return -1 # Not a directive, could be something else.
- def sparqlDirective(self, argstr: str, i: int):
+ def sparqlDirective(self, argstr: str, i: int) -> int:
"""
turtle and trig support BASE/PREFIX without @ and without
@@ -725,7 +749,7 @@ class SinkParser:
else:
self._store.bind(qn, uri)
- def setKeywords(self, k: Optional[typing.List[str]]):
+ def setKeywords(self, k: Optional[typing.List[str]]) -> None:
"""Takes a list of strings"""
if k is None:
self.keywordsSet = 0
@@ -742,7 +766,7 @@ class SinkParser:
self._store.endDoc(self._formula) # don't canonicalize yet
return self._formula
- def makeStatement(self, quadruple):
+ def makeStatement(self, quadruple) -> None:
# $$$$$$$$$$$$$$$$$$$$$
# print "# Parser output: ", `quadruple`
self._store.makeStatement(quadruple, why=self._reason2)
@@ -759,10 +783,10 @@ class SinkParser:
self.BadSyntax(argstr, i, "expected propertylist")
return j
- def subject(self, argstr: str, i: int, res: typing.List[Any]) -> int:
+ def subject(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
return self.item(argstr, i, res)
- def verb(self, argstr: str, i: int, res: typing.List[Any]) -> int:
+ def verb(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
"""has _prop_
is _prop_ of
a
@@ -848,16 +872,16 @@ class SinkParser:
return -1
- def prop(self, argstr: str, i: int, res):
+ def prop(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
return self.item(argstr, i, res)
- def item(self, argstr: str, i, res):
+ def item(self, argstr: str, i, res: MutableSequence[Any]) -> int:
return self.path(argstr, i, res)
- def blankNode(self, uri=None):
+ def blankNode(self, uri: Optional[str] = None) -> BNode:
return self._store.newBlankNode(self._context, uri, why=self._reason2)
- def path(self, argstr: str, i: int, res):
+ def path(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
"""Parse the path production."""
j = self.nodeOrLiteral(argstr, i, res)
if j < 0:
@@ -878,7 +902,7 @@ class SinkParser:
res.append(obj)
return j
- def anonymousNode(self, ln: str):
+ def anonymousNode(self, ln: str) -> BNode:
"""Remember or generate a term for one of these _: anonymous nodes"""
term = self._anonymousNodes.get(ln, None)
if term is not None:
@@ -887,12 +911,18 @@ class SinkParser:
self._anonymousNodes[ln] = term
return term
- def node(self, argstr: str, i: int, res, subjectAlready=None):
+ def node(
+ self,
+ argstr: str,
+ i: int,
+ res: MutableSequence[Any],
+ subjectAlready: Optional[Node] = None,
+ ) -> int:
"""Parse the <node> production.
Space is now skipped once at the beginning
instead of in multiple calls to self.skipSpace().
"""
- subj = subjectAlready
+ subj: Optional[Node] = subjectAlready
j = self.skipSpace(argstr, i)
if j < 0:
@@ -912,7 +942,7 @@ class SinkParser:
argstr, j, "Found '[=' or '[ =' when in turtle mode."
)
i = j + 1
- objs: typing.List[Any] = []
+ objs: typing.List[Node] = []
j = self.objectList(argstr, i, objs)
if j >= 0:
subj = objs[0]
@@ -993,8 +1023,10 @@ class SinkParser:
reason2 = self._reason2
self._reason2 = becauseSubexpression
if subj is None:
- subj = self._store.newFormula()
- self._context = subj
+ # type error: Incompatible types in assignment (expression has type "Formula", variable has type "Optional[Node]")
+ subj = self._store.newFormula() # type: ignore[assignment]
+ # type error: Incompatible types in assignment (expression has type "Optional[Node]", variable has type "Optional[Formula]")
+ self._context = subj # type: ignore[assignment]
while 1:
i = self.skipSpace(argstr, j)
@@ -1015,10 +1047,16 @@ class SinkParser:
self._context = self._parentContext
self._reason2 = reason2
self._parentContext = oldParentContext
- res.append(subj.close()) # No use until closed
+ # type error: Item "Node" of "Optional[Node]" has no attribute "close"
+ res.append(
+ subj.close() # type: ignore[union-attr]
+ ) # No use until closed
return j
if ch == "(":
+ thing_type: Callable[
+ [typing.List[Any], Optional[Formula]], Union[Set[Any], IdentifiedNode]
+ ]
thing_type = self._store.newList
ch2 = argstr[i + 1]
if ch2 == "$":
@@ -1069,7 +1107,7 @@ class SinkParser:
return -1
- def property_list(self, argstr: str, i: int, subj):
+ def property_list(self, argstr: str, i: int, subj: Node) -> int:
"""Parse property list
Leaves the terminating punctuation in the buffer
"""
@@ -1118,7 +1156,13 @@ class SinkParser:
return i
i += 1 # skip semicolon and continue
- def commaSeparatedList(self, argstr: str, j, res, what):
+ def commaSeparatedList(
+ self,
+ argstr: str,
+ j: int,
+ res: MutableSequence[Any],
+ what: Callable[[str, int, MutableSequence[Any]], int],
+ ) -> int:
"""return value: -1 bad syntax; >1 new position in argstr
res has things found appended
"""
@@ -1144,7 +1188,7 @@ class SinkParser:
if i < 0:
self.BadSyntax(argstr, i, "bad list content")
- def objectList(self, argstr: str, i: int, res: typing.List[Any]) -> int:
+ def objectList(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
i = self.object(argstr, i, res)
if i < 0:
return -1
@@ -1158,7 +1202,7 @@ class SinkParser:
if i < 0:
return i
- def checkDot(self, argstr: str, i: int):
+ def checkDot(self, argstr: str, i: int) -> int:
j = self.skipSpace(argstr, i)
if j < 0:
return j # eof
@@ -1171,7 +1215,7 @@ class SinkParser:
return j
self.BadSyntax(argstr, j, "expected '.' or '}' or ']' at end of statement")
- def uri_ref2(self, argstr: str, i: int, res):
+ def uri_ref2(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
"""Generate uri from n3 representation.
Note that the RDF convention of directly concatenating
@@ -1247,7 +1291,7 @@ class SinkParser:
else:
return -1
- def skipSpace(self, argstr: str, i: int):
+ def skipSpace(self, argstr: str, i: int) -> int:
"""Skip white space, newlines and comments.
return -1 if EOF, else position of first non-ws character"""
@@ -1276,7 +1320,7 @@ class SinkParser:
m = eof.match(argstr, i)
return i if m is None else -1
- def variable(self, argstr: str, i: int, res):
+ def variable(self, argstr: str, i: int, res) -> int:
"""?abc -> variable(:abc)"""
j = self.skipSpace(argstr, i)
@@ -1295,7 +1339,8 @@ class SinkParser:
if self._parentContext is None:
varURI = self._store.newSymbol(self._baseURI + "#" + argstr[j:i]) # type: ignore[operator]
if varURI not in self._variables:
- self._variables[varURI] = self._context.newUniversal(
+ # type error: Item "None" of "Optional[Formula]" has no attribute "newUniversal"
+ self._variables[varURI] = self._context.newUniversal( # type: ignore[union-attr]
varURI, why=self._reason2
)
res.append(self._variables[varURI])
@@ -1312,7 +1357,7 @@ class SinkParser:
res.append(self._parentVariables[varURI])
return i
- def bareWord(self, argstr: str, i: int, res):
+ def bareWord(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
"""abc -> :abc"""
j = self.skipSpace(argstr, i)
if j < 0:
@@ -1327,7 +1372,12 @@ class SinkParser:
res.append(argstr[j:i])
return i
- def qname(self, argstr: str, i: int, res):
+ def qname(
+ self,
+ argstr: str,
+ i: int,
+ res: MutableSequence[Union[Identifier, Tuple[str, str]]],
+ ) -> int:
"""
xyz:def -> ('xyz', 'def')
If not in keywords and keywordsSet: def -> ('', 'def')
@@ -1430,7 +1480,12 @@ class SinkParser:
return i
return -1
- def object(self, argstr: str, i: int, res):
+ def object(
+ self,
+ argstr: str,
+ i: int,
+ res: MutableSequence[Any],
+ ) -> int:
j = self.subject(argstr, i, res)
if j >= 0:
return j
@@ -1458,7 +1513,7 @@ class SinkParser:
else:
return -1
- def nodeOrLiteral(self, argstr: str, i: int, res):
+ def nodeOrLiteral(self, argstr: str, i: int, res: MutableSequence[Any]) -> int:
j = self.node(argstr, i, res)
startline = self.lines # Remember where for error messages
if j >= 0:
@@ -1526,13 +1581,13 @@ class SinkParser:
else:
return -1
- def uriOf(self, sym):
+ def uriOf(self, sym: Union[Identifier, Tuple[str, str]]) -> str:
if isinstance(sym, tuple):
return sym[1] # old system for --pipe
# return sym.uriref() # cwm api
return sym
- def strconst(self, argstr: str, i: int, delim):
+ def strconst(self, argstr: str, i: int, delim: str) -> Tuple[int, str]:
"""parse an N3 string constant delimited by delim.
return index, val
"""
@@ -1643,7 +1698,15 @@ class SinkParser:
self.BadSyntax(argstr, i, "unterminated string literal")
- def _unicodeEscape(self, argstr: str, i, startline, reg, n, prefix):
+ def _unicodeEscape(
+ self,
+ argstr: str,
+ i: int,
+ startline: int,
+ reg: Pattern[str],
+ n: int,
+ prefix: str,
+ ) -> Tuple[int, str]:
if len(argstr) < i + n:
raise BadSyntax(
self._thisDoc, startline, argstr, i, "unterminated string literal(3)"
@@ -1659,13 +1722,13 @@ class SinkParser:
"bad string literal hex escape: " + argstr[i : i + n],
)
- def uEscape(self, argstr: str, i, startline):
+ def uEscape(self, argstr: str, i: int, startline: int) -> Tuple[int, str]:
return self._unicodeEscape(argstr, i, startline, unicodeEscape4, 4, "u")
- def UEscape(self, argstr: str, i, startline):
+ def UEscape(self, argstr: str, i: int, startline: int) -> Tuple[int, str]:
return self._unicodeEscape(argstr, i, startline, unicodeEscape8, 8, "U")
- def BadSyntax(self, argstr: str, i, msg):
+ def BadSyntax(self, argstr: str, i: int, msg: str) -> NoReturn:
raise BadSyntax(self._thisDoc, self.lines, argstr, i, msg)
@@ -1674,14 +1737,14 @@ class SinkParser:
class BadSyntax(SyntaxError):
- def __init__(self, uri, lines, argstr, i, why):
+ def __init__(self, uri: str, lines: int, argstr: str, i: int, why: str):
self._str = argstr.encode("utf-8") # Better go back to strings for errors
self._i = i
self._why = why
self.lines = lines
self._uri = uri
- def __str__(self):
+ def __str__(self) -> str:
argstr = self._str
i = self._i
st = 0
@@ -1695,8 +1758,9 @@ class BadSyntax(SyntaxError):
else:
post = ""
+ # type error: On Python 3 formatting "b'abc'" with "%s" produces "b'abc'", not "abc"; use "%r" if this is desired behavior
return 'at line %i of <%s>:\nBad syntax (%s) at ^ in:\n"%s%s^%s%s"' % (
- self.lines + 1,
+ self.lines + 1, # type: ignore[str-bytes-safe]
self._uri,
self._why,
pre,
@@ -1706,7 +1770,7 @@ class BadSyntax(SyntaxError):
)
@property
- def message(self):
+ def message(self) -> str:
return str(self)
@@ -1714,23 +1778,25 @@ class BadSyntax(SyntaxError):
class Formula(object):
number = 0
- def __init__(self, parent):
+ def __init__(self, parent: Graph):
self.uuid = uuid4().hex
self.counter = 0
Formula.number += 1
self.number = Formula.number
- self.existentials = {}
- self.universals = {}
+ self.existentials: Dict[str, BNode] = {}
+ self.universals: Dict[str, BNode] = {}
self.quotedgraph = QuotedGraph(store=parent.store, identifier=self.id())
- def __str__(self):
+ def __str__(self) -> str:
return "_:Formula%s" % self.number
- def id(self):
+ def id(self) -> BNode:
return BNode("_:Formula%s" % self.number)
- def newBlankNode(self, uri=None, why=None):
+ def newBlankNode(
+ self, uri: Optional[str] = None, why: Optional[Any] = None
+ ) -> BNode:
if uri is None:
self.counter += 1
bn = BNode("f%sb%s" % (self.uuid, self.counter))
@@ -1738,13 +1804,13 @@ class Formula(object):
bn = BNode(uri.split("#").pop().replace("_", "b"))
return bn
- def newUniversal(self, uri, why=None):
+ def newUniversal(self, uri: str, why: Optional[Any] = None) -> Variable:
return Variable(uri.split("#").pop())
- def declareExistential(self, x):
+ def declareExistential(self, x: str) -> None:
self.existentials[x] = self.newBlankNode()
- def close(self):
+ def close(self) -> QuotedGraph:
return self.quotedgraph
@@ -1771,7 +1837,7 @@ class RDFSink(object):
def newGraph(self, identifier: Identifier) -> Graph:
return Graph(self.graph.store, identifier)
- def newSymbol(self, *args: str):
+ def newSymbol(self, *args: str) -> URIRef:
return URIRef(args[0])
def newBlankNode(
@@ -1795,7 +1861,7 @@ class RDFSink(object):
else:
return Literal(s, lang=lang)
- def newList(self, n: typing.List[Any], f: Optional[Formula]):
+ def newList(self, n: typing.List[Any], f: Optional[Formula]) -> IdentifiedNode:
nil = self.newSymbol("http://www.w3.org/1999/02/22-rdf-syntax-ns#nil")
if not n:
return nil
@@ -1813,21 +1879,26 @@ class RDFSink(object):
self.makeStatement((f, rest, a, nil))
return af
- def newSet(self, *args):
+ def newSet(self, *args: _AnyT) -> Set[_AnyT]:
return set(args)
- def setDefaultNamespace(self, *args) -> str:
+ def setDefaultNamespace(self, *args: bytes) -> str:
return ":".join(repr(n) for n in args)
- def makeStatement(self, quadruple, why=None) -> None:
+ def makeStatement(
+ self,
+ quadruple: Tuple[Optional[Union[Formula, Graph]], Node, Node, Node],
+ why: Optional[Any] = None,
+ ) -> None:
f, p, s, o = quadruple
if hasattr(p, "formula"):
raise ParserError("Formula used as predicate")
- s = self.normalise(f, s)
- p = self.normalise(f, p)
- o = self.normalise(f, o)
+ # type error: Argument 1 to "normalise" of "RDFSink" has incompatible type "Union[Formula, Graph, None]"; expected "Optional[Formula]"
+ s = self.normalise(f, s) # type: ignore[arg-type]
+ p = self.normalise(f, p) # type: ignore[arg-type]
+ o = self.normalise(f, o) # type: ignore[arg-type]
if f == self.rootFormula:
# print s, p, o, '.'
@@ -1835,11 +1906,16 @@ class RDFSink(object):
elif isinstance(f, Formula):
f.quotedgraph.add((s, p, o))
else:
- f.add((s, p, o))
+ # type error: Item "None" of "Optional[Graph]" has no attribute "add"
+ f.add((s, p, o)) # type: ignore[union-attr]
# return str(quadruple)
- def normalise(self, f: Optional[Formula], n):
+ def normalise(
+ self,
+ f: Optional[Formula],
+ n: Union[Tuple[int, str], bool, int, Decimal, float, _AnyT],
+ ) -> Union[URIRef, Literal, BNode, _AnyT]:
if isinstance(n, tuple):
return URIRef(str(n[1]))
@@ -1864,6 +1940,8 @@ class RDFSink(object):
if isinstance(f, Formula):
if n in f.existentials:
+ if TYPE_CHECKING:
+ assert isinstance(n, URIRef)
return f.existentials[n]
# if isinstance(n, Var):
@@ -1871,16 +1949,16 @@ class RDFSink(object):
# return f.universals[n]
# f.universals[n] = f.newBlankNode()
# return f.universals[n]
+ # type error: Incompatible return value type (got "Union[int, _AnyT]", expected "Union[URIRef, Literal, BNode, _AnyT]") [return-value]
+ return n # type: ignore[return-value]
- return n
-
- def intern(self, something: AnyT) -> AnyT:
+ def intern(self, something: _AnyT) -> _AnyT:
return something
- def bind(self, pfx, uri):
+ def bind(self, pfx, uri) -> None:
pass # print pfx, ':', uri
- def startDoc(self, formula: Optional[Formula]):
+ def startDoc(self, formula: Optional[Formula]) -> None:
self.rootFormula = formula
def endDoc(self, formula: Optional[Formula]) -> None:
@@ -1893,7 +1971,7 @@ class RDFSink(object):
#
-def hexify(ustr):
+def hexify(ustr: str) -> bytes:
"""Use URL encoding to return an ASCII string
corresponding to the given UTF8 string
@@ -1929,7 +2007,7 @@ class TurtleParser(Parser):
graph: Graph,
encoding: Optional[str] = "utf-8",
turtle: bool = True,
- ):
+ ) -> None:
if encoding not in [None, "utf-8"]:
raise ParserError(
"N3/Turtle files are always utf-8 encoded, I was passed: %s" % encoding
@@ -1961,7 +2039,10 @@ class N3Parser(TurtleParser):
def __init__(self):
pass
- def parse(self, source, graph, encoding="utf-8"):
+ # type error: Signature of "parse" incompatible with supertype "TurtleParser"
+ def parse( # type: ignore[override]
+ self, source: InputSource, graph: Graph, encoding: Optional[str] = "utf-8"
+ ) -> None:
# we're currently being handed a Graph, not a ConjunctiveGraph
# context-aware is this implied by formula_aware
ca = getattr(graph.store, "context_aware", False)
diff --git a/rdflib/plugins/parsers/nquads.py b/rdflib/plugins/parsers/nquads.py
index a13b8798..eb24aa64 100644
--- a/rdflib/plugins/parsers/nquads.py
+++ b/rdflib/plugins/parsers/nquads.py
@@ -22,24 +22,33 @@ graphs that can be used and queried. The store that backs the graph
>>> FOAF = Namespace("http://xmlns.com/foaf/0.1/")
>>> assert(g.value(s, FOAF.name).eq("Arco Publications"))
"""
+from __future__ import annotations
from codecs import getreader
+from typing import Any, MutableMapping, Optional
-from rdflib import ConjunctiveGraph
+from rdflib.exceptions import ParserError as ParseError
+from rdflib.graph import ConjunctiveGraph
+from rdflib.parser import InputSource
# Build up from the NTriples parser:
-from rdflib.plugins.parsers.ntriples import (
- ParseError,
- W3CNTriplesParser,
- r_tail,
- r_wspace,
-)
+from rdflib.plugins.parsers.ntriples import W3CNTriplesParser, r_tail, r_wspace
+from rdflib.term import BNode
__all__ = ["NQuadsParser"]
+_BNodeContextType = MutableMapping[str, BNode]
+
class NQuadsParser(W3CNTriplesParser):
- def parse(self, inputsource, sink, bnode_context=None, **kwargs):
+ # type error: Signature of "parse" incompatible with supertype "W3CNTriplesParser"
+ def parse( # type: ignore[override]
+ self,
+ inputsource: InputSource,
+ sink: ConjunctiveGraph,
+ bnode_context: Optional[_BNodeContextType] = None,
+ **kwargs: Any,
+ ) -> ConjunctiveGraph:
"""
Parse inputsource as an N-Quads file.
@@ -54,7 +63,10 @@ class NQuadsParser(W3CNTriplesParser):
assert sink.store.context_aware, (
"NQuadsParser must be given" " a context aware store."
)
- self.sink = ConjunctiveGraph(store=sink.store, identifier=sink.identifier)
+ # type error: Incompatible types in assignment (expression has type "ConjunctiveGraph", base class "W3CNTriplesParser" defined the type as "Union[DummySink, NTGraphSink]")
+ self.sink: ConjunctiveGraph = ConjunctiveGraph( # type: ignore[assignment]
+ store=sink.store, identifier=sink.identifier
+ )
source = inputsource.getCharacterStream()
if not source:
@@ -77,7 +89,7 @@ class NQuadsParser(W3CNTriplesParser):
return self.sink
- def parseline(self, bnode_context=None):
+ def parseline(self, bnode_context: Optional[_BNodeContextType] = None) -> None:
self.eat(r_wspace)
if (not self.line) or self.line.startswith(("#")):
return # The line is empty or a comment
diff --git a/rdflib/plugins/parsers/ntriples.py b/rdflib/plugins/parsers/ntriples.py
index d7d1da93..564a2cf1 100644
--- a/rdflib/plugins/parsers/ntriples.py
+++ b/rdflib/plugins/parsers/ntriples.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
+from __future__ import annotations
__doc__ = """\
N-Triples Parser
@@ -9,16 +10,29 @@ Author: Sean B. Palmer, inamidst.com
import codecs
import re
from io import BytesIO, StringIO, TextIOBase
-from typing import IO, TYPE_CHECKING, Optional, Pattern, TextIO, Union
+from typing import (
+ IO,
+ TYPE_CHECKING,
+ Any,
+ Match,
+ MutableMapping,
+ Optional,
+ Pattern,
+ TextIO,
+ Union,
+)
from rdflib.compat import _string_escape_map, decodeUnicodeEscape
from rdflib.exceptions import ParserError as ParseError
from rdflib.parser import InputSource, Parser
from rdflib.term import BNode as bNode
from rdflib.term import Literal
+from rdflib.term import URIRef
from rdflib.term import URIRef as URI
if TYPE_CHECKING:
+ import typing_extensions as te
+
from rdflib.graph import Graph, _ObjectType, _PredicateType, _SubjectType
__all__ = [
@@ -102,13 +116,16 @@ def unquote(s: str) -> str:
r_hibyte = re.compile(r"([\x80-\xFF])")
-def uriquote(uri):
+def uriquote(uri: str) -> str:
if not validate:
return uri
else:
return r_hibyte.sub(lambda m: "%%%02X" % ord(m.group(1)), uri)
+_BNodeContextType = MutableMapping[str, bNode]
+
+
class W3CNTriplesParser(object):
"""An N-Triples Parser.
This is a legacy-style Triples parser for NTriples provided by W3C
@@ -126,7 +143,9 @@ class W3CNTriplesParser(object):
__slots__ = ("_bnode_ids", "sink", "buffer", "file", "line")
def __init__(
- self, sink: Optional[Union[DummySink, "NTGraphSink"]] = None, bnode_context=None
+ self,
+ sink: Optional[Union[DummySink, "NTGraphSink"]] = None,
+ bnode_context: Optional[_BNodeContextType] = None,
):
if bnode_context is not None:
self._bnode_ids = bnode_context
@@ -144,8 +163,10 @@ class W3CNTriplesParser(object):
self.line: Optional[str] = ""
def parse(
- self, f: Union[TextIO, IO[bytes], codecs.StreamReader], bnode_context=None
- ):
+ self,
+ f: Union[TextIO, IO[bytes], codecs.StreamReader],
+ bnode_context: Optional[_BNodeContextType] = None,
+ ) -> Union[DummySink, "NTGraphSink"]:
"""
Parse f as an N-Triples file.
@@ -177,7 +198,7 @@ class W3CNTriplesParser(object):
raise ParseError("Invalid line: {}".format(self.line))
return self.sink
- def parsestring(self, s: Union[bytes, bytearray, str], **kwargs):
+ def parsestring(self, s: Union[bytes, bytearray, str], **kwargs) -> None:
"""Parse s as an N-Triples string."""
if not isinstance(s, (str, bytes, bytearray)):
raise ParseError("Item to parse must be a string instance.")
@@ -188,12 +209,13 @@ class W3CNTriplesParser(object):
f = StringIO(s)
self.parse(f, **kwargs)
- def readline(self):
+ def readline(self) -> Optional[str]:
"""Read an N-Triples line from buffered input."""
# N-Triples lines end in either CRLF, CR, or LF
# Therefore, we can't just use f.readline()
if not self.buffer:
- buffer = self.file.read(bufsiz)
+ # type error: Item "None" of "Union[TextIO, StreamReader, None]" has no attribute "read"
+ buffer = self.file.read(bufsiz) # type: ignore[union-attr]
if not buffer:
return None
self.buffer = buffer
@@ -204,7 +226,8 @@ class W3CNTriplesParser(object):
self.buffer = self.buffer[m.end() :]
return m.group(1)
else:
- buffer = self.file.read(bufsiz)
+ # type error: Item "None" of "Union[TextIO, StreamReader, None]" has no attribute "read"
+ buffer = self.file.read(bufsiz) # type: ignore[union-attr]
if not buffer and not self.buffer.isspace():
# Last line does not need to be terminated with a newline
buffer += "\n"
@@ -212,7 +235,7 @@ class W3CNTriplesParser(object):
return None
self.buffer += buffer
- def parseline(self, bnode_context=None):
+ def parseline(self, bnode_context: Optional[_BNodeContextType] = None) -> None:
self.eat(r_wspace)
if (not self.line) or self.line.startswith("#"):
return # The line is empty or a comment
@@ -230,10 +253,10 @@ class W3CNTriplesParser(object):
raise ParseError("Trailing garbage: {}".format(self.line))
self.sink.triple(subject, predicate, object_)
- def peek(self, token: str):
+ def peek(self, token: str) -> bool:
return self.line.startswith(token) # type: ignore[union-attr]
- def eat(self, pattern: Pattern[str]):
+ def eat(self, pattern: Pattern[str]) -> Match[str]:
m = pattern.match(self.line) # type: ignore[arg-type]
if not m: # @@ Why can't we get the original pattern?
# print(dir(pattern))
@@ -242,26 +265,28 @@ class W3CNTriplesParser(object):
self.line = self.line[m.end() :] # type: ignore[index]
return m
- def subject(self, bnode_context=None):
+ def subject(self, bnode_context=None) -> Union[bNode, URIRef]:
# @@ Consider using dictionary cases
subj = self.uriref() or self.nodeid(bnode_context)
if not subj:
raise ParseError("Subject must be uriref or nodeID")
return subj
- def predicate(self):
+ def predicate(self) -> URIRef:
pred = self.uriref()
if not pred:
raise ParseError("Predicate must be uriref")
return pred
- def object(self, bnode_context=None):
+ def object(
+ self, bnode_context: Optional[_BNodeContextType] = None
+ ) -> Union[URI, bNode, Literal]:
objt = self.uriref() or self.nodeid(bnode_context) or self.literal()
if objt is False:
raise ParseError("Unrecognised object type")
return objt
- def uriref(self):
+ def uriref(self) -> Union["te.Literal[False]", URI]:
if self.peek("<"):
uri = self.eat(r_uriref).group(1)
uri = unquote(uri)
@@ -269,7 +294,9 @@ class W3CNTriplesParser(object):
return URI(uri)
return False
- def nodeid(self, bnode_context=None):
+ def nodeid(
+ self, bnode_context: Optional[_BNodeContextType] = None
+ ) -> Union["te.Literal[False]", bNode]:
if self.peek("_"):
# Fix for https://github.com/RDFLib/rdflib/issues/204
if bnode_context is None:
@@ -287,7 +314,7 @@ class W3CNTriplesParser(object):
return bnode
return False
- def literal(self):
+ def literal(self) -> Union["te.Literal[False]", Literal]:
if self.peek('"'):
lit, lang, dtype = self.eat(r_literal).groups()
if lang:
@@ -313,7 +340,7 @@ class NTGraphSink(object):
def __init__(self, graph: "Graph"):
self.g = graph
- def triple(self, s: "_SubjectType", p: "_PredicateType", o: "_ObjectType"):
+ def triple(self, s: "_SubjectType", p: "_PredicateType", o: "_ObjectType") -> None:
self.g.add((s, p, o))
@@ -325,7 +352,7 @@ class NTParser(Parser):
__slots__ = ()
@classmethod
- def parse(cls, source: InputSource, sink: "Graph", **kwargs):
+ def parse(cls, source: InputSource, sink: "Graph", **kwargs: Any) -> None:
"""
Parse the NT format
diff --git a/rdflib/plugins/parsers/rdfxml.py b/rdflib/plugins/parsers/rdfxml.py
index 1da4ff4f..1f8e7e68 100644
--- a/rdflib/plugins/parsers/rdfxml.py
+++ b/rdflib/plugins/parsers/rdfxml.py
@@ -1,17 +1,26 @@
"""
An RDF/XML parser for RDFLib
"""
+from __future__ import annotations
+from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Optional, Tuple
from urllib.parse import urldefrag, urljoin
from xml.sax import handler, make_parser, xmlreader
from xml.sax.handler import ErrorHandler
from xml.sax.saxutils import escape, quoteattr
from rdflib.exceptions import Error, ParserError
+from rdflib.graph import Graph
from rdflib.namespace import RDF, is_ncname
-from rdflib.parser import Parser
+from rdflib.parser import InputSource, Parser
from rdflib.plugins.parsers.RDFVOC import RDFVOC
-from rdflib.term import BNode, Literal, URIRef
+from rdflib.term import BNode, Identifier, Literal, URIRef
+
+if TYPE_CHECKING:
+ # from xml.sax.expatreader import ExpatLocator
+ from xml.sax.xmlreader import AttributesImpl, Locator
+
+ from rdflib.graph import _ObjectType, _SubjectType, _TripleType
__all__ = ["create_parser", "BagID", "ElementHandler", "RDFXMLHandler", "RDFXMLParser"]
@@ -125,42 +134,46 @@ class ElementHandler(object):
class RDFXMLHandler(handler.ContentHandler):
- def __init__(self, store):
+ def __init__(self, store: Graph):
self.store = store
self.preserve_bnode_ids = False
self.reset()
- def reset(self):
+ def reset(self) -> None:
document_element = ElementHandler()
document_element.start = self.document_element_start
document_element.end = lambda name, qname: None
- self.stack = [
+ self.stack: List[Optional[ElementHandler]] = [
None,
document_element,
]
- self.ids = {} # remember IDs we have already seen
- self.bnode = {}
- self._ns_contexts = [{}] # contains uri -> prefix dicts
- self._current_context = self._ns_contexts[-1]
+ self.ids: Dict[str, int] = {} # remember IDs we have already seen
+ self.bnode: Dict[str, Identifier] = {}
+ self._ns_contexts: List[Dict[str, Optional[str]]] = [
+ {}
+ ] # contains uri -> prefix dicts
+ self._current_context: Dict[str, Optional[str]] = self._ns_contexts[-1]
# ContentHandler methods
- def setDocumentLocator(self, locator):
+ def setDocumentLocator(self, locator: Locator):
self.locator = locator
- def startDocument(self):
+ def startDocument(self) -> None:
pass
- def startPrefixMapping(self, prefix, namespace):
+ def startPrefixMapping(self, prefix: Optional[str], namespace: str) -> None:
self._ns_contexts.append(self._current_context.copy())
self._current_context[namespace] = prefix
self.store.bind(prefix, namespace or "", override=False)
- def endPrefixMapping(self, prefix):
+ def endPrefixMapping(self, prefix: Optional[str]) -> None:
self._current_context = self._ns_contexts[-1]
del self._ns_contexts[-1]
- def startElementNS(self, name, qname, attrs):
+ def startElementNS(
+ self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
+ ) -> None:
stack = self.stack
stack.append(ElementHandler())
current = self.current
@@ -189,29 +202,29 @@ class RDFXMLHandler(handler.ContentHandler):
current.language = language
current.start(name, qname, attrs)
- def endElementNS(self, name, qname):
+ def endElementNS(self, name: Tuple[Optional[str], str], qname) -> None:
self.current.end(name, qname)
self.stack.pop()
- def characters(self, content):
+ def characters(self, content: str) -> None:
char = self.current.char
if char:
char(content)
- def ignorableWhitespace(self, content):
+ def ignorableWhitespace(self, content) -> None:
pass
- def processingInstruction(self, target, data):
+ def processingInstruction(self, target, data) -> None:
pass
- def add_reified(self, sid, spo):
+ def add_reified(self, sid: Identifier, spo: _TripleType):
s, p, o = spo
self.store.add((sid, RDF.type, RDF.Statement))
self.store.add((sid, RDF.subject, s))
self.store.add((sid, RDF.predicate, p))
self.store.add((sid, RDF.object, o))
- def error(self, message):
+ def error(self, message: str) -> NoReturn:
locator = self.locator
info = "%s:%s:%s: " % (
locator.getSystemId(),
@@ -220,38 +233,44 @@ class RDFXMLHandler(handler.ContentHandler):
)
raise ParserError(info + message)
- def get_current(self):
+ def get_current(self) -> Optional[ElementHandler]:
return self.stack[-2]
# Create a read only property called current so that self.current
# give the current element handler.
current = property(get_current)
- def get_next(self):
+ def get_next(self) -> Optional[ElementHandler]:
return self.stack[-1]
# Create a read only property that gives the element handler to be
# used for the next element.
next = property(get_next)
- def get_parent(self):
+ def get_parent(self) -> Optional[ElementHandler]:
return self.stack[-3]
# Create a read only property that gives the current parent
# element handler
parent = property(get_parent)
- def absolutize(self, uri):
- result = urljoin(self.current.base, uri, allow_fragments=1)
+ def absolutize(self, uri: str) -> URIRef:
+ # type error: Argument "allow_fragments" to "urljoin" has incompatible type "int"; expected "bool"
+ result = urljoin(self.current.base, uri, allow_fragments=1) # type: ignore[arg-type]
if uri and uri[-1] == "#" and result[-1] != "#":
result = "%s#" % result
return URIRef(result)
- def convert(self, name, qname, attrs):
+ def convert(
+ self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
+ ) -> Tuple[URIRef, Dict[URIRef, str]]:
if name[0] is None:
- name = URIRef(name[1])
+ # type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[Optional[str], str]")
+ name = URIRef(name[1]) # type: ignore[assignment]
else:
- name = URIRef("".join(name))
+ # type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[Optional[str], str]")
+ # type error: Argument 1 to "join" of "str" has incompatible type "Tuple[Optional[str], str]"; expected "Iterable[str]"
+ name = URIRef("".join(name)) # type: ignore[assignment, arg-type]
atts = {}
for (n, v) in attrs.items():
if n[0] is None:
@@ -262,12 +281,16 @@ class RDFXMLHandler(handler.ContentHandler):
pass
elif att in UNQUALIFIED:
# if not RDFNS[att] in atts:
- atts[RDFNS[att]] = v # type: ignore[misc]
+ # type error: Variable "att" is not valid as a type
+ atts[RDFNS[att]] = v # type: ignore[misc, valid-type]
else:
atts[URIRef(att)] = v
- return name, atts
+ # type error: Incompatible return value type (got "Tuple[Tuple[Optional[str], str], Dict[Any, Any]]", expected "Tuple[URIRef, Dict[URIRef, str]]")
+ return name, atts # type: ignore[return-value]
- def document_element_start(self, name, qname, attrs):
+ def document_element_start(
+ self, name: Tuple[str, str], qname, attrs: AttributesImpl
+ ) -> None:
if name[0] and URIRef("".join(name)) == RDFVOC.RDF:
# Cheap hack so 2to3 doesn't turn it into __next__
next = getattr(self, "next")
@@ -279,8 +302,11 @@ class RDFXMLHandler(handler.ContentHandler):
# TODO... set end to something that sets start such that
# another element will cause error
- def node_element_start(self, name, qname, attrs):
- name, atts = self.convert(name, qname, attrs)
+ def node_element_start(
+ self, name: Tuple[str, str], qname, attrs: AttributesImpl
+ ) -> None:
+ # type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[str, str]")
+ name, atts = self.convert(name, qname, attrs) # type: ignore[assignment]
current = self.current
absolutize = self.absolutize
@@ -290,8 +316,9 @@ class RDFXMLHandler(handler.ContentHandler):
next.end = self.property_element_end
if name in NODE_ELEMENT_EXCEPTIONS:
- self.error("Invalid node element URI: %s" % name)
-
+ # type error: Not all arguments converted during string formatting
+ self.error("Invalid node element URI: %s" % name) # type: ignore[str-format]
+ subject: _SubjectType
if RDFVOC.ID in atts:
if RDFVOC.about in atts or RDFVOC.nodeID in atts:
self.error("Can have at most one of rdf:ID, rdf:about, and rdf:nodeID")
@@ -325,8 +352,10 @@ class RDFXMLHandler(handler.ContentHandler):
subject = BNode()
if name != RDFVOC.Description: # S1
- self.store.add((subject, RDF.type, absolutize(name)))
+ # error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
+ self.store.add((subject, RDF.type, absolutize(name))) # type: ignore[arg-type]
+ object: _ObjectType
language = current.language
for att in atts:
if not att.startswith(str(RDFNS)):
@@ -334,7 +363,8 @@ class RDFXMLHandler(handler.ContentHandler):
try:
object = Literal(atts[att], language)
except Error as e:
- self.error(e.msg)
+ # type error: Argument 1 to "error" of "RDFXMLHandler" has incompatible type "Optional[str]"; expected "str"
+ self.error(e.msg) # type: ignore[arg-type]
elif att == RDF.type: # S2
predicate = RDF.type
object = absolutize(atts[RDF.type])
@@ -342,18 +372,20 @@ class RDFXMLHandler(handler.ContentHandler):
continue
elif att in PROPERTY_ATTRIBUTE_EXCEPTIONS: # S3
self.error("Invalid property attribute URI: %s" % att)
- continue # for when error does not throw an exception
+ # type error: Statement is unreachable
+ continue # type: ignore[unreachable] # for when error does not throw an exception
else:
predicate = absolutize(att)
try:
object = Literal(atts[att], language)
except Error as e:
- self.error(e.msg)
+ # type error: Argument 1 to "error" of "RDFXMLHandler" has incompatible type "Optional[str]"; expected "str"
+ self.error(e.msg) # type: ignore[arg-type]
self.store.add((subject, predicate, object))
current.subject = subject
- def node_element_end(self, name, qname):
+ def node_element_end(self, name: Tuple[str, str], qname) -> None:
# repeat node-elements are only allowed
# at at top-level
@@ -365,25 +397,32 @@ class RDFXMLHandler(handler.ContentHandler):
self.parent.object = self.current.subject
- def property_element_start(self, name, qname, attrs):
- name, atts = self.convert(name, qname, attrs)
+ def property_element_start(
+ self, name: Tuple[str, str], qname, attrs: AttributesImpl
+ ) -> None:
+ # type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[str, str]")
+ name, atts = self.convert(name, qname, attrs) # type: ignore[assignment]
current = self.current
absolutize = self.absolutize
# Cheap hack so 2to3 doesn't turn it into __next__
next = getattr(self, "next")
- object = None
+ object: Optional[_ObjectType] = None
current.data = None
current.list = None
- if not name.startswith(str(RDFNS)):
- current.predicate = absolutize(name)
+ # type error: "Tuple[str, str]" has no attribute "startswith"
+ if not name.startswith(str(RDFNS)): # type: ignore[attr-defined]
+ # type error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
+ current.predicate = absolutize(name) # type: ignore[arg-type]
elif name == RDFVOC.li:
current.predicate = current.next_li()
elif name in PROPERTY_ELEMENT_EXCEPTIONS:
- self.error("Invalid property element URI: %s" % name)
+ # type error: Not all arguments converted during string formatting
+ self.error("Invalid property element URI: %s" % name) # type: ignore[str-format]
else:
- current.predicate = absolutize(name)
+ # type error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
+ current.predicate = absolutize(name) # type: ignore[arg-type]
id = atts.get(RDFVOC.ID, None)
if id is not None:
@@ -465,12 +504,13 @@ class RDFXMLHandler(handler.ContentHandler):
self.error("""Invalid property attribute URI: %s""" % att)
else:
predicate = absolutize(att)
-
+ o: _ObjectType
if att == RDF.type:
o = URIRef(atts[att])
else:
if datatype is not None:
- language = None
+ # type error: Statement is unreachable
+ language = None # type: ignore[unreachable]
o = Literal(atts[att], language, datatype)
if object is None:
@@ -483,12 +523,12 @@ class RDFXMLHandler(handler.ContentHandler):
current.data = None
current.object = object
- def property_element_char(self, data):
+ def property_element_char(self, data: str) -> None:
current = self.current
if current.data is not None:
current.data += data
- def property_element_end(self, name, qname):
+ def property_element_end(self, name: Tuple[str, str], qname) -> None:
current = self.current
if current.data is not None and current.object is None:
literalLang = current.language
@@ -507,7 +547,7 @@ class RDFXMLHandler(handler.ContentHandler):
)
current.subject = None
- def list_node_element_end(self, name, qname):
+ def list_node_element_end(self, name: Tuple[str, str], qname) -> None:
current = self.current
if self.parent.list == RDF.nil:
list = BNode()
@@ -525,7 +565,9 @@ class RDFXMLHandler(handler.ContentHandler):
self.store.add((list, RDF.first, current.subject))
self.parent.list = list
- def literal_element_start(self, name, qname, attrs):
+ def literal_element_start(
+ self, name: Tuple[str, str], qname, attrs: AttributesImpl
+ ) -> None:
current = self.current
self.next.start = self.literal_element_start
self.next.char = self.literal_element_char
@@ -552,14 +594,15 @@ class RDFXMLHandler(handler.ContentHandler):
current.declared[name[0]] = self._current_context[name[0]]
name = current.declared[name[0]] + ":" + name[1]
else:
- name = name[1]
+ # type error: Incompatible types in assignment (expression has type "str", variable has type "Tuple[str, str]")
+ name = name[1] # type: ignore[assignment]
current.object += " %s=%s" % (name, quoteattr(value))
current.object += ">"
- def literal_element_char(self, data):
+ def literal_element_char(self, data: str) -> None:
self.current.object += escape(data)
- def literal_element_end(self, name, qname):
+ def literal_element_end(self, name: Tuple[str, str], qname) -> None:
if name[0]:
prefix = self._current_context[name[0]]
if prefix:
@@ -571,7 +614,7 @@ class RDFXMLHandler(handler.ContentHandler):
self.parent.object += self.current.object + end
-def create_parser(target, store) -> xmlreader.XMLReader:
+def create_parser(target: InputSource, store: Graph) -> xmlreader.XMLReader:
parser = make_parser()
try:
# Workaround for bug in expatreader.py. Needed when
@@ -581,7 +624,8 @@ def create_parser(target, store) -> xmlreader.XMLReader:
pass # Not present in Jython (at least)
parser.setFeature(handler.feature_namespaces, 1)
rdfxml = RDFXMLHandler(store)
- rdfxml.setDocumentLocator(target)
+ # type error: Argument 1 to "setDocumentLocator" of "RDFXMLHandler" has incompatible type "InputSource"; expected "Locator"
+ rdfxml.setDocumentLocator(target) # type: ignore[arg-type]
# rdfxml.setDocumentLocator(_Locator(self.url, self.parser))
parser.setContentHandler(rdfxml)
parser.setErrorHandler(ErrorHandler())
@@ -592,7 +636,7 @@ class RDFXMLParser(Parser):
def __init__(self):
pass
- def parse(self, source, sink, **args):
+ def parse(self, source: InputSource, sink: Graph, **args: Any) -> None:
self._parser = create_parser(source, sink)
content_handler = self._parser.getContentHandler()
preserve_bnode_ids = args.get("preserve_bnode_ids", None)
diff --git a/rdflib/plugins/parsers/trig.py b/rdflib/plugins/parsers/trig.py
index 215586a0..9e8c8d70 100644
--- a/rdflib/plugins/parsers/trig.py
+++ b/rdflib/plugins/parsers/trig.py
@@ -1,5 +1,9 @@
-from rdflib import ConjunctiveGraph
-from rdflib.parser import Parser
+from __future__ import annotations
+
+from typing import Any, MutableSequence
+
+from rdflib.graph import ConjunctiveGraph, Graph
+from rdflib.parser import InputSource, Parser
from .notation3 import RDFSink, SinkParser
@@ -9,7 +13,7 @@ def becauseSubGraph(*args, **kwargs):
class TrigSinkParser(SinkParser):
- def directiveOrStatement(self, argstr, h):
+ def directiveOrStatement(self, argstr: str, h: int) -> int: # noqa: N802
# import pdb; pdb.set_trace()
@@ -35,7 +39,9 @@ class TrigSinkParser(SinkParser):
return j
- def labelOrSubject(self, argstr, i, res):
+ def labelOrSubject( # noqa: N802
+ self, argstr: str, i: int, res: MutableSequence[Any]
+ ) -> int:
j = self.skipSpace(argstr, i)
if j < 0:
return j # eof
@@ -54,7 +60,7 @@ class TrigSinkParser(SinkParser):
return j + 1
return -1
- def graph(self, argstr, i):
+ def graph(self, argstr: str, i: int) -> int:
"""
Parse trig graph, i.e.
@@ -69,7 +75,7 @@ class TrigSinkParser(SinkParser):
if j >= 0:
i = j
- r = []
+ r: MutableSequence[Any] = []
j = self.labelOrSubject(argstr, i, r)
if j >= 0:
graph = r[0]
@@ -98,7 +104,8 @@ class TrigSinkParser(SinkParser):
self._parentContext = self._context
reason2 = self._reason2
self._reason2 = becauseSubGraph
- self._context = self._store.newGraph(graph)
+ # type error: Incompatible types in assignment (expression has type "Graph", variable has type "Optional[Formula]")
+ self._context = self._store.newGraph(graph) # type: ignore[assignment]
while 1:
i = self.skipSpace(argstr, j)
@@ -129,11 +136,13 @@ class TrigParser(Parser):
def __init__(self):
pass
- def parse(self, source, graph, encoding="utf-8"):
+ def parse(self, source: InputSource, graph: Graph, encoding: str = "utf-8") -> None:
if encoding not in [None, "utf-8"]:
raise Exception(
- ("TriG files are always utf-8 encoded, ", "I was passed: %s") % encoding
+ # type error: Unsupported left operand type for % ("Tuple[str, str]")
+ ("TriG files are always utf-8 encoded, ", "I was passed: %s") # type: ignore[operator]
+ % encoding
)
# we're currently being handed a Graph, not a ConjunctiveGraph
diff --git a/rdflib/plugins/parsers/trix.py b/rdflib/plugins/parsers/trix.py
index 76ff5745..d1205ac8 100644
--- a/rdflib/plugins/parsers/trix.py
+++ b/rdflib/plugins/parsers/trix.py
@@ -1,14 +1,22 @@
"""
A TriX parser for RDFLib
"""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Optional, Tuple
from xml.sax import handler, make_parser
from xml.sax.handler import ErrorHandler
from rdflib.exceptions import ParserError
from rdflib.graph import Graph
from rdflib.namespace import Namespace
-from rdflib.parser import Parser
-from rdflib.term import BNode, Literal, URIRef
+from rdflib.parser import InputSource, Parser
+from rdflib.store import Store
+from rdflib.term import BNode, Identifier, Literal, URIRef
+
+if TYPE_CHECKING:
+ # from xml.sax.expatreader import ExpatLocator
+ from xml.sax.xmlreader import AttributesImpl, Locator, XMLReader
__all__ = ["create_parser", "TriXHandler", "TriXParser"]
@@ -20,34 +28,36 @@ XMLNS = Namespace("http://www.w3.org/XML/1998/namespace")
class TriXHandler(handler.ContentHandler):
"""An Sax Handler for TriX. See http://sw.nokia.com/trix/"""
- def __init__(self, store):
+ def __init__(self, store: Store):
self.store = store
self.preserve_bnode_ids = False
self.reset()
- def reset(self):
- self.bnode = {}
- self.graph = None
- self.triple = None
+ def reset(self) -> None:
+ self.bnode: Dict[str, BNode] = {}
+ self.graph: Optional[Graph] = None
+ self.triple: Optional[List[Identifier]] = None
self.state = 0
self.lang = None
self.datatype = None
# ContentHandler methods
- def setDocumentLocator(self, locator):
+ def setDocumentLocator(self, locator: Locator):
self.locator = locator
- def startDocument(self):
+ def startDocument(self) -> None:
pass
- def startPrefixMapping(self, prefix, namespace):
+ def startPrefixMapping(self, prefix: Optional[str], namespace: str) -> None:
pass
- def endPrefixMapping(self, prefix):
+ def endPrefixMapping(self, prefix: Optional[str]) -> None:
pass
- def startElementNS(self, name, qname, attrs):
+ def startElementNS(
+ self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
+ ) -> None:
if name[0] != str(TRIXNS):
self.error(
@@ -136,7 +146,9 @@ class TriXHandler(handler.ContentHandler):
self.chars = ""
- def endElementNS(self, name, qname):
+ def endElementNS(self, name: Tuple[Optional[str], str], qname) -> None:
+ if TYPE_CHECKING:
+ assert self.triple is not None
if name[0] != str(TRIXNS):
self.error(
"Only elements in the TriX namespace are allowed. %s!=%s"
@@ -189,8 +201,9 @@ class TriXHandler(handler.ContentHandler):
"Triple has wrong length, got %d elements: %s"
% (len(self.triple), self.triple)
)
-
- self.graph.add(self.triple)
+ # type error: Item "None" of "Optional[Graph]" has no attribute "add"
+ # type error: Argument 1 to "add" of "Graph" has incompatible type "List[Identifier]"; expected "Tuple[Node, Node, Node]"
+ self.graph.add(self.triple) # type: ignore[union-attr, arg-type]
# self.store.store.add(self.triple,context=self.graph)
# self.store.addN([self.triple+[self.graph]])
self.state = 2
@@ -210,7 +223,7 @@ class TriXHandler(handler.ContentHandler):
else:
self.error("Unexpected close element")
- def get_bnode(self, label):
+ def get_bnode(self, label: str) -> BNode:
if self.preserve_bnode_ids:
bn = BNode(label)
else:
@@ -221,16 +234,16 @@ class TriXHandler(handler.ContentHandler):
self.bnode[label] = bn
return bn
- def characters(self, content):
+ def characters(self, content: str) -> None:
self.chars += content
- def ignorableWhitespace(self, content):
+ def ignorableWhitespace(self, content) -> None:
pass
- def processingInstruction(self, target, data):
+ def processingInstruction(self, target, data) -> None:
pass
- def error(self, message):
+ def error(self, message: str) -> NoReturn:
locator = self.locator
info = "%s:%s:%s: " % (
locator.getSystemId(),
@@ -240,12 +253,14 @@ class TriXHandler(handler.ContentHandler):
raise ParserError(info + message)
-def create_parser(store):
+def create_parser(store: Store) -> XMLReader:
parser = make_parser()
try:
+
# Workaround for bug in expatreader.py. Needed when
# expatreader is trying to guess a prefix.
- parser.start_namespace_decl("xml", "http://www.w3.org/XML/1998/namespace")
+ # type error: "XMLReader" has no attribute "start_namespace_decl"
+ parser.start_namespace_decl("xml", "http://www.w3.org/XML/1998/namespace") # type: ignore[attr-defined]
except AttributeError:
pass # Not present in Jython (at least)
parser.setFeature(handler.feature_namespaces, 1)
@@ -261,7 +276,7 @@ class TriXParser(Parser):
def __init__(self):
pass
- def parse(self, source, sink, **args):
+ def parse(self, source: InputSource, sink: Graph, **args: Any) -> None:
assert (
sink.store.context_aware
), "TriXParser must be given a context aware store."
diff --git a/rdflib/plugins/shared/jsonld/context.py b/rdflib/plugins/shared/jsonld/context.py
index 40a1db7a..7b2f87f1 100644
--- a/rdflib/plugins/shared/jsonld/context.py
+++ b/rdflib/plugins/shared/jsonld/context.py
@@ -6,9 +6,22 @@ Implementation of the JSON-LD Context structure. See:
"""
# https://github.com/RDFLib/rdflib-jsonld/blob/feature/json-ld-1.1/rdflib_jsonld/context.py
+from __future__ import annotations
from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Collection,
+ Dict,
+ Generator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+from urllib.parse import urljoin, urlsplit
from rdflib.namespace import RDF
@@ -41,7 +54,7 @@ from .keys import (
VERSION,
VOCAB,
)
-from .util import norm_url, source_to_json, split_iri, urljoin, urlsplit
+from .util import norm_url, source_to_json, split_iri
NODE_KEYS = {GRAPH, ID, INCLUDED, JSON, LIST, NEST, NONE, REV, SET, TYPE, VALUE, LANG}
@@ -63,9 +76,9 @@ class Context(object):
base: Optional[str] = None,
version: Optional[float] = None,
):
- self.version = version or 1.0
+ self.version: float = version or 1.0
self.language = None
- self.vocab = None
+ self.vocab: Optional[str] = None
self._base: Optional[str]
self.base = base
self.doc_base = base
@@ -75,7 +88,7 @@ class Context(object):
self._lookup: Dict[Tuple[str, Any, Union[Defined, str], bool], Any] = {}
self._prefixes: Dict[str, Any] = {}
self.active = False
- self.parent = None
+ self.parent: Optional[Context] = None
self.propagate = True
self._context_cache: Dict[str, Any] = {}
if source:
@@ -98,12 +111,13 @@ class Context(object):
)
self._basedomain = "%s://%s" % urlsplit(base)[0:2] if base else None
- def subcontext(self, source, propagate=True):
+ def subcontext(self, source: Any, propagate: bool = True) -> "Context":
# IMPROVE: to optimize, implement SubContext with parent fallback support
parent = self.parent if self.propagate is False else self
- return parent._subcontext(source, propagate)
+ # type error: Item "None" of "Optional[Context]" has no attribute "_subcontext"
+ return parent._subcontext(source, propagate) # type: ignore[union-attr]
- def _subcontext(self, source, propagate):
+ def _subcontext(self, source: Any, propagate: bool) -> "Context":
ctx = Context(version=self.version)
ctx.propagate = propagate
ctx.parent = self
@@ -119,7 +133,7 @@ class Context(object):
ctx.load(source)
return ctx
- def _clear(self):
+ def _clear(self) -> None:
self.language = None
self.vocab = None
self.terms = {}
@@ -129,12 +143,12 @@ class Context(object):
self.active = False
self.propagate = True
- def get_context_for_term(self, term):
+ def get_context_for_term(self, term: Optional["Term"]) -> "Context":
if term and term.context is not UNDEF:
return self._subcontext(term.context, propagate=True)
return self
- def get_context_for_type(self, node):
+ def get_context_for_type(self, node: Any) -> Optional["Context"]:
if self.version >= 1.1:
rtype = self.get_type(node) if isinstance(node, dict) else None
if not isinstance(rtype, list):
@@ -154,41 +168,42 @@ class Context(object):
return self.parent if self.propagate is False else self
- def get_id(self, obj):
+ def get_id(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, ID)
- def get_type(self, obj):
+ def get_type(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, TYPE)
- def get_language(self, obj):
+ def get_language(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, LANG)
- def get_value(self, obj):
+ def get_value(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, VALUE)
- def get_graph(self, obj):
+ def get_graph(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, GRAPH)
- def get_list(self, obj):
+ def get_list(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, LIST)
- def get_set(self, obj):
+ def get_set(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, SET)
- def get_rev(self, obj):
+ def get_rev(self, obj: Dict[str, Any]) -> Any:
return self._get(obj, REV)
- def _get(self, obj, key):
+ def _get(self, obj: Dict[str, Any], key: str) -> Any:
for alias in self._alias.get(key, []):
if alias in obj:
return obj.get(alias)
return obj.get(key)
- def get_key(self, key: str):
+ # type error: Missing return statement
+ def get_key(self, key: str) -> str: # type: ignore[return]
for alias in self.get_keys(key):
return alias
- def get_keys(self, key: str):
+ def get_keys(self, key: str) -> Generator[str, None, None]:
if key in self._alias:
for alias in self._alias[key]:
yield alias
@@ -207,13 +222,13 @@ class Context(object):
name: str,
idref: str,
coercion: Union[Defined, str] = UNDEF,
- container=UNDEF,
- index=None,
- language=UNDEF,
- reverse=False,
- context=UNDEF,
- prefix=None,
- protected=False,
+ container: Union[Collection[Any], str, Defined] = UNDEF,
+ index: Optional[Union[str, Defined]] = None,
+ language: Optional[Union[str, Defined]] = UNDEF,
+ reverse: bool = False,
+ context: Any = UNDEF,
+ prefix: Optional[bool] = None,
+ protected: bool = False,
):
if self.version < 1.1 or prefix is None:
prefix = isinstance(idref, str) and idref.endswith(URI_GEN_DELIMS)
@@ -261,7 +276,7 @@ class Context(object):
def find_term(
self,
idref: str,
- coercion=None,
+ coercion: Optional[Union[str, Defined]] = None,
container: Union[Defined, str] = UNDEF,
language: Optional[str] = None,
reverse: bool = False,
@@ -296,21 +311,26 @@ class Context(object):
return lu.get((idref, UNDEF, UNDEF, reverse))
- def resolve(self, curie_or_iri):
+ def resolve(self, curie_or_iri: str) -> str:
iri = self.expand(curie_or_iri, False)
- if self.isblank(iri):
- return iri
- if " " in iri:
+ # type error: Argument 1 to "isblank" of "Context" has incompatible type "Optional[str]"; expected "str"
+ if self.isblank(iri): # type: ignore[arg-type]
+ # type error: Incompatible return value type (got "Optional[str]", expected "str")
+ return iri # type: ignore[return-value]
+ # type error: Unsupported right operand type for in ("Optional[str]")
+ if " " in iri: # type: ignore[operator]
return ""
- return self.resolve_iri(iri)
+ # type error: Argument 1 to "resolve_iri" of "Context" has incompatible type "Optional[str]"; expected "str"
+ return self.resolve_iri(iri) # type: ignore[arg-type]
- def resolve_iri(self, iri):
- return norm_url(self._base, iri)
+ def resolve_iri(self, iri: str) -> str:
+ # type error: Argument 1 to "norm_url" has incompatible type "Optional[str]"; expected "str"
+ return norm_url(self._base, iri) # type: ignore[arg-type]
- def isblank(self, ref):
+ def isblank(self, ref: str) -> bool:
return ref.startswith("_:")
- def expand(self, term_curie_or_iri, use_vocab=True):
+ def expand(self, term_curie_or_iri: Any, use_vocab: bool = True) -> Optional[str]:
if not isinstance(term_curie_or_iri, str):
return term_curie_or_iri
@@ -337,19 +357,22 @@ class Context(object):
return self.resolve_iri(term_curie_or_iri)
- def shrink_iri(self, iri):
+ def shrink_iri(self, iri: str) -> str:
ns, name = split_iri(str(iri))
pfx = self._prefixes.get(ns)
if pfx:
- return ":".join((pfx, name))
+ # type error: Argument 1 to "join" of "str" has incompatible type "Tuple[Any, Optional[str]]"; expected "Iterable[str]"
+ return ":".join((pfx, name)) # type: ignore[arg-type]
elif self._base:
if str(iri) == self._base:
return ""
- elif iri.startswith(self._basedomain):
- return iri[len(self._basedomain) :]
+ # type error: Argument 1 to "startswith" of "str" has incompatible type "Optional[str]"; expected "Union[str, Tuple[str, ...]]"
+ elif iri.startswith(self._basedomain): # type: ignore[arg-type]
+ # type error: Argument 1 to "len" has incompatible type "Optional[str]"; expected "Sized"
+ return iri[len(self._basedomain) :] # type: ignore[arg-type]
return iri
- def to_symbol(self, iri):
+ def to_symbol(self, iri: str) -> Optional[str]:
iri = str(iri)
term = self.find_term(iri)
if term:
@@ -359,7 +382,8 @@ class Context(object):
return name
pfx = self._prefixes.get(ns)
if pfx:
- return ":".join((pfx, name))
+ # type error: Argument 1 to "join" of "str" has incompatible type "Tuple[Any, Optional[str]]"; expected "Iterable[str]"
+ return ":".join((pfx, name)) # type: ignore[arg-type]
return iri
def load(
@@ -391,9 +415,9 @@ class Context(object):
self,
base: Optional[str],
inputs: List[Any],
- sources,
- referenced_contexts,
- in_source_url=None,
+ sources: List[Any],
+ referenced_contexts: Set[str],
+ in_source_url: Optional[str] = None,
):
for source in inputs:
@@ -428,24 +452,37 @@ class Context(object):
else:
sources.append((source_url, source))
- def _fetch_context(self, source, base, referenced_contexts):
- source_url = urljoin(base, source)
+ def _fetch_context(
+ self, source: str, base: Optional[str], referenced_contexts: Set[str]
+ ):
+ # type error: Value of type variable "AnyStr" of "urljoin" cannot be "Optional[str]"
+ source_url = urljoin(base, source) # type: ignore[type-var]
if source_url in referenced_contexts:
raise RECURSIVE_CONTEXT_INCLUSION
- referenced_contexts.add(source_url)
+
+ # type error: Argument 1 to "add" of "set" has incompatible type "Optional[str]"; expected "str"
+ referenced_contexts.add(source_url) # type: ignore[arg-type]
if source_url in self._context_cache:
return self._context_cache[source_url]
- source = source_to_json(source_url)
+ # type error: Incompatible types in assignment (expression has type "Optional[Any]", variable has type "str")
+ source = source_to_json(source_url) # type: ignore[assignment]
if source and CONTEXT not in source:
raise INVALID_REMOTE_CONTEXT
- self._context_cache[source_url] = source
+
+ # type error: Invalid index type "Optional[str]" for "Dict[str, Any]"; expected type "str"
+ self._context_cache[source_url] = source # type: ignore[index]
return source
- def _read_source(self, source, source_url=None, referenced_contexts=None):
+ def _read_source(
+ self,
+ source: Dict[str, Any],
+ source_url: Optional[str] = None,
+ referenced_contexts: Optional[Set[str]] = None,
+ ):
imports = source.get(IMPORT)
if imports:
if not isinstance(imports, str):
@@ -478,7 +515,13 @@ class Context(object):
else:
self._read_term(source, key, value, protected)
- def _read_term(self, source, name, dfn, protected=False):
+ def _read_term(
+ self,
+ source: Dict[str, Any],
+ name: str,
+ dfn: Union[Dict[str, Any], str],
+ protected: bool = False,
+ ) -> None:
idref = None
if isinstance(dfn, dict):
# term = self._create_term(source, key, value)
@@ -519,17 +562,21 @@ class Context(object):
if not self._accept_term(dfn):
return
idref = self._rec_expand(source, dfn)
-
- self.add_term(name, idref, protected=protected)
+ # type error: Argument 2 to "add_term" of "Context" has incompatible type "Optional[str]"; expected "str"
+ self.add_term(name, idref, protected=protected) # type: ignore[arg-type]
if idref in NODE_KEYS:
self._alias.setdefault(idref, []).append(name)
- def _rec_expand(self, source, expr, prev=None):
+ def _rec_expand(
+ self, source: Dict[str, Any], expr: Optional[str], prev: Optional[str] = None
+ ) -> Optional[str]:
if expr == prev or expr in NODE_KEYS:
return expr
- is_term, pfx, nxt = self._prep_expand(expr)
+ nxt: Optional[str]
+ # type error: Argument 1 to "_prep_expand" of "Context" has incompatible type "Optional[str]"; expected "str"
+ is_term, pfx, nxt = self._prep_expand(expr) # type: ignore[arg-type]
if pfx:
iri = self._get_source_id(source, pfx)
if iri is None:
@@ -551,7 +598,7 @@ class Context(object):
return self._rec_expand(source, nxt, expr)
- def _prep_expand(self, expr):
+ def _prep_expand(self, expr: str) -> Tuple[bool, Optional[str], str]:
if ":" not in expr:
return True, None, expr
pfx, local = expr.split(":", 1)
@@ -560,7 +607,7 @@ class Context(object):
else:
return False, None, expr
- def _get_source_id(self, source, key):
+ def _get_source_id(self, source: Dict[str, Any], key: str) -> Optional[str]:
# .. from source dict or if already defined
term = source.get(key)
if term is None:
@@ -576,4 +623,5 @@ Term = namedtuple(
"Term",
"id, name, type, container, index, language, reverse, context," "prefix, protected",
)
+
Term.__new__.__defaults__ = (UNDEF, UNDEF, UNDEF, UNDEF, False, UNDEF, False, False)
diff --git a/rdflib/plugins/shared/jsonld/util.py b/rdflib/plugins/shared/jsonld/util.py
index c60bca24..486f8b07 100644
--- a/rdflib/plugins/shared/jsonld/util.py
+++ b/rdflib/plugins/shared/jsonld/util.py
@@ -1,8 +1,11 @@
# -*- coding: utf-8 -*-
# https://github.com/RDFLib/rdflib-jsonld/blob/feature/json-ld-1.1/rdflib_jsonld/util.py
-import typing as t
+from __future__ import annotations
-if t.TYPE_CHECKING:
+import pathlib
+from typing import IO, TYPE_CHECKING, Any, Optional, TextIO, Tuple, Union
+
+if TYPE_CHECKING:
import json
else:
try:
@@ -18,13 +21,19 @@ from urllib.parse import urljoin, urlsplit, urlunsplit
from rdflib.parser import (
BytesIOWrapper,
+ InputSource,
PythonInputSource,
StringInputSource,
+ URLInputSource,
create_input_source,
)
-def source_to_json(source):
+def source_to_json(
+ source: Optional[
+ Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
+ ]
+) -> Optional[Any]:
if isinstance(source, PythonInputSource):
return source.data
@@ -50,7 +59,7 @@ def source_to_json(source):
VOCAB_DELIMS = ("#", "/", ":")
-def split_iri(iri):
+def split_iri(iri: str) -> Tuple[str, Optional[str]]:
for delim in VOCAB_DELIMS:
at = iri.rfind(delim)
if at > -1:
@@ -58,7 +67,7 @@ def split_iri(iri):
return iri, None
-def norm_url(base, url):
+def norm_url(base: str, url: str) -> str:
"""
>>> norm_url('http://example.org/', '/one')
'http://example.org/one'
@@ -87,7 +96,8 @@ def norm_url(base, url):
return result
-def context_from_urlinputsource(source):
+# type error: Missing return statement
+def context_from_urlinputsource(source: URLInputSource) -> Optional[str]: # type: ignore[return]
"""
Please note that JSON-LD documents served with the application/ld+json media type
MUST have all context information, including references to external contexts,
@@ -100,9 +110,20 @@ def context_from_urlinputsource(source):
# source.links is the new way of getting Link headers from URLInputSource
links = source.links
except AttributeError:
- return
+ # type error: Return value expected
+ return # type: ignore[return-value]
for link in links:
if ' rel="http://www.w3.org/ns/json-ld#context"' in link:
i, j = link.index("<"), link.index(">")
if i > -1 and j > -1:
- return urljoin(source.url, link[i + 1 : j])
+ # type error: Value of type variable "AnyStr" of "urljoin" cannot be "Optional[str]"
+ return urljoin(source.url, link[i + 1 : j]) # type: ignore[type-var]
+
+
+__all__ = [
+ "json",
+ "source_to_json",
+ "split_iri",
+ "norm_url",
+ "context_from_urlinputsource",
+]