summaryrefslogtreecommitdiff
path: root/nova/servicegroup/drivers/zk.py
blob: 9ba3ae64f98287b053fdcbf1681c08146fb622ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) AT&T 2012-2013 Yun Mao <yunmao@gmail.com>
# Copyright 2012 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import eventlet
from oslo.config import cfg

from nova import exception
from nova.i18n import _
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common import loopingcall
from nova.servicegroup import api

evzookeeper = importutils.try_import('evzookeeper')
membership = importutils.try_import('evzookeeper.membership')
zookeeper = importutils.try_import('zookeeper')

zk_driver_opts = [
    cfg.StrOpt('address',
               help='The ZooKeeper addresses for servicegroup service in the '
                    'format of host1:port,host2:port,host3:port'),
    cfg.IntOpt('recv_timeout',
               default=4000,
               help='The recv_timeout parameter for the zk session'),
    cfg.StrOpt('sg_prefix',
               default="/servicegroups",
               help='The prefix used in ZooKeeper to store ephemeral nodes'),
    cfg.IntOpt('sg_retry_interval',
               default=5,
               help='Number of seconds to wait until retrying to join the '
                    'session'),
    ]

CONF = cfg.CONF
CONF.register_opts(zk_driver_opts, group="zookeeper")

LOG = logging.getLogger(__name__)


class ZooKeeperDriver(api.ServiceGroupDriver):
    """ZooKeeper driver for the service group API."""

    def __init__(self, *args, **kwargs):
        """Create the zk session object."""
        if not all([evzookeeper, membership, zookeeper]):
            raise ImportError('zookeeper module not found')
        null = open(os.devnull, "w")
        self._session = evzookeeper.ZKSession(CONF.zookeeper.address,
                                              recv_timeout=
                                                CONF.zookeeper.recv_timeout,
                                              zklog_fd=null)
        self._memberships = {}
        self._monitors = {}
        # Make sure the prefix exists
        try:
            self._session.create(CONF.zookeeper.sg_prefix, "",
                                 acl=[evzookeeper.ZOO_OPEN_ACL_UNSAFE])
        except zookeeper.NodeExistsException:
            pass

        super(ZooKeeperDriver, self).__init__()

    def join(self, member_id, group, service=None):
        """Join the given service with its group."""
        LOG.debug('ZooKeeperDriver: join new member %(id)s to the '
                  '%(gr)s group, service=%(sr)s',
                  {'id': member_id, 'gr': group, 'sr': service})
        member = self._memberships.get((group, member_id), None)
        if member is None:
            # the first time to join. Generate a new object
            path = "%s/%s" % (CONF.zookeeper.sg_prefix, group)
            try:
                member = membership.Membership(self._session, path, member_id)
            except RuntimeError:
                LOG.exception(_("Unable to join. It is possible that either "
                                "another node exists with the same name, or "
                                "this node just restarted. We will try "
                                "again in a short while to make sure."))
                eventlet.sleep(CONF.zookeeper.sg_retry_interval)
                member = membership.Membership(self._session, path, member_id)
            self._memberships[(group, member_id)] = member
        return FakeLoopingCall(self, member_id, group)

    def leave(self, member_id, group):
        """Remove the given member from the service group."""
        LOG.debug('ZooKeeperDriver.leave: %(member)s from group %(group)s',
                  {'member': member_id, 'group': group})
        try:
            key = (group, member_id)
            member = self._memberships[key]
            member.leave()
            del self._memberships[key]
        except KeyError:
            LOG.error(_('ZooKeeperDriver.leave: %(id)s has not joined to the '
                        '%(gr)s group'), {'id': member_id, 'gr': group})

    def is_up(self, service_ref):
        group_id = service_ref['topic']
        member_id = service_ref['host']
        all_members = self.get_all(group_id)
        return member_id in all_members

    def get_all(self, group_id):
        """Return all members in a list, or a ServiceGroupUnavailable
        exception.
        """
        monitor = self._monitors.get(group_id, None)
        if monitor is None:
            path = "%s/%s" % (CONF.zookeeper.sg_prefix, group_id)
            monitor = membership.MembershipMonitor(self._session, path)
            self._monitors[group_id] = monitor
            # Note(maoy): When initialized for the first time, it takes a
            # while to retrieve all members from zookeeper. To prevent
            # None to be returned, we sleep 5 sec max to wait for data to
            # be ready.
            for _retry in range(50):
                eventlet.sleep(0.1)
                all_members = monitor.get_all()
                if all_members is not None:
                    return all_members
        all_members = monitor.get_all()
        if all_members is None:
            raise exception.ServiceGroupUnavailable(driver="ZooKeeperDriver")
        return all_members


class FakeLoopingCall(loopingcall.LoopingCallBase):
    """The fake Looping Call implementation, created for backward
    compatibility with a membership based on DB.
    """
    def __init__(self, driver, host, group):
        self._driver = driver
        self._group = group
        self._host = host

    def stop(self):
        self._driver.leave(self._host, self._group)

    def start(self, interval, initial_delay=None):
        pass

    def wait(self):
        pass