summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIwan Aucamp <aucampia@gmail.com>2023-03-16 20:56:48 +0100
committerGitHub <noreply@github.com>2023-03-16 20:56:48 +0100
commit1c256765ac7d5e7327695a44269be09e51bd88b1 (patch)
tree903b6210b1d4601f3f08c6edd04458062b886ec0
parent60d98dbbf781371efd74de7d525443a11daa622e (diff)
downloadrdflib-1c256765ac7d5e7327695a44269be09e51bd88b1.tar.gz
docs: document available security measures (#2270)
docs: document available security measures Several security measures can be used to mitigate risk when processing potentially malicious input. This change adds documentation about available security measures and examples and tests that illustrate their usage.
-rw-r--r--docs/apidocs/examples.rst16
-rw-r--r--docs/index.rst13
-rw-r--r--docs/security_considerations.rst113
-rw-r--r--examples/secure_with_audit.py120
-rw-r--r--examples/secure_with_urlopen.py82
-rw-r--r--rdflib/graph.py97
-rw-r--r--rdflib/plugins/sparql/evaluate.py16
-rw-r--r--rdflib/plugins/sparql/processor.py28
-rw-r--r--rdflib/plugins/sparql/update.py13
-rw-r--r--test/conftest.py22
-rw-r--r--test/test_misc/test_security.py172
-rw-r--r--test/utils/audit.py28
-rw-r--r--test/utils/urlopen.py14
13 files changed, 719 insertions, 15 deletions
diff --git a/docs/apidocs/examples.rst b/docs/apidocs/examples.rst
index 4e3908b5..43b92c13 100644
--- a/docs/apidocs/examples.rst
+++ b/docs/apidocs/examples.rst
@@ -115,3 +115,19 @@ These examples all live in ``./examples`` in the source-distribution of RDFLib.
:undoc-members:
:show-inheritance:
+:mod:`~examples.secure_with_audit` Module
+-----------------------------------------
+
+.. automodule:: examples.secure_with_audit
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+
+:mod:`~examples.secure_with_urlopen` Module
+-------------------------------------------
+
+.. automodule:: examples.secure_with_urlopen
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/index.rst b/docs/index.rst
index 6a265dd2..e36962ea 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -26,6 +26,18 @@ RDFLib is a pure Python package for working with `RDF <http://www.w3.org/RDF/>`_
* both Queries and Updates are supported
+.. caution::
+
+ RDFLib is designed to access arbitrary network and file resources, in some
+ cases these are directly requested resources, in other cases they are
+ indirectly referenced resources.
+
+ If you are using RDFLib to process untrusted documents or queries you should
+ take measures to restrict file and network access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
Getting started
---------------
@@ -56,6 +68,7 @@ If you are familiar with RDF and are looking for details on how RDFLib handles i
merging
upgrade5to6
upgrade4to5
+ security_considerations
Reference
diff --git a/docs/security_considerations.rst b/docs/security_considerations.rst
new file mode 100644
index 00000000..7e25695b
--- /dev/null
+++ b/docs/security_considerations.rst
@@ -0,0 +1,113 @@
+.. _security_considerations:
+
+=======================
+Security Considerations
+=======================
+
+RDFLib is designed to access arbitrary network and file resources, in some cases
+these are directly requested resources, in other cases they are indirectly
+referenced resources.
+
+An example of where indirect resources are accessed is JSON-LD processing, where
+network or file resources referenced by ``@context`` values will be loaded and
+processed.
+
+RDFLib also supports SPARQL, which has federated query capabilities that allow
+queries to query arbitrary remote endpoints.
+
+If you are using RDFLib to process untrusted documents or queries you should
+take measures to restrict file and network access.
+
+Some measures that can be taken to restrict file and network access are:
+
+* `Operating System Security Measures`_.
+* `Python Runtime Audit Hooks`_.
+* `Custom URL Openers`_.
+
+Of these, operating system security measures are recommended. The other
+measures work, but they are not as effective as operating system security
+measures, and even if they are used they should be used in conjunction with
+operating system security measures.
+
+Operating System Security Measures
+==================================
+
+Most operating systems provide functionality that can be used to restrict
+network and file access of a process.
+
+Some examples of these include:
+
+* `Open Container Initiative (OCI) Containers
+ <https://www.opencontainers.org/>`_ (aka Docker containers).
+
+ Most OCI runtimes provide mechanisms to restrict network and file access of
+ containers. For example, using Docker, you can limit your container to only
+ being able to access files explicitly mapped into the container and to only
+ access the network through a firewall. For more information refer to the
+ documentation of the tool you use to manage your OCI containers:
+
+ * `Kubernetes <https://kubernetes.io/docs/home/>`_
+ * `Docker <https://docs.docker.com/>`_
+ * `Podman <https://podman.io/>`_
+
+* `firejail <https://firejail.wordpress.com/>`_ can be used to
+ sandbox a process on Linux and restrict its network and file access.
+
+* File and network access restrictions.
+
+ Most operating systems provide a way to restrict operating system users to
+ only being able to access files and network resources that are explicitly
+ allowed. Applications that process untrusted input could be run as a user with
+ these restrictions in place.
+
+Many other measures are available, however, listing them is outside the scope
+of this document.
+
+Of the listed measures OCI containers are recommended. In most cases, OCI
+containers are constrained by default and can't access the loopback interface
+and can only access files that are explicitly mapped into the container.
+
+Python Runtime Audit Hooks
+==========================
+
+From Python 3.8 onwards, Python provides a mechanism to install runtime audit
+hooks that can be used to limit access to files and network resources.
+
+The runtime audit hook system is described in more detail in `PEP 578 – Python
+Runtime Audit Hooks <https://peps.python.org/pep-0578/>`_.
+
+Runtime audit hooks can be installed using the `sys.addaudithook
+<https://docs.python.org/3/library/sys.html#sys.addaudithook>`_ function, and
+will then get called when audit events occur. The audit events raised by the
+Python runtime and standard library are described in Python's `audit events
+table <https://docs.python.org/3/library/audit_events.html>`_.
+
+RDFLib uses `urllib.request.urlopen` for HTTP, HTTPS and other network access,
+and this function raises a ``urllib.Request`` audit event. For file access,
+RDFLib uses `open`, which raises an ``open`` audit event.
+
+Users of RDFLib can install audit hooks that react to these audit events and
+raise an exception when an attempt is made to access files or network resources
+that are not explicitly allowed.
+
+RDFLib's test suite includes tests which verify that audit hooks can block
+access to network and file resources.
+
+RDFLib also includes an example that shows how runtime audit hooks can be
+used to restrict network and file access in :mod:`~examples.secure_with_audit`.
+
+Custom URL Openers
+==================
+
+RDFLib uses `urllib.request.urlopen` for HTTP, HTTPS and other network
+access. This function will use a `urllib.request.OpenerDirector` installed with
+`urllib.request.install_opener` to open the URLs.
+
+Users of RDFLib can install a custom URL opener that raises an exception when an
+attempt is made to access network resources that are not explicitly allowed.
+
+RDFLib's test suite includes tests which verify that custom URL openers can be
+used to block access to network resources.
+
+RDFLib also includes an example that shows how a custom opener can be used to
+restrict network access in :mod:`~examples.secure_with_urlopen`.
diff --git a/examples/secure_with_audit.py b/examples/secure_with_audit.py
new file mode 100644
index 00000000..434be5a4
--- /dev/null
+++ b/examples/secure_with_audit.py
@@ -0,0 +1,120 @@
+"""
+This example demonstrates how to use `Python audit hooks
+<https://docs.python.org/3/library/sys.html#sys.addaudithook>`_ to block access
+to files and URLs.
+
+It installs an audit hook with `sys.addaudithook <https://docs.python.org/3/library/sys.html#sys.addaudithook>`_ that blocks access to files and
+URLs that end with ``blocked.jsonld``.
+
+The code in the example then verifies that the audit hook is blocking access to
+URLs and files as expected.
+"""
+
+import logging
+import os
+import sys
+from typing import Any, Optional, Tuple
+
+from rdflib import Graph
+
+
def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
    """
    An audit hook that blocks access when an attempt is made to open a
    file or URL that ends with ``blocked.jsonld``.

    Details of the audit events can be seen in the `audit events
    table <https://docs.python.org/3/library/audit_events.html>`_.

    Only the ``urllib.Request`` and ``open`` events are inspected. The first
    argument of an ``open`` event is not guaranteed to be a `str` (for example
    it can be an `int` file descriptor or a `bytes` path), so the target is
    normalised before it is compared. Without this, the hook itself would
    raise `AttributeError` or `TypeError` for unrelated ``open`` calls.

    :param name: The name of the audit event.
    :param args: The arguments of the audit event.
    :return: `None` if the audit hook does not block access.
    :raises PermissionError: If the file or URL being accessed ends with
        ``blocked.jsonld``.
    """
    if name not in ("urllib.Request", "open") or not args:
        return None

    target = args[0]
    if isinstance(target, bytes):
        # Bytes paths are decoded with the file system encoding so that they
        # are matched by the same rule as text paths.
        target = os.fsdecode(target)
    if not isinstance(target, str):
        # For example ``open(fd)``: there is no name to match against.
        return None

    if target.endswith("blocked.jsonld"):
        if name == "urllib.Request":
            raise PermissionError("Permission denied for URL")
        raise PermissionError("Permission denied for file")
    return None
+
+
def main() -> None:
    """
    The main code of the example.

    The important steps are:

    * Install a custom audit hook that blocks some URLs and files.
    * Attempt to parse a JSON-LD document that will result in a blocked URL being accessed.
    * Verify that the audit hook blocked access to the URL.
    * Attempt to parse a JSON-LD document that will result in a blocked file being accessed.
    * Verify that the audit hook blocked access to the file.
    """

    logging.basicConfig(
        level=os.environ.get("PYTHON_LOGGING_LEVEL", logging.INFO),
        stream=sys.stderr,
        datefmt="%Y-%m-%dT%H:%M:%S",
        format=(
            "%(asctime)s.%(msecs)03d %(process)d %(thread)d %(levelno)03d:%(levelname)-8s "
            "%(name)-12s %(module)s:%(lineno)s:%(funcName)s %(message)s"
        ),
    )

    if sys.version_info < (3, 8):
        # ``logging.warn`` is a deprecated alias; ``logging.warning`` is the
        # supported spelling.
        logging.warning("This example requires Python 3.8 or higher")
        return None

    # Install the audit hook
    #
    # note on type error: This is needed because we are running mypy with python
    # 3.7 mode, so mypy thinks the previous condition will always be true.
    sys.addaudithook(audit_hook)  # type: ignore[unreachable]

    graph = Graph()

    # Attempt to parse a JSON-LD document that will result in the blocked URL
    # being accessed.
    error: Optional[PermissionError] = None
    try:
        graph.parse(
            data=r"""{
            "@context": "http://example.org/blocked.jsonld",
            "@id": "example:subject",
            "example:predicate": { "@id": "example:object" }
        }""",
            format="json-ld",
        )
    except PermissionError as caught:
        logging.info("Permission denied: %s", caught)
        error = caught

    # `Graph.parse` would have resulted in a `PermissionError` being raised from
    # the audit hook.
    assert isinstance(error, PermissionError)
    assert error.args[0] == "Permission denied for URL"

    # Attempt to parse a JSON-LD document that will result in the blocked file
    # being accessed.
    error = None
    try:
        graph.parse(
            data=r"""{
            "@context": "file:///srv/blocked.jsonld",
            "@id": "example:subject",
            "example:predicate": { "@id": "example:object" }
        }""",
            format="json-ld",
        )
    except PermissionError as caught:
        logging.info("Permission denied: %s", caught)
        error = caught

    # `Graph.parse` would have resulted in a `PermissionError` being raised from
    # the audit hook.
    assert isinstance(error, PermissionError)
    assert error.args[0] == "Permission denied for file"
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/secure_with_urlopen.py b/examples/secure_with_urlopen.py
new file mode 100644
index 00000000..fd6576b1
--- /dev/null
+++ b/examples/secure_with_urlopen.py
@@ -0,0 +1,82 @@
+"""
+This example demonstrates how to use a custom global URL opener installed with `urllib.request.install_opener` to block access to URLs.
+"""
+import http.client
+import logging
+import os
+import sys
+from typing import Optional
+from urllib.request import HTTPHandler, OpenerDirector, Request, install_opener
+
+from rdflib import Graph
+
+
class SecuredHTTPHandler(HTTPHandler):
    """
    HTTP handler that refuses to open any URL ending in "blocked.jsonld".
    """

    def http_open(self, req: Request) -> http.client.HTTPResponse:
        """
        Open ``req`` with the default HTTP handling unless its URL is blocked.

        :param req: The request to open.
        :return: The response returned by ``HTTPHandler.http_open``.
        :raises PermissionError: If the URL ends with "blocked.jsonld".
        """
        requested_url = req.get_full_url()
        is_blocked = requested_url.endswith("blocked.jsonld")
        if not is_blocked:
            return super().http_open(req)
        raise PermissionError("Permission denied for URL")
+
+
def main() -> None:
    """
    Run the example.

    The steps are:

    * Install a custom global URL opener that blocks some URLs.
    * Attempt to parse a JSON-LD document that will result in a blocked URL being accessed.
    * Verify that the URL opener blocked access to the URL.
    """

    log_format = (
        "%(asctime)s.%(msecs)03d %(process)d %(thread)d %(levelno)03d:%(levelname)-8s "
        "%(name)-12s %(module)s:%(lineno)s:%(funcName)s %(message)s"
    )
    logging.basicConfig(
        level=os.environ.get("PYTHON_LOGGING_LEVEL", logging.INFO),
        stream=sys.stderr,
        datefmt="%Y-%m-%dT%H:%M:%S",
        format=log_format,
    )

    # Build an opener whose only handler is the blocking HTTP handler and make
    # it the process-wide opener used by ``urllib.request.urlopen``.
    director = OpenerDirector()
    director.add_handler(SecuredHTTPHandler())
    install_opener(director)

    graph = Graph()

    # Parsing this document requires fetching the blocked remote context.
    document = r"""{
            "@context": "http://example.org/blocked.jsonld",
            "@id": "example:subject",
            "example:predicate": { "@id": "example:object" }
        }"""
    denied: Optional[PermissionError] = None
    try:
        graph.parse(data=document, format="json-ld")
    except PermissionError as exc:
        logging.info("Permission denied: %s", exc)
        denied = exc

    # The custom opener must have raised while the context was being fetched.
    assert isinstance(denied, PermissionError)
    assert denied.args[0] == "Permission denied for URL"
+
+
+if __name__ == "__main__":
+ main()
diff --git a/rdflib/graph.py b/rdflib/graph.py
index 717788fd..7d32ab38 100644
--- a/rdflib/graph.py
+++ b/rdflib/graph.py
@@ -1387,22 +1387,34 @@ class Graph(Node):
"""
Parse an RDF source adding the resulting triples to the Graph.
- The source is specified using one of source, location, file or
- data.
+ The source is specified using one of source, location, file or data.
+
+ .. caution::
+
+ This method can access directly or indirectly requested network or
+ file resources, for example, when parsing JSON-LD documents with
+ ``@context`` directives that point to a network location.
+
+ When processing untrusted or potentially malicious documents,
+ measures should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
:Parameters:
- ``source``: An InputSource, file-like object, or string. In the case
of a string the string is the location of the source.
- - ``location``: A string indicating the relative or absolute URL of the
- source. Graph's absolutize method is used if a relative location
+ - ``location``: A string indicating the relative or absolute URL of
+ the source. Graph's absolutize method is used if a relative location
is specified.
- ``file``: A file-like object.
- ``data``: A string containing the data to be parsed.
- - ``format``: Used if format can not be determined from source, e.g. file
- extension or Media Type. Defaults to text/turtle. Format support can
- be extended with plugins, but "xml", "n3" (use for turtle), "nt" &
- "trix" are built in.
+ - ``format``: Used if format can not be determined from source, e.g.
+ file extension or Media Type. Defaults to text/turtle. Format
+ support can be extended with plugins, but "xml", "n3" (use for
+ turtle), "nt" & "trix" are built in.
- ``publicID``: the logical URI to use as the document base. If None
specified the document location is used (at least in the case where
there is a document location).
@@ -1507,12 +1519,25 @@ class Graph(Node):
"""
Query this graph.
- A type of 'prepared queries' can be realised by providing
- initial variable bindings with initBindings
+ A type of 'prepared queries' can be realised by providing initial
+ variable bindings with initBindings
+
+ Initial namespaces are used to resolve prefixes used in the query, if
+ none are given, the namespaces from the graph's namespace manager are
+ used.
+
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
- Initial namespaces are used to resolve prefixes used in the query,
- if none are given, the namespaces from the graph's namespace manager
- are used.
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
:returntype: :class:`~rdflib.query.Result`
@@ -1550,7 +1575,22 @@ class Graph(Node):
use_store_provided: bool = True,
**kwargs: Any,
) -> None:
- """Update this graph with the given update query."""
+ """
+ Update this graph with the given update query.
+
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
+
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
+ """
initBindings = initBindings or {} # noqa: N806
initNs = initNs or dict(self.namespaces()) # noqa: N806
@@ -2171,6 +2211,19 @@ class ConjunctiveGraph(Graph):
The graph into which the source was parsed. In the case of n3
it returns the root context.
+
+ .. caution::
+
+ This method can access directly or indirectly requested network or
+ file resources, for example, when parsing JSON-LD documents with
+ ``@context`` directives that point to a network location.
+
+ When processing untrusted or potentially malicious documents,
+ measures should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
"""
source = create_input_source(
@@ -2401,6 +2454,22 @@ class Dataset(ConjunctiveGraph):
data: Optional[Union[str, bytes]] = None,
**args: Any,
) -> "Graph":
+ """
+
+ .. caution::
+
+ This method can access directly or indirectly requested network or
+ file resources, for example, when parsing JSON-LD documents with
+ ``@context`` directives that point to a network location.
+
+ When processing untrusted or potentially malicious documents,
+ measures should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
+ """
+
c = ConjunctiveGraph.parse(
self, source, publicID, format, location, file, data, **args
)
diff --git a/rdflib/plugins/sparql/evaluate.py b/rdflib/plugins/sparql/evaluate.py
index 252c73ba..4f8d687b 100644
--- a/rdflib/plugins/sparql/evaluate.py
+++ b/rdflib/plugins/sparql/evaluate.py
@@ -645,6 +645,22 @@ def evalQuery(
initBindings: Mapping[str, Identifier],
base: Optional[str] = None,
) -> Mapping[Any, Any]:
+ """
+
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
+
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
+ """
+
initBindings = dict((Variable(k), v) for k, v in initBindings.items())
ctx = QueryContext(graph, initBindings=initBindings)
diff --git a/rdflib/plugins/sparql/processor.py b/rdflib/plugins/sparql/processor.py
index e4d83494..c2fb7e54 100644
--- a/rdflib/plugins/sparql/processor.py
+++ b/rdflib/plugins/sparql/processor.py
@@ -76,6 +76,21 @@ class SPARQLUpdateProcessor(UpdateProcessor):
initBindings: Mapping[str, Identifier] = {},
initNs: Mapping[str, Any] = {},
) -> None:
+ """
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
+
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
+ """
+
if isinstance(strOrQuery, str):
strOrQuery = translateUpdate(parseUpdate(strOrQuery), initNs=initNs)
@@ -102,6 +117,19 @@ class SPARQLProcessor(Processor):
Evaluate a query with the given initial bindings, and initial
namespaces. The given base is used to resolve relative URIs in
the query and will be overridden by any BASE given in the query.
+
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
+
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
"""
if not isinstance(strOrQuery, Query):
diff --git a/rdflib/plugins/sparql/update.py b/rdflib/plugins/sparql/update.py
index 9be375bd..f27ee9b3 100644
--- a/rdflib/plugins/sparql/update.py
+++ b/rdflib/plugins/sparql/update.py
@@ -299,6 +299,19 @@ def evalUpdate(
This will return None on success and raise Exceptions on error
+ .. caution::
+
+ This method can access indirectly requested network endpoints, for
+ example, query processing will attempt to access network endpoints
+ specified in ``SERVICE`` directives.
+
+ When processing untrusted or potentially malicious queries, measures
+ should be taken to restrict network and file access.
+
+ For information on available security measures, see the RDFLib
+ :doc:`Security Considerations </security_considerations>`
+ documentation.
+
"""
for u in update.algebra:
diff --git a/test/conftest.py b/test/conftest.py
index daee3f28..98fe4738 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,10 +1,14 @@
+import sys
+from contextlib import ExitStack
+
import pytest
pytest.register_assert_rewrite("test.utils")
+from test.utils.audit import AuditHookDispatcher # noqa: E402
from test.utils.http import ctx_http_server # noqa: E402
from test.utils.httpfileserver import HTTPFileServer # noqa: E402
-from typing import Generator # noqa: E402
+from typing import Generator, Optional # noqa: E402
from rdflib import Graph
@@ -47,3 +51,19 @@ def function_httpmock(
) -> Generator[ServedBaseHTTPServerMock, None, None]:
_session_function_httpmock.reset()
yield _session_function_httpmock
+
+
@pytest.fixture(scope="session", autouse=True)
def audit_hook_dispatcher() -> Generator[Optional[AuditHookDispatcher], None, None]:
    """
    Session-wide fixture that yields an ``AuditHookDispatcher`` registered
    with ``sys.addaudithook``, or ``None`` on Python older than 3.8.
    """
    if sys.version_info < (3, 8):
        # Audit hooks are not available, so there is nothing to install.
        yield None
    else:
        installed = AuditHookDispatcher()
        sys.addaudithook(installed.audit)
        yield installed
+
+
@pytest.fixture(scope="function")
def exit_stack() -> Generator[ExitStack, None, None]:
    """
    Yield a fresh ``ExitStack`` that is closed when the test function ends.
    """
    managed = ExitStack()
    with managed as stack:
        yield stack
diff --git a/test/test_misc/test_security.py b/test/test_misc/test_security.py
new file mode 100644
index 00000000..b4c8fc22
--- /dev/null
+++ b/test/test_misc/test_security.py
@@ -0,0 +1,172 @@
+import enum
+import http.client
+import itertools
+import logging
+from contextlib import ExitStack
+from pathlib import Path
+from test.utils.audit import AuditHookDispatcher
+from test.utils.httpfileserver import HTTPFileServer, ProtoFileResource
+from test.utils.urlopen import context_urlopener
+from textwrap import dedent
+from typing import Any, Iterable, Optional, Tuple
+from urllib.request import HTTPHandler, OpenerDirector, Request
+
+import pytest
+from _pytest.mark.structures import ParameterSet
+
+from rdflib import Graph
+from rdflib.namespace import Namespace
+
+from ..utils import GraphHelper
+from ..utils.path import ctx_chdir
+
+EGNS = Namespace("http://example.org/")
+
+JSONLD_CONTEXT = """
+{
+ "@context": {
+ "ex": "http://example.org/"
+ }
+}
+"""
+
+EXPECTED_GRAPH = Graph().add((EGNS.subject, EGNS.predicate, EGNS.object))
+
+
def test_default(tmp_path: Path) -> None:
    """
    With no defence installed, a JSON-LD document whose ``@context`` is a
    ``file:`` URI parses into the expected graph.
    """
    context_path = tmp_path / "context.jsonld"
    context_path.write_text(dedent(JSONLD_CONTEXT))
    context_uri = context_path.as_uri()

    document = f"""
    {{
        "@context": "{context_uri}",
        "@id": "ex:subject",
        "ex:predicate": {{ "@id": "ex:object" }}
    }}
    """

    parsed = Graph()
    parsed.parse(format="json-ld", data=document)
    logging.debug("graph = %s", GraphHelper.triple_set(parsed))
    GraphHelper.assert_sets_equals(EXPECTED_GRAPH, parsed)
+
+
class Defence(enum.Enum):
    """Security measure installed by ``test_block_file`` before parsing."""

    # No measure is installed; parsing is expected to succeed.
    NONE = enum.auto()
    # A hook registered through the audit hook dispatcher raises
    # ``PermissionError``.
    AUDIT_HOOK = enum.auto()
    # A custom ``OpenerDirector`` with a blocking HTTP handler is installed.
    URL_OPENER = enum.auto()
+
+
class URIKind(enum.Enum):
    """Form of the ``@context`` URI used by ``test_block_file``."""

    # The context file's ``file:`` URI (``Path.as_uri()``).
    FILE = enum.auto()
    # The URL under which the HTTP file server serves the context file.
    HTTP = enum.auto()
    # The bare file name, resolved with the working directory set to the
    # directory that contains the context file.
    RELATIVE = enum.auto()
+
+
def generate_make_block_file_cases() -> Iterable[ParameterSet]:
    """
    Yield one pytest parameter set for every supported combination of
    ``Defence`` and ``URIKind``.
    """
    for combination in itertools.product(Defence, URIKind):
        defence, uri_kind = combination
        # A custom URL opener can only intercept HTTP access, so it is paired
        # with HTTP URIs only.
        supported = defence != Defence.URL_OPENER or uri_kind == URIKind.HTTP
        if supported:
            yield pytest.param(defence, uri_kind)
+
+
@pytest.mark.parametrize(["defence", "uri_kind"], generate_make_block_file_cases())
def test_block_file(
    tmp_path: Path,
    audit_hook_dispatcher: Optional[AuditHookDispatcher],
    http_file_server: HTTPFileServer,
    exit_stack: ExitStack,
    defence: Defence,
    uri_kind: URIKind,
) -> None:
    """
    Check that each defence blocks loading of a JSON-LD ``@context``.

    A context file is written to ``tmp_path`` and referenced from a JSON-LD
    document through the URI form selected by ``uri_kind``. The measure
    selected by ``defence`` is installed for the duration of the test through
    ``exit_stack``. With any defence other than ``Defence.NONE`` parsing must
    raise ``PermissionError`` and leave the graph empty; with
    ``Defence.NONE`` parsing must produce ``EXPECTED_GRAPH``.
    """
    if audit_hook_dispatcher is None:
        pytest.skip(
            "audit hook dispatcher not available, likely because of Python version"
        )

    # Create the context document and also expose it over HTTP.
    context_file = tmp_path / "context.jsonld"
    context_file.write_text(dedent(JSONLD_CONTEXT))
    context_file_served = http_file_server.add_file_with_caching(
        ProtoFileResource((), context_file)
    )

    # Choose how the JSON-LD document refers to the context.
    context_uri: str
    if uri_kind == URIKind.FILE:
        context_uri = context_file.as_uri()
    elif uri_kind == URIKind.HTTP:
        context_uri = context_file_served.request_url
    elif uri_kind == URIKind.RELATIVE:
        context_uri = context_file.name
        # A bare file name only resolves to the context file when the working
        # directory is the directory that contains it.
        exit_stack.enter_context(ctx_chdir(tmp_path))
    else:
        raise ValueError(f"unknown URI kind: {uri_kind}")

    data = f"""
    {{
        "@context": "{context_uri}",
        "@id": "ex:subject",
        "ex:predicate": {{ "@id": "ex:object" }}
    }}
    """

    # NOTE(review): this file is written but not read again in this test;
    # parsing below uses ``data`` directly.
    data_file = tmp_path / "data.jsonld"
    data_file.write_text(dedent(data))

    # Install the selected defence; each one is removed again when
    # ``exit_stack`` is closed.
    if defence == Defence.AUDIT_HOOK and uri_kind == URIKind.FILE:

        def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
            # Block ``open`` of the absolute path of the context file.
            logging.info("block_file_access: name = %s, args = %s", name, args)
            if name == "open" and args[0] == f"{context_file.absolute()}":
                raise PermissionError("access blocked")

        exit_stack.enter_context(audit_hook_dispatcher.ctx_hook("open", audit_hook))

    elif defence == Defence.AUDIT_HOOK and uri_kind == URIKind.RELATIVE:

        def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
            # Block ``open`` of the context file name joined onto the current
            # working directory.
            logging.info("block_file_access: name = %s, args = %s", name, args)
            if name == "open" and args[0] == f"{Path.cwd() / context_file.name}":
                raise PermissionError("access blocked")

        exit_stack.enter_context(audit_hook_dispatcher.ctx_hook("open", audit_hook))

    elif defence == Defence.AUDIT_HOOK and uri_kind == URIKind.HTTP:

        def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
            # Block ``urllib.Request`` events for the served context URL.
            logging.info("block_file_access: name = %s, args = %s", name, args)
            if name == "urllib.Request" and args[0] == context_file_served.request_url:
                raise PermissionError("access blocked")

        exit_stack.enter_context(
            audit_hook_dispatcher.ctx_hook("urllib.Request", audit_hook)
        )

    elif defence == Defence.URL_OPENER and uri_kind == URIKind.HTTP:
        opener = OpenerDirector()

        class SecuredHTTPHandler(HTTPHandler):
            # HTTP handler that refuses to open the served context URL and
            # defers to the default handler for every other request.
            def http_open(self, req: Request) -> http.client.HTTPResponse:
                if req.get_full_url() == context_file_served.request_url:
                    raise PermissionError("access blocked")
                return super().http_open(req)

        opener.add_handler(SecuredHTTPHandler())

        exit_stack.enter_context(context_urlopener(opener))

    elif defence == Defence.NONE:
        pass
    else:
        raise ValueError(
            f"unsupported defence {defence} and uri_kind {uri_kind} combination"
        )

    graph = Graph()
    if defence != Defence.NONE:
        # Any installed defence must stop the context from being loaded, so
        # nothing may be added to the graph.
        with pytest.raises(PermissionError):
            graph.parse(format="json-ld", data=data)
        assert len(graph) == 0
    else:
        graph.parse(format="json-ld", data=data)
        GraphHelper.assert_sets_equals(EXPECTED_GRAPH, graph)
diff --git a/test/utils/audit.py b/test/utils/audit.py
new file mode 100644
index 00000000..00045275
--- /dev/null
+++ b/test/utils/audit.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+from collections import defaultdict
+from contextlib import contextmanager
+from dataclasses import dataclass, field
+from typing import Any, Callable, DefaultDict, Generator, List, Tuple
+
# Signature of an audit hook callback: ``(event_name, event_args)``.
AuditHookType = Callable[[str, Tuple[Any, ...]], Any]


@dataclass
class AuditHookDispatcher:
    """
    Fan a single audit hook out to per-event handlers that can be added and
    removed again.

    ``audit`` has the signature of an audit hook callback, and ``ctx_hook``
    registers a handler for one event name for the duration of a ``with``
    block.
    """

    # Handlers registered per audit event name.
    handlers: DefaultDict[str, List[AuditHookType]] = field(
        default_factory=lambda: defaultdict(list)
    )

    def audit(self, name: str, args: Tuple[Any, ...]) -> Any:
        """
        Call every handler registered for the audit event ``name``.

        :param name: The name of the audit event.
        :param args: The arguments of the audit event.
        :raises Exception: Whatever a handler raises is propagated.
        """
        # ``.get`` is used instead of indexing so that events nobody
        # registered for do not insert an empty list into the defaultdict.
        # The handlers are snapshotted so that a handler being removed during
        # dispatch does not disturb the iteration.
        for handler in tuple(self.handlers.get(name, ())):
            handler(name, args)

    @contextmanager
    def ctx_hook(self, name: str, hook: AuditHookType) -> Generator[None, None, None]:
        """
        Register ``hook`` for the audit event ``name`` while the context is
        active, and remove it again on exit.

        :param name: The name of the audit event to handle.
        :param hook: The handler to call for that event.
        """
        self.handlers[name].append(hook)
        try:
            yield None
        finally:
            self.handlers[name].remove(hook)
diff --git a/test/utils/urlopen.py b/test/utils/urlopen.py
new file mode 100644
index 00000000..fb659707
--- /dev/null
+++ b/test/utils/urlopen.py
@@ -0,0 +1,14 @@
+import urllib.request
+from contextlib import contextmanager
+from typing import Generator, Optional
+from urllib.request import OpenerDirector, install_opener
+
+
@contextmanager
def context_urlopener(opener: OpenerDirector) -> Generator[OpenerDirector, None, None]:
    """
    Install ``opener`` as the global ``urllib.request`` opener for the
    duration of the context and reinstall the previous one on exit.

    :param opener: The opener to install.
    :return: A context manager that yields ``opener``.
    """
    # Remember whatever was installed before, which may be ``None``.
    previous: Optional[OpenerDirector] = urllib.request._opener  # type: ignore[attr-defined]
    try:
        install_opener(opener)
        yield opener
    finally:
        install_opener(previous)  # type: ignore[arg-type]