1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
from threading import Lock
class ResponsibleGenerator(object):
"""A generator that will help clean up when it is done being used."""
__slots__ = ["cleanup", "gen"]
def __init__(self, gen, cleanup):
self.cleanup = cleanup
self.gen = gen
def __del__(self):
self.cleanup()
def __iter__(self):
return self
def __next__(self):
return next(self.gen)
class ConcurrentStore(object):
def __init__(self, store):
self.store = store
# number of calls to visit still in progress
self.__visit_count = 0
# lock for locking down the indices
self.__lock = Lock()
# lists for keeping track of added and removed triples while
# we wait for the lock
self.__pending_removes = []
self.__pending_adds = []
def add(self, triple):
(s, p, o) = triple
if self.__visit_count == 0:
self.store.add((s, p, o))
else:
self.__pending_adds.append((s, p, o))
def remove(self, triple):
(s, p, o) = triple
if self.__visit_count == 0:
self.store.remove((s, p, o))
else:
self.__pending_removes.append((s, p, o))
def triples(self, triple):
(su, pr, ob) = triple
g = self.store.triples((su, pr, ob))
pending_removes = self.__pending_removes
self.__begin_read()
for s, p, o in ResponsibleGenerator(g, self.__end_read):
if not (s, p, o) in pending_removes:
yield s, p, o
for s, p, o in self.__pending_adds:
if (
(su is None or su == s)
and (pr is None or pr == p)
and (ob is None or ob == o)
):
yield s, p, o
def __len__(self):
return self.store.__len__()
def __begin_read(self):
lock = self.__lock
lock.acquire()
self.__visit_count = self.__visit_count + 1
lock.release()
def __end_read(self):
lock = self.__lock
lock.acquire()
self.__visit_count = self.__visit_count - 1
if self.__visit_count == 0:
pending_removes = self.__pending_removes
while pending_removes:
(s, p, o) = pending_removes.pop()
try:
self.store.remove((s, p, o))
except: # noqa: E722
# TODO: change to try finally?
print(s, p, o, "Not in store to remove")
pending_adds = self.__pending_adds
while pending_adds:
(s, p, o) = pending_adds.pop()
self.store.add((s, p, o))
lock.release()
|