summaryrefslogtreecommitdiff
path: root/examples/secure_with_audit.py
blob: 434be5a49854236f81f1db034aee29442628e273 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
This example demonstrates how to use `Python audit hooks
<https://docs.python.org/3/library/sys.html#sys.addaudithook>`_ to block access
to files and URLs.

It installs a audit hook with `sys.addaudithook <https://docs.python.org/3/library/sys.html#sys.addaudithook>`_ that blocks access to files and
URLs that end with ``blocked.jsonld``.

The code in the example then verifies that the audit hook is blocking access to
URLs and files as expected.
"""

import logging
import os
import sys
from typing import Any, Optional, Tuple

from rdflib import Graph


def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
    """
    An audit hook that blocks access when an attempt is made to open a
    file or URL that ends with ``blocked.jsonld``.

    Details of the audit events can be seen in the `audit events
    table <https://docs.python.org/3/library/audit_events.html>`_.

    :param name: The name of the audit event.
    :param args: The arguments of the audit event.
    :return: `None` if the audit hook does not block access.
    :raises PermissionError: If the file or URL being accessed ends with ``blocked.jsonld``.
    """
    if name == "urllib.Request" and args[0].endswith("blocked.jsonld"):
        raise PermissionError("Permission denied for URL")
    if name == "open" and args[0].endswith("blocked.jsonld"):
        raise PermissionError("Permission denied for file")
    return None


def main() -> None:
    """
    The main code of the example.

    The important steps are:

    * Install a custom audit hook that blocks some URLs and files.
    * Attempt to parse a JSON-LD document that will result in a blocked URL being accessed.
    * Verify that the audit hook blocked access to the URL.
    * Attempt to parse a JSON-LD document that will result in a blocked file being accessed.
    * Verify that the audit hook blocked access to the file.
    """

    logging.basicConfig(
        level=os.environ.get("PYTHON_LOGGING_LEVEL", logging.INFO),
        stream=sys.stderr,
        datefmt="%Y-%m-%dT%H:%M:%S",
        format=(
            "%(asctime)s.%(msecs)03d %(process)d %(thread)d %(levelno)03d:%(levelname)-8s "
            "%(name)-12s %(module)s:%(lineno)s:%(funcName)s %(message)s"
        ),
    )

    if sys.version_info < (3, 8):
        logging.warn("This example requires Python 3.8 or higher")
        return None

    # Install the audit hook
    #
    # note on type error: This is needed because we are running mypy with python
    # 3.7 mode, so mypy thinks the previous condition will always be true.
    sys.addaudithook(audit_hook)  # type: ignore[unreachable]

    graph = Graph()

    # Attempt to parse a JSON-LD document that will result in the blocked URL
    # being accessed.
    error: Optional[PermissionError] = None
    try:
        graph.parse(
            data=r"""{
            "@context": "http://example.org/blocked.jsonld",
            "@id": "example:subject",
            "example:predicate": { "@id": "example:object" }
        }""",
            format="json-ld",
        )
    except PermissionError as caught:
        logging.info("Permission denied: %s", caught)
        error = caught

    # `Graph.parse` would have resulted in a `PermissionError` being raised from
    # the audit hook.
    assert isinstance(error, PermissionError)
    assert error.args[0] == "Permission denied for URL"

    # Attempt to parse a JSON-LD document that will result in the blocked file
    # being accessed.
    error = None
    try:
        graph.parse(
            data=r"""{
            "@context": "file:///srv/blocked.jsonld",
            "@id": "example:subject",
            "example:predicate": { "@id": "example:object" }
        }""",
            format="json-ld",
        )
    except PermissionError as caught:
        logging.info("Permission denied: %s", caught)
        error = caught

    # `Graph.parse` would have resulted in a `PermissionError` being raised from
    # the audit hook.
    assert isinstance(error, PermissionError)
    assert error.args[0] == "Permission denied for file"


if __name__ == "__main__":
    main()