summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-09 13:12:34 +0000
committerGerrit Code Review <review@openstack.org>2016-02-09 13:12:35 +0000
commitb8ac0c4c0dbe1cfae56aabfa4ee009d070f00fc2 (patch)
tree24df51f4fd9bf8fb45fee0ce44c2a4f4af497cfa
parentef95d674b031c26b4f8c5539150ed7cbee136e67 (diff)
parentc7646b2adff3da331392b1316b0a1d82e0b39389 (diff)
downloadceilometer-b8ac0c4c0dbe1cfae56aabfa4ee009d070f00fc2.tar.gz
Merge "raise coordination error if not registered" into stable/liberty
-rw-r--r--ceilometer/coordination.py26
1 files changed, 24 insertions, 2 deletions
diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py
index 0a31fe4f..e4b173f8 100644
--- a/ceilometer/coordination.py
+++ b/ceilometer/coordination.py
@@ -17,9 +17,10 @@ import uuid
from oslo_config import cfg
from oslo_log import log
+import retrying
import tooz.coordination
-from ceilometer.i18n import _LE, _LI
+from ceilometer.i18n import _LE, _LI, _LW
from ceilometer import utils
LOG = log.getLogger(__name__)
@@ -45,6 +46,18 @@ OPTS = [
cfg.CONF.register_opts(OPTS, group='coordination')
+class MemberNotInGroupError(Exception):
+ def __init__(self, group_id, members, my_id):
+ super(MemberNotInGroupError, self).__init__(_LE(
+ 'Group ID: %{group_id}s, Members: %{members}s, Me: %{me}s: '
+ 'Current agent is not part of group and cannot take tasks') %
+ {'group_id': group_id, 'members': members, 'me': my_id})
+
+
+def retry_on_member_not_in_group(exception):
+ return isinstance(exception, MemberNotInGroupError)
+
+
class PartitionCoordinator(object):
"""Workload partitioning coordinator.
@@ -154,7 +167,9 @@ class PartitionCoordinator(object):
except tooz.coordination.GroupNotCreated:
self.join_group(group_id)
- def extract_my_subset(self, group_id, iterable):
+ @retrying.retry(stop_max_attempt_number=5, wait_random_max=2000,
+ retry_on_exception=retry_on_member_not_in_group)
+ def extract_my_subset(self, group_id, iterable, attempt=0):
"""Filters an iterable, returning only objects assigned to this agent.
We have a list of objects and get a list of active group members from
@@ -168,6 +183,13 @@ class PartitionCoordinator(object):
try:
members = self._get_members(group_id)
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
+ if self._my_id not in members:
+ LOG.warning(_LW('Cannot extract tasks because agent failed to '
+ 'join group properly. Rejoining group.'))
+ self.join_group(group_id)
+ members = self._get_members(group_id)
+ if self._my_id not in members:
+ raise MemberNotInGroupError(group_id, members, self._my_id)
hr = utils.HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]