summaryrefslogtreecommitdiff
path: root/taskflow/persistence/backends/impl_zookeeper.py
blob: e75826ec29168cebbcab4d80c64731b8ad32d78a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*- coding: utf-8 -*-

#    Copyright (C) 2014 AT&T Labs All Rights Reserved.
#    Copyright (C) 2015 Rackspace Hosting All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib

from kazoo import exceptions as k_exc
from kazoo.protocol import paths
from oslo_serialization import jsonutils
from oslo_utils import strutils

from taskflow import exceptions as exc
from taskflow.persistence import path_based
from taskflow.utils import kazoo_utils as k_utils
from taskflow.utils import misc


# Version tuple handed to ``k_utils.check_compatible`` when a connection is
# validated (see ``ZkConnection.validate``).
MIN_ZK_VERSION = (3, 4, 0)


class ZkBackend(path_based.PathBasedBackend):
    """Path based persistence backend that talks to zookeeper.

    Example configuration::

        conf = {
            "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
            "path": "/taskflow",
        }

    When the caller does not hand in a client, one is built at ``__init__``
    time by :py:func:`~taskflow.utils.kazoo_utils.make_client`, which
    receives this backend's configuration. Options understood by
    :py:func:`~taskflow.utils.kazoo_utils.make_client` may therefore be
    placed in the backend configuration; see that function for its
    specification.
    """

    #: Default path used when none is provided.
    DEFAULT_PATH = '/taskflow'

    def __init__(self, conf, client=None):
        """Set up the backend, creating a client when none is supplied.

        :param conf: backend configuration (forwarded to the base class)
        :param client: optional pre-built client; when given it is used
                       as-is and is *not* finalized by :meth:`close`
        :raises ValueError: if the configured path is not absolute
        """
        super(ZkBackend, self).__init__(conf)
        if not paths.isabs(self._path):
            raise ValueError("Zookeeper path must be absolute")
        # Only a client created here is considered owned (and therefore
        # finalized on close); a caller supplied one is merely borrowed.
        owned = client is None
        self._client = k_utils.make_client(self._conf) if owned else client
        self._owned = owned
        self._validated = False

    def get_connection(self):
        """Return a new :class:`ZkConnection` bound to this backend.

        The connection is validated only while the backend is not yet
        marked as validated; :meth:`close` clears that mark again.
        """
        connection = ZkConnection(self, self._client, self._conf)
        if self._validated:
            return connection
        connection.validate()
        self._validated = True
        return connection

    def close(self):
        """Reset the validation mark and finalize an owned client.

        A client that was provided by the caller is left untouched. Kazoo
        errors raised while finalizing are re-raised through
        ``exc.raise_with_cause`` as ``exc.StorageFailure``.
        """
        self._validated = False
        if self._owned:
            try:
                k_utils.finalize_client(self._client)
            except (k_exc.KazooException, k_exc.ZookeeperError):
                message = "Unable to finalize client"
                exc.raise_with_cause(exc.StorageFailure, message)


class ZkConnection(path_based.PathBasedConnection):
    """Connection implementing the path based operations on a zk client."""

    def __init__(self, backend, client, conf):
        """Remember the client/configuration and start the client.

        :param backend: backend that created this connection
        :param client: client used for all operations
        :param conf: configuration consulted by :meth:`validate`
        """
        super(ZkConnection, self).__init__(backend)
        self._conf = conf
        self._client = client
        with self._exc_wrapper():
            # NOOP if already started.
            self._client.start()

    @contextlib.contextmanager
    def _exc_wrapper(self):
        """Translate kazoo exceptions raised in the managed body.

        Errors coming out of kazoo are re-raised (with the original kept as
        the cause) as taskflow exceptions so that callers never have to know
        about the kazoo exception model. The handlers are ordered from the
        most specific error to the generic kazoo base errors.
        """
        try:
            yield
        except self._client.handler.timeout_exception:
            exc.raise_with_cause(exc.StorageFailure, "Storage backend timeout")
        except k_exc.SessionExpiredError:
            message = "Storage backend session has expired"
            exc.raise_with_cause(exc.StorageFailure, message)
        except k_exc.NoNodeError:
            message = "Storage backend node not found"
            exc.raise_with_cause(exc.NotFound, message)
        except k_exc.NodeExistsError:
            message = "Storage backend duplicate node"
            exc.raise_with_cause(exc.Duplicate, message)
        except (k_exc.KazooException, k_exc.ZookeeperError):
            message = "Storage backend internal error"
            exc.raise_with_cause(exc.StorageFailure, message)

    def _join_path(self, *parts):
        """Combine the given path parts using kazoo's path joining."""
        return paths.join(*parts)

    def _get_item(self, path):
        """Fetch the node at ``path`` and return its JSON decoded data."""
        with self._exc_wrapper():
            raw, _stat = self._client.get(path)
        return misc.decode_json(raw)

    def _set_item(self, path, value, transaction):
        """Queue a write of ``value`` (JSON encoded) at ``path``.

        The transaction updates the node when it already exists and
        creates it otherwise.
        """
        payload = misc.binary_encode(jsonutils.dumps(value))
        if self._client.exists(path):
            transaction.set_data(path, payload)
        else:
            transaction.create(path, payload)

    def _del_tree(self, path, transaction):
        """Queue deletion of ``path`` and everything below it.

        Children are queued before their parent.
        """
        for name in self._get_children(path):
            child_path = self._join_path(path, name)
            self._del_tree(child_path, transaction)
        transaction.delete(path)

    def _get_children(self, path):
        """Return the children reported by the client for ``path``."""
        with self._exc_wrapper():
            children = self._client.get_children(path)
            return children

    def _ensure_path(self, path):
        """Ask the client to ensure that ``path`` exists."""
        with self._exc_wrapper():
            self._client.ensure_path(path)

    def _create_link(self, src_path, dest_path, transaction):
        """Queue creation of ``dest_path`` unless it already exists.

        ``src_path`` is accepted for interface compatibility but is not
        used here.
        """
        if self._client.exists(dest_path):
            return
        transaction.create(dest_path)

    @contextlib.contextmanager
    def _transaction(self):
        """Yield a client transaction that is committed on normal exit.

        Both the managed body and the commit run under
        :meth:`_exc_wrapper`.
        """
        txn = self._client.transaction()
        with self._exc_wrapper():
            yield txn
            k_utils.checked_commit(txn)

    def validate(self):
        """Check version compatibility unless disabled by configuration.

        The check runs when the ``check_compatible`` configuration value is
        truthy (it defaults to enabled); an incompatible version is
        re-raised through ``exc.raise_with_cause`` as ``exc.StorageFailure``.
        """
        with self._exc_wrapper():
            try:
                check_wanted = strutils.bool_from_string(
                    self._conf.get('check_compatible'), default=True)
                if check_wanted:
                    k_utils.check_compatible(self._client, MIN_ZK_VERSION)
            except exc.IncompatibleVersion:
                message = "Backend storage is not a compatible version"
                exc.raise_with_cause(exc.StorageFailure, message)