1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
import enum
import http.client
import itertools
import logging
from contextlib import ExitStack
from pathlib import Path
from test.utils.audit import AuditHookDispatcher
from test.utils.httpfileserver import HTTPFileServer, ProtoFileResource
from test.utils.urlopen import context_urlopener
from textwrap import dedent
from typing import Any, Iterable, Optional, Tuple
from urllib.request import HTTPHandler, OpenerDirector, Request
import pytest
from _pytest.mark.structures import ParameterSet
from rdflib import Graph
from rdflib.namespace import Namespace
from ..utils import GraphHelper
from ..utils.path import ctx_chdir
# Namespace used to mint the example subject, predicate and object terms.
EGNS = Namespace("http://example.org/")
# JSON-LD context document that maps the ``ex`` prefix to the same IRI as
# ``EGNS``. The tests write it to ``context.jsonld`` and reference it from the
# ``@context`` of the document being parsed.
JSONLD_CONTEXT = """
{
"@context": {
"ex": "http://example.org/"
}
}
"""
# The graph the parsed documents are compared against: it is built by adding
# the single triple (EGNS.subject, EGNS.predicate, EGNS.object).
# NOTE(review): relies on ``Graph.add`` returning the graph itself - confirm
# against the rdflib version in use.
EXPECTED_GRAPH = Graph().add((EGNS.subject, EGNS.predicate, EGNS.object))
def test_default(tmp_path: Path) -> None:
    """Parse JSON-LD whose ``@context`` is a ``file:`` URI, with no defences.

    The context document is written to ``tmp_path`` and referenced by its
    ``file:`` URI; the parsed graph must equal ``EXPECTED_GRAPH``.
    """
    context_path = tmp_path / "context.jsonld"
    context_path.write_text(dedent(JSONLD_CONTEXT))
    context_uri = context_path.as_uri()

    data = f"""
    {{
        "@context": "{context_uri}",
        "@id": "ex:subject",
        "ex:predicate": {{ "@id": "ex:object" }}
    }}
    """

    parsed = Graph()
    parsed.parse(data=data, format="json-ld")
    logging.debug("graph = %s", GraphHelper.triple_set(parsed))
    GraphHelper.assert_sets_equals(EXPECTED_GRAPH, parsed)
@enum.unique
class Defence(enum.Enum):
    """Mechanism used by ``test_block_file`` to block loading of the context."""

    # No defence installed; parsing is expected to succeed.
    NONE = enum.auto()
    # An audit hook that raises ``PermissionError`` for the context resource.
    AUDIT_HOOK = enum.auto()
    # A custom urllib opener whose HTTP handler raises ``PermissionError``.
    URL_OPENER = enum.auto()
@enum.unique
class URIKind(enum.Enum):
    """How ``test_block_file`` refers to the JSON-LD context document."""

    # ``file:`` URI of the context file on disk.
    FILE = enum.auto()
    # URL of the context file as served by the test HTTP file server.
    HTTP = enum.auto()
    # Bare file name, used after changing directory to where the file lives.
    RELATIVE = enum.auto()
def generate_make_block_file_cases() -> Iterable[ParameterSet]:
    """Yield the ``(defence, uri_kind)`` parameter sets for ``test_block_file``.

    Every pairing of ``Defence`` and ``URIKind`` is produced, defence-major,
    except that ``Defence.URL_OPENER`` is only paired with ``URIKind.HTTP``.
    """
    for defence in Defence:
        for uri_kind in URIKind:
            # The URL opener defence is only exercised against HTTP URIs, so
            # its FILE and RELATIVE pairings are left out.
            if defence != Defence.URL_OPENER or uri_kind == URIKind.HTTP:
                yield pytest.param(defence, uri_kind)
@pytest.mark.parametrize(["defence", "uri_kind"], generate_make_block_file_cases())
def test_block_file(
    tmp_path: Path,
    audit_hook_dispatcher: Optional[AuditHookDispatcher],
    http_file_server: HTTPFileServer,
    exit_stack: ExitStack,
    defence: Defence,
    uri_kind: URIKind,
) -> None:
    """Check that loading of a JSON-LD ``@context`` document can be blocked.

    A context file is written to ``tmp_path`` and referenced from a JSON-LD
    document by ``file:`` URI, HTTP URL or relative name, depending on
    ``uri_kind``. The selected ``defence`` is then installed and the document
    is parsed:

    - with ``Defence.NONE`` parsing must succeed and yield ``EXPECTED_GRAPH``;
    - with any other defence parsing must raise ``PermissionError`` and leave
      the graph empty.

    :param tmp_path: Directory that receives the context and data files.
    :param audit_hook_dispatcher: Dispatcher used to install audit hooks, or
        ``None`` when it is unavailable.
    :param http_file_server: Server that serves the context file over HTTP.
    :param exit_stack: Stack on which the chdir, audit hook and URL opener
        context managers are entered.
    :param defence: The blocking mechanism to install.
    :param uri_kind: How the context is referenced from the document.
    :raises ValueError: If ``uri_kind`` is unknown or the
        ``defence``/``uri_kind`` combination is not supported.
    """
    # Only the audit hook defence needs the dispatcher; the other defences
    # still run when it is unavailable.
    if defence == Defence.AUDIT_HOOK and audit_hook_dispatcher is None:
        pytest.skip(
            "audit hook dispatcher not available, likely because of Python version"
        )

    context_file = tmp_path / "context.jsonld"
    context_file.write_text(dedent(JSONLD_CONTEXT))
    context_file_served = http_file_server.add_file_with_caching(
        ProtoFileResource((), context_file)
    )

    context_uri: str
    if uri_kind == URIKind.FILE:
        context_uri = context_file.as_uri()
    elif uri_kind == URIKind.HTTP:
        context_uri = context_file_served.request_url
    elif uri_kind == URIKind.RELATIVE:
        context_uri = context_file.name
        # A bare file name only refers to the context file when the working
        # directory is the directory that contains it.
        exit_stack.enter_context(ctx_chdir(tmp_path))
    else:
        raise ValueError(f"unknown URI kind: {uri_kind}")

    data = f"""
    {{
        "@context": "{context_uri}",
        "@id": "ex:subject",
        "ex:predicate": {{ "@id": "ex:object" }}
    }}
    """
    # NOTE(review): the document is also written to disk, but the parse calls
    # below read the in-memory ``data`` string, not this file.
    data_file = tmp_path / "data.jsonld"
    data_file.write_text(dedent(data))

    if defence == Defence.AUDIT_HOOK:
        # Guaranteed by the skip at the top; narrows the Optional type.
        assert audit_hook_dispatcher is not None

        # Audit event to watch and the first event argument that identifies
        # the context resource. For RELATIVE the working directory has already
        # been switched above, so ``Path.cwd()`` is the context directory.
        blocked_event, blocked_target = {
            URIKind.FILE: ("open", f"{context_file.absolute()}"),
            URIKind.RELATIVE: ("open", f"{Path.cwd() / context_file.name}"),
            URIKind.HTTP: ("urllib.Request", context_file_served.request_url),
        }[uri_kind]

        def audit_hook(name: str, args: Tuple[Any, ...]) -> None:
            """Raise ``PermissionError`` when the context resource is accessed."""
            logging.info("block_file_access: name = %s, args = %s", name, args)
            if name == blocked_event and args[0] == blocked_target:
                raise PermissionError("access blocked")

        exit_stack.enter_context(
            audit_hook_dispatcher.ctx_hook(blocked_event, audit_hook)
        )
    elif defence == Defence.URL_OPENER and uri_kind == URIKind.HTTP:
        opener = OpenerDirector()

        class SecuredHTTPHandler(HTTPHandler):
            """HTTP handler that refuses to fetch the served context file."""

            def http_open(self, req: Request) -> http.client.HTTPResponse:
                if req.get_full_url() == context_file_served.request_url:
                    raise PermissionError("access blocked")
                return super().http_open(req)

        opener.add_handler(SecuredHTTPHandler())
        exit_stack.enter_context(context_urlopener(opener))
    elif defence != Defence.NONE:
        raise ValueError(
            f"unsupported defence {defence} and uri_kind {uri_kind} combination"
        )

    graph = Graph()
    if defence == Defence.NONE:
        graph.parse(format="json-ld", data=data)
        GraphHelper.assert_sets_equals(EXPECTED_GRAPH, graph)
    else:
        with pytest.raises(PermissionError):
            graph.parse(format="json-ld", data=data)
        # Checked after the ``raises`` block so that it actually executes:
        # a blocked parse must not have added any triples.
        assert len(graph) == 0
|