diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-09 13:12:34 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-09 13:12:35 +0000 |
commit | b8ac0c4c0dbe1cfae56aabfa4ee009d070f00fc2 (patch) | |
tree | 24df51f4fd9bf8fb45fee0ce44c2a4f4af497cfa | |
parent | ef95d674b031c26b4f8c5539150ed7cbee136e67 (diff) | |
parent | c7646b2adff3da331392b1316b0a1d82e0b39389 (diff) | |
download | ceilometer-b8ac0c4c0dbe1cfae56aabfa4ee009d070f00fc2.tar.gz |
Merge "raise coordination error if not registered" into stable/liberty
-rw-r--r-- | ceilometer/coordination.py | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py
index 0a31fe4f..e4b173f8 100644
--- a/ceilometer/coordination.py
+++ b/ceilometer/coordination.py
@@ -17,9 +17,10 @@ import uuid
 
 from oslo_config import cfg
 from oslo_log import log
+import retrying
 import tooz.coordination
 
-from ceilometer.i18n import _LE, _LI
+from ceilometer.i18n import _LE, _LI, _LW
 from ceilometer import utils
 
 LOG = log.getLogger(__name__)
@@ -45,6 +46,18 @@ OPTS = [
 cfg.CONF.register_opts(OPTS, group='coordination')
 
 
+class MemberNotInGroupError(Exception):
+    def __init__(self, group_id, members, my_id):
+        super(MemberNotInGroupError, self).__init__(_LE(
+            'Group ID: %{group_id}s, Members: %{members}s, Me: %{me}s: '
+            'Current agent is not part of group and cannot take tasks') %
+            {'group_id': group_id, 'members': members, 'me': my_id})
+
+
+def retry_on_member_not_in_group(exception):
+    return isinstance(exception, MemberNotInGroupError)
+
+
 class PartitionCoordinator(object):
     """Workload partitioning coordinator.
 
@@ -154,7 +167,9 @@ class PartitionCoordinator(object):
         except tooz.coordination.GroupNotCreated:
             self.join_group(group_id)
 
-    def extract_my_subset(self, group_id, iterable):
+    @retrying.retry(stop_max_attempt_number=5, wait_random_max=2000,
+                    retry_on_exception=retry_on_member_not_in_group)
+    def extract_my_subset(self, group_id, iterable, attempt=0):
         """Filters an iterable, returning only objects assigned to this agent.
 
         We have a list of objects and get a list of active group members from
@@ -168,6 +183,13 @@ class PartitionCoordinator(object):
         try:
             members = self._get_members(group_id)
             LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
+            if self._my_id not in members:
+                LOG.warning(_LW('Cannot extract tasks because agent failed to '
+                                'join group properly. Rejoining group.'))
+                self.join_group(group_id)
+                members = self._get_members(group_id)
+                if self._my_id not in members:
+                    raise MemberNotInGroupError(group_id, members, self._my_id)
             hr = utils.HashRing(members)
             filtered = [v for v in iterable
                         if hr.get_node(str(v)) == self._my_id]