summaryrefslogtreecommitdiff
path: root/rdflib/plugins/stores/concurrent.py
blob: cdf41ba0e21118ee5c5346c55db4b0947908e285 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from threading import Lock


class ResponsibleGenerator(object):
    """A generator that will help clean up when it is done being used."""

    __slots__ = ["cleanup", "gen"]

    def __init__(self, gen, cleanup):
        self.cleanup = cleanup
        self.gen = gen

    def __del__(self):
        self.cleanup()

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)


class ConcurrentStore(object):
    def __init__(self, store):
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock for locking down the indices
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # we wait for the lock
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        (su, pr, ob) = triple
        g = self.store.triples((su, pr, ob))
        pending_removes = self.__pending_removes
        self.__begin_read()
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if not (s, p, o) in pending_removes:
                yield s, p, o

        for s, p, o in self.__pending_adds:
            if (
                (su is None or su == s)
                and (pr is None or pr == p)
                and (ob is None or ob == o)
            ):
                yield s, p, o

    def __len__(self):
        return self.store.__len__()

    def __begin_read(self):
        lock = self.__lock
        lock.acquire()
        self.__visit_count = self.__visit_count + 1
        lock.release()

    def __end_read(self):
        lock = self.__lock
        lock.acquire()
        self.__visit_count = self.__visit_count - 1
        if self.__visit_count == 0:
            pending_removes = self.__pending_removes
            while pending_removes:
                (s, p, o) = pending_removes.pop()
                try:
                    self.store.remove((s, p, o))
                except:  # noqa: E722
                    # TODO: change to try finally?
                    print(s, p, o, "Not in store to remove")
            pending_adds = self.__pending_adds
            while pending_adds:
                (s, p, o) = pending_adds.pop()
                self.store.add((s, p, o))
        lock.release()