1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import logging
import threading
import requests
import os
from io import BytesIO
from rdflib.query import Result
# Module-level logger; the connector reports which session it uses at DEBUG level.
log = logging.getLogger(__name__)
class SPARQLConnectorException(Exception):
    """Raised by the connector for a missing endpoint or an unsupported HTTP method."""
# Value sent in the HTTP ``Accept`` header, keyed by the connector's
# ``returnFormat`` setting.
# TODO: Pull in these from the result implementation plugins?
_response_mime_types = dict(
    (
        ('xml', 'application/sparql-results+xml, application/rdf+xml'),
        ('json', 'application/sparql-results+json'),
        ('csv', 'text/csv'),
        ('tsv', 'text/tab-separated-values'),
        ('application/rdf+xml', 'application/rdf+xml'),
    )
)
class SPARQLConnector(object):
    """
    this class deals with nitty gritty details of talking to a SPARQL server

    HTTP traffic goes through a ``requests.Session`` that is created on first
    use and cached per thread and per process id.
    """

    def __init__(self, query_endpoint=None, update_endpoint=None, returnFormat='xml', method='GET', **kwargs):
        """
        Any additional keyword arguments will be passed to requests, and can
        be used to set up timeouts, basic auth, etc.

        :param query_endpoint: URL that :meth:`query` sends requests to.
        :param update_endpoint: URL that :meth:`update` posts to.
        :param returnFormat: key into ``_response_mime_types``; selects the
            ``Accept`` header value.
        :param method: HTTP method used by :meth:`query`, ``'GET'`` or
            ``'POST'``.
        :raises SPARQLConnectorException: if ``method`` is not supported.
        """
        self.returnFormat = returnFormat
        self.query_endpoint = query_endpoint
        self.update_endpoint = update_endpoint
        self.kwargs = kwargs
        self.method = method

        # it is recommended to have one session object per thread/process.
        # This assures that is the case.
        # https://github.com/kennethreitz/requests/issues/1871
        self._session = threading.local()

    @staticmethod
    def _session_key():
        """Return the thread-local attribute name used for the current process."""
        return 'session_%d' % os.getpid()

    @property
    def session(self):
        """The ``requests.Session`` for the current thread and process."""
        key = self._session_key()
        store = self._session.__dict__
        # Only build a Session when none is cached; ``setdefault`` would
        # construct (and throw away) a new one on every access.
        if key not in store:
            store[key] = requests.Session()
        current = store[key]
        log.debug('Session %s %s', os.getpid(), id(current))
        return current

    @property
    def method(self):
        """HTTP method used for queries (``'GET'`` or ``'POST'``)."""
        return self._method

    @method.setter
    def method(self, method):
        if method not in ('GET', 'POST'):
            raise SPARQLConnectorException('Method must be "GET" or "POST"')
        self._method = method

    def _request_args(self, url):
        """
        Build the keyword arguments for a single request to ``url``.

        ``self.kwargs`` and its ``params``/``headers`` entries are copied so
        that per-request additions never leak into ``self.kwargs`` (and thus
        into later requests or the caller's own dicts).
        """
        args = dict(self.kwargs)
        args['url'] = url
        for name in ('params', 'headers'):
            supplied = args.get(name)
            args[name] = dict(supplied) if supplied else {}
        args['headers']['Accept'] = _response_mime_types[self.returnFormat]
        return args

    def query(self, query, default_graph=None):
        """
        Send ``query`` to the query endpoint and parse the response.

        :param query: the SPARQL query string.
        :param default_graph: optional value for ``default-graph-uri``.
        :returns: the value of ``Result.parse`` applied to the response body,
            using the response's content type.
        :raises SPARQLConnectorException: if no query endpoint is configured.
        """
        if not self.query_endpoint:
            raise SPARQLConnectorException("Query endpoint not set!")

        params = {'query': query}
        if default_graph:
            params["default-graph-uri"] = default_graph

        args = self._request_args(self.query_endpoint)

        if self.method == 'GET':
            args['params'].update(params)
        elif self.method == 'POST':
            args['data'] = params
        else:
            raise SPARQLConnectorException("Unknown method %s" % self.method)

        res = self.session.request(self.method, **args)
        res.raise_for_status()

        return Result.parse(BytesIO(res.content), content_type=res.headers['Content-type'])

    def update(self, update, default_graph=None):
        """
        POST the SPARQL update string ``update`` to the update endpoint.

        :param update: the SPARQL update string; sent UTF-8 encoded as the
            request body.
        :param default_graph: optional value for ``using-graph-uri``.
        :raises SPARQLConnectorException: if no update endpoint is configured.
        """
        if not self.update_endpoint:
            raise SPARQLConnectorException("Update endpoint not set!")

        args = self._request_args(self.update_endpoint)
        args['data'] = update.encode('utf-8')

        if default_graph:
            args['params']["using-graph-uri"] = default_graph

        # The body is a raw update string, so declare its media type unless
        # the caller configured a Content-Type of their own.
        if not any(name.lower() == 'content-type' for name in args['headers']):
            args['headers']['Content-Type'] = 'application/sparql-update; charset=UTF-8'

        res = self.session.post(**args)
        res.raise_for_status()

    def close(self):
        """Close the current thread/process session, if one was ever created."""
        existing = self._session.__dict__.get(self._session_key())
        if existing is not None:
            existing.close()
|