path: root/zuul/zk/change_cache.py
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
import contextlib
import json
import logging
import os
import threading
import time
import uuid
from collections import defaultdict
from collections.abc import Iterable
from urllib.parse import quote_plus, unquote_plus

from kazoo.exceptions import BadVersionError, NodeExistsError, NoNodeError

from zuul import model
from zuul.zk import sharding, ZooKeeperSimpleBase
from zuul.zk.exceptions import ZuulZooKeeperException
from zuul.zk.vendor.watchers import ExistingDataWatch

CHANGE_CACHE_ROOT = "/zuul/cache/connection"


def _keyFromPath(path):
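    # e.g. ".../cache/foo%2F1" -> "foo/1"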
    return unquote_plus(os.path.basename(path))


class ConcurrentUpdateError(ZuulZooKeeperException):
    pass


class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
    """Abstract class for caching change items in Zookeeper.

    In order to make updates atomic, the change data is stored
    separately from the cache entry. The data lives in a znode with a
    random UUID, which is then referenced from the actual cache entry.

    The change data is immutable, which means that an update of a cached
    item will result in a new data node. The cache entry will then be
    changed to reference the new data.

    This approach also allows us to check if a given change is
    up-to-date by comparing the referenced UUID in Zookeeper with the
    one in the local cache without loading the whole change data.

    The change data is stored in the following Zookeeper path:

        /zuul/cache/connection/<connection-name>/data/<uuid>

    The cache entries that reference the change data use the following
    path:

        /zuul/cache/connection/<connection-name>/cache/<key>

    Data nodes will not be directly removed when an entry is removed
    or updated in order to prevent race conditions with multiple
    consumers of the cache. The stale data nodes will instead be
    cleaned up in the cache's cleanup() method. This is expected to
    happen periodically.
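
    A minimal subclass sketch (illustrative only; ``DummyChangeCache``
    and its mapping are placeholders, not part of Zuul):

        class DummyChangeCache(AbstractChangeCache):
            CHANGE_TYPE_MAP = {
                "Change": model.Change,
            }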
    """
    log = logging.getLogger("zuul.zk.AbstractChangeCache")

    def __init__(self, client, connection):
        super().__init__(client)
        self.connection = connection
        self.root_path = f"{CHANGE_CACHE_ROOT}/{connection.connection_name}"
        self.cache_root = f"{self.root_path}/cache"
        self.data_root = f"{self.root_path}/data"
        self.kazoo_client.ensure_path(self.data_root)
        self.kazoo_client.ensure_path(self.cache_root)
        self._change_cache = {}
        # Per-change locks to serialize concurrent creation and updates
        # of local objects.
        self._change_locks = defaultdict(threading.Lock)
        self._watched_keys = set()
        # Data UUIDs that are candidates to be removed on the next
        # cleanup iteration.
        self._data_cleanup_candidates = set()
        self.kazoo_client.ChildrenWatch(self.cache_root, self._cacheWatcher)

    def _dataPath(self, data_uuid):
        return f"{self.data_root}/{data_uuid}"

    def _cachePath(self, key):
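        # Keys may contain characters that are not valid in znode names
        # (e.g. "/"), so they are quote_plus-encoded:
        # "foo/1" -> ".../cache/foo%2F1".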
        return f"{self.cache_root}/{quote_plus(key)}"

    def _cacheWatcher(self, cache_keys):
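        # Reconcile local state with the set of keys currently present
        # in Zookeeper: drop local entries and locks for deleted keys
        # and register a data watch for each newly appeared key.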
        cache_keys = {unquote_plus(k) for k in cache_keys}
        existing_keys = set(self._change_cache.keys())
        deleted_keys = existing_keys - cache_keys
        for key in deleted_keys:
            with contextlib.suppress(KeyError):
                del self._change_cache[key]
            with contextlib.suppress(KeyError):
                del self._change_locks[key]

        stale_watches = self._watched_keys - cache_keys
        for key in stale_watches:
            with contextlib.suppress(KeyError):
                self._watched_keys.remove(key)

        new_keys = cache_keys - self._watched_keys
        for key in new_keys:
            ExistingDataWatch(self.kazoo_client,
                              f"{self.cache_root}/{quote_plus(key)}",
                              self._cacheItemWatcher)
            self._watched_keys.add(key)

    def _cacheItemWatcher(self, data, zstat, event=None):
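        # Ignore the initial watch call (event is None) and deletion
        # events (data and zstat are None); in those cases there is no
        # update to apply to the local cache.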
        if not all((data, zstat, event)):
            return

        key = _keyFromPath(event.path)
        data_uuid = data.decode("utf8")
        self._get(key, data_uuid, zstat)

    def prune(self, relevant, max_age=3600):  # 1h
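        # Delete cache entries that were last modified more than
        # max_age seconds ago and are not in the given iterable of
        # relevant keys.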
        cutoff_time = time.time() - max_age
        outdated = {c.cache_stat.key for c in list(self._change_cache.values())
                    if c.cache_stat.last_modified < cutoff_time}
        to_prune = outdated - set(relevant)
        for key in to_prune:
            self.delete(key)

    def cleanup(self):
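        # Two-pass deletion: a data UUID that is unreferenced in one
        # run only becomes a deletion candidate; it is removed in the
        # next run if it is still unreferenced. This gives concurrent
        # writers time to publish the cache entry that references their
        # new data node.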
        valid_uuids = {c.cache_stat.uuid
                       for c in list(self._change_cache.values())}
        stale_uuids = self._data_cleanup_candidates - valid_uuids
        for data_uuid in stale_uuids:
            self.kazoo_client.delete(self._dataPath(data_uuid), recursive=True)

        data_uuids = set(self.kazoo_client.get_children(self.data_root))
        self._data_cleanup_candidates = data_uuids - valid_uuids

    def __iter__(self):
        try:
            children = self.kazoo_client.get_children(self.cache_root)
        except NoNodeError:
            return

        for key in sorted(unquote_plus(c) for c in children):
            change = self.get(key)
            if change is not None:
                yield change

    def get(self, key):
        cache_path = self._cachePath(key)
        try:
            value, zstat = self.kazoo_client.get(cache_path)
        except NoNodeError:
            return None

        data_uuid = value.decode("utf8")
        return self._get(key, data_uuid, zstat)

    def _get(self, key, data_uuid, zstat):
        change = self._change_cache.get(key)
        if change and change.cache_stat.uuid == data_uuid:
            # Change in our local cache is up-to-date
            return change

        try:
            data = self._getData(data_uuid)
        except NoNodeError:
            cache_path = self._cachePath(key)
            self.log.error("Removing cache entry %s without any data",
                           cache_path)
            # TODO: handle no node + version mismatch
            self.kazoo_client.delete(cache_path, zstat.version)
            return None

        with self._change_locks[key]:
            if change:
                # While holding the lock, check if we still need to
                # update the change; skip the update if we already have
                # the latest version.
                if change.cache_version >= zstat.version:
                    return change
                self._updateChange(change, data)
            else:
                change = self._changeFromData(data)

            change.cache_stat = model.CacheStat(key, data_uuid, zstat.version,
                                                zstat.last_modified)
            # Use setdefault here so we only have a single instance of a change
            # around. In case of a concurrent get this might return a different
            # change instance than the one we just created.
            return self._change_cache.setdefault(key, change)

    def _getData(self, data_uuid):
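        # The JSON payload may exceed the znode size limit, so it is
        # read through the sharding helper, which reassembles the data
        # from the node's shards.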
        with sharding.BufferedShardReader(
                self.kazoo_client, self._dataPath(data_uuid)) as stream:
            data = stream.read()
        return json.loads(data)

    def set(self, key, change, version=-1):
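        # version == -1 creates a new cache entry; any other value is a
        # compare-and-set against that znode version. Both conflict
        # cases surface as ConcurrentUpdateError.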
        data_uuid = self._setData(self._dataFromChange(change))
        cache_path = self._cachePath(key)
        with self._change_locks[key]:
            try:
                if version == -1:
                    _, zstat = self.kazoo_client.create(
                        cache_path,
                        data_uuid.encode("utf8"),
                        include_data=True)
                else:
                    # Sanity check that we only have a single change instance
                    # for a key.
                    if self._change_cache[key] is not change:
                        raise RuntimeError(
                            "Conflicting change objects (existing "
                            f"{self._change_cache[key]} vs. new {change} "
                            f"for key '{key}'")
                    zstat = self.kazoo_client.set(
                        cache_path, data_uuid.encode("utf8"), version)
            except (BadVersionError, NodeExistsError, NoNodeError) as exc:
                raise ConcurrentUpdateError from exc

            change.cache_stat = model.CacheStat(
                key, data_uuid, zstat.version, zstat.last_modified)
            self._change_cache[key] = change

    def _setData(self, data):
        data_uuid = uuid.uuid4().hex
        payload = json.dumps(data).encode("utf8")
        with sharding.BufferedShardWriter(
                self.kazoo_client, self._dataPath(data_uuid)) as stream:
            stream.write(payload)
        return data_uuid

    def updateChangeWithRetry(self, key, change, update_func, retry_count=5):
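        """Update a cached change, retrying on concurrent updates.

        ``update_func`` mutates the change in place; on a conflict the
        change is re-fetched and the update is retried.  A sketch of a
        call site (the attribute used is illustrative):

            def update(c):
                c.message = "updated"

            change = cache.updateChangeWithRetry(key, change, update)
        """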
        for attempt in range(1, retry_count + 1):
            try:
                version = change.cache_version
                update_func(change)
                self.set(key, change, version)
                break
            except ConcurrentUpdateError:
                self.log.info(
                    "Conflicting cache update of %s (attempt: %s/%s)",
                    change, attempt, retry_count)
                if attempt == retry_count:
                    raise
            # Re-fetch the change from the cache, as it might have been
            # replaced due to a concurrent create.
            change = self.get(key)
        return change

    def delete(self, key):
        cache_path = self._cachePath(key)
        # Only delete the cache entry and NOT the data node in order to
        # prevent race conditions with other consumers. The stale data
        # nodes will be removed by the periodic cleanup.
        self.kazoo_client.delete(cache_path, recursive=True)

        with contextlib.suppress(KeyError):
            del self._change_cache[key]

    def _changeFromData(self, data):
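        # ``data`` is the envelope produced by _dataFromChange, e.g.:
        #     {"change_type": "GerritChange", "change_data": {...}}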
        change_type, change_data = data["change_type"], data["change_data"]
        change_class = self._getChangeClass(change_type)
        project = self.connection.source.getProject(change_data["project"])
        change = change_class(project)
        change.deserialize(change_data)
        return change

    def _dataFromChange(self, change):
        return {
            "change_type": self._getChangeType(change),
            "change_data": change.serialize(),
        }

    def _updateChange(self, change, data):
        change.deserialize(data["change_data"])

    def _getChangeClass(self, change_type):
        """Return the change class for the given type."""
        return self.CHANGE_TYPE_MAP[change_type]

    def _getChangeType(self, change):
        """Return the change type as a string for the given type."""
        return type(change).__name__

    @property
    @abc.abstractmethod
    def CHANGE_TYPE_MAP(self):
        """Return a mapping of change type as string to change class.

        This property can also be defined as a class attribute.
        """
        pass