summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-04-27 15:11:08 +0000
committerGerrit Code Review <review@openstack.org>2018-04-27 15:11:08 +0000
commit2ac3ef36d2687f820cb44f6b27dd81ebf1c4165b (patch)
tree26f4fe364dcf9d7bec26533a3512baa7c9e1b419
parent96ce23ad12d0104cba9d32417bb094d1c3ac8d38 (diff)
parent6ab8c380c8d6a2e15611b225da7594e820cc773e (diff)
downloadtooz-2ac3ef36d2687f820cb44f6b27dd81ebf1c4165b.tar.gz
Merge "Implement group support for etcd3gw"1.62.0
-rw-r--r--releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml4
-rw-r--r--tooz/drivers/etcd3gw.py209
2 files changed, 210 insertions, 3 deletions
diff --git a/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml
new file mode 100644
index 0000000..0231060
--- /dev/null
+++ b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml
@@ -0,0 +1,4 @@
+---
+features:
+ - |
+ The etcd3gw driver now supports the group membership API.
diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py
index d2a7781..c229e6b 100644
--- a/tooz/drivers/etcd3gw.py
+++ b/tooz/drivers/etcd3gw.py
@@ -29,6 +29,11 @@ from tooz import locking
from tooz import utils
+def _encode(data):
+ """Safely encode data for consumption of the gateway."""
+ return base64.b64encode(data).decode("ascii")
+
+
def _translate_failures(func):
"""Translates common requests exceptions into tooz exceptions."""
@@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock):
self._timeout = timeout
self._coord = coord
self._key = self.LOCK_PREFIX + name
- self._key_b64 = base64.b64encode(self._key).decode("ascii")
- self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
+ self._key_b64 = _encode(self._key)
+ self._uuid = _encode(uuid.uuid4().bytes)
self._exclusive_access = threading.Lock()
@_translate_failures
@@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock):
return False
-class Etcd3Driver(coordination.CoordinationDriver):
+class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
"""An etcd based driver.
This driver uses etcd provide the coordination driver semantics and
@@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver):
#: Default port used if none provided (4001 or 2379 are the common ones).
DEFAULT_PORT = 2379
+ GROUP_PREFIX = b"tooz/groups/"
+
def __init__(self, member_id, parsed_url, options):
super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
host = parsed_url.hostname or self.DEFAULT_HOST
@@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver):
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
self.lock_timeout = int(options.get('lock_timeout', timeout))
+ self.membership_timeout = int(options.get(
+ 'membership_timeout', timeout))
self._acquired_locks = set()
+ def _start(self):
+ super(Etcd3Driver, self)._start()
+ self._membership_lease = self.client.lease(self.membership_timeout)
+
def get_lock(self, name):
return Etcd3Lock(self, name, self.lock_timeout)
@@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver):
def unwatch_leave_group(self, group_id, callback):
raise tooz.NotImplemented
+
+ def _encode_group_id(self, group_id):
+ return _encode(self._prefix_group(group_id))
+
+ def _prefix_group(self, group_id):
+ return b"%s%s/" % (self.GROUP_PREFIX, group_id)
+
+ def create_group(self, group_id):
+ @_translate_failures
+ def _create_group():
+ encoded_group = self._encode_group_id(group_id)
+ txn = {
+ 'compare': [{
+ 'key': encoded_group,
+ 'result': 'EQUAL',
+ 'target': 'VERSION',
+ 'version': 0
+ }],
+ 'success': [{
+ 'request_put': {
+ 'key': encoded_group,
+ # We shouldn't need a value, but etcd3gw needs it for
+ # now
+ 'value': encoded_group
+ }
+ }],
+ 'failure': []
+ }
+ result = self.client.transaction(txn)
+ if not result.get("succeeded"):
+ raise coordination.GroupAlreadyExist(group_id)
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_create_group))
+
+ def _destroy_group(self, group_id):
+ self.client.delete(group_id)
+
+ def delete_group(self, group_id):
+ @_translate_failures
+ def _delete_group():
+ prefix_group = self._prefix_group(group_id)
+ members = self.client.get_prefix(prefix_group)
+ if len(members) > 1:
+ raise coordination.GroupNotEmpty(group_id)
+
+ encoded_group = self._encode_group_id(group_id)
+ txn = {
+ 'compare': [{
+ 'key': encoded_group,
+ 'result': 'NOT_EQUAL',
+ 'target': 'VERSION',
+ 'version': 0
+ }],
+ 'success': [{
+ 'request_delete_range': {
+ 'key': encoded_group,
+ }
+ }],
+ 'failure': []
+ }
+ result = self.client.transaction(txn)
+
+ if not result.get("succeeded"):
+ raise coordination.GroupNotCreated(group_id)
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_delete_group))
+
+ def join_group(self, group_id, capabilities=b""):
+ @_retry.retry()
+ @_translate_failures
+ def _join_group():
+ prefix_group = self._prefix_group(group_id)
+ prefix_member = prefix_group + self._member_id
+ members = self.client.get_prefix(prefix_group)
+
+ encoded_member = _encode(prefix_member)
+
+ group_metadata = None
+ for cap, metadata in members:
+ if metadata['key'] == prefix_member:
+ raise coordination.MemberAlreadyExist(group_id,
+ self._member_id)
+ if metadata['key'] == prefix_group:
+ group_metadata = metadata
+
+ if group_metadata is None:
+ raise coordination.GroupNotCreated(group_id)
+
+ encoded_group = self._encode_group_id(group_id)
+ txn = {
+ 'compare': [{
+ 'key': encoded_group,
+ 'result': 'EQUAL',
+ 'target': 'VERSION',
+ 'version': int(group_metadata['version'])
+ }],
+ 'success': [{
+ 'request_put': {
+ 'key': encoded_member,
+ 'value': _encode(utils.dumps(capabilities)),
+ 'lease': self._membership_lease.id
+ }
+ }],
+ 'failure': []
+ }
+ result = self.client.transaction(txn)
+ if not result.get('succeeded'):
+ raise _retry.TryAgain
+ else:
+ self._joined_groups.add(group_id)
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_join_group))
+
+ def leave_group(self, group_id):
+ @_translate_failures
+ def _leave_group():
+ prefix_group = self._prefix_group(group_id)
+ prefix_member = prefix_group + self._member_id
+ members = self.client.get_prefix(prefix_group)
+ for capabilities, metadata in members:
+ if metadata['key'] == prefix_member:
+ break
+ else:
+ raise coordination.MemberNotJoined(group_id,
+ self._member_id)
+
+ self.client.delete(prefix_member)
+ self._joined_groups.discard(group_id)
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_leave_group))
+
+ def get_members(self, group_id):
+ @_translate_failures
+ def _get_members():
+ prefix_group = self._prefix_group(group_id)
+ members = set()
+ group_found = False
+
+ for cap, metadata in self.client.get_prefix(prefix_group):
+ if metadata['key'] == prefix_group:
+ group_found = True
+ else:
+ members.add(metadata['key'][len(prefix_group):])
+
+ if not group_found:
+ raise coordination.GroupNotCreated(group_id)
+
+ return members
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_get_members))
+
+ def get_member_capabilities(self, group_id, member_id):
+ @_translate_failures
+ def _get_member_capabilities():
+ prefix_member = self._prefix_group(group_id) + member_id
+ result = self.client.get(prefix_member)
+ if not result:
+ raise coordination.MemberNotJoined(group_id, member_id)
+ return utils.loads(result[0])
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_get_member_capabilities))
+
+ def update_capabilities(self, group_id, capabilities):
+ @_translate_failures
+ def _update_capabilities():
+ prefix_member = self._prefix_group(group_id) + self._member_id
+ result = self.client.get(prefix_member)
+ if not result:
+ raise coordination.MemberNotJoined(group_id, self._member_id)
+
+ self.client.put(prefix_member, utils.dumps(capabilities),
+ lease=self._membership_lease)
+
+ return coordination.CoordinatorResult(
+ self._executor.submit(_update_capabilities))
+
+ def get_groups(self):
+ @_translate_failures
+ def _get_groups():
+ groups = self.client.get_prefix(self.GROUP_PREFIX)
+ return [
+ group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups]
+ return coordination.CoordinatorResult(
+ self._executor.submit(_get_groups))