summary | refs | log | tree | commit | diff
path: root/ironic/conductor
diff options
context:
space:
mode:
author: Jim Rollenhagen <jim@jimrollenhagen.com> 2018-07-02 20:44:32 +0000
committer: Jim Rollenhagen <jim@jimrollenhagen.com> 2018-07-23 20:51:31 +0000
commit: 26fd55f7da2860d7fa71c8bbeabde72e0a74049a (patch)
tree: 4758924266ce0b228aac8c5eb497b595f2861260 /ironic/conductor
parent: 3b7e7fb3fcb24726b94d1bf1c44dc710fde0e996 (diff)
download: ironic-26fd55f7da2860d7fa71c8bbeabde72e0a74049a.tar.gz
Use conductor group for hash ring calculations
This changes the calculation for keys in the hash ring manager to be of the form "<conductor_group>:<driver>", instead of just driver. This is used when the RPC version pin is 1.47 or greater (1.47 was created to handle this).

When finding an RPC topic, we use the conductor group marked on the node as part of this calculation. However, this becomes a problem when we don't have a node that we're looking up a topic for. In this case we look for a conductor in any group which has the driver loaded, and use a temporary hash ring that does not use conductor groups to find a conductor.

This also begins the API work, as the API must be aware of the new hash ring calculation. However, exposing the conductor_group field and adding a microversion is left for a future patch.

Story: 2001795
Task: 22641
Change-Id: Iaf71348666b683518fc6ce4769112459d98938f2
Diffstat (limited to 'ironic/conductor')
-rw-r--r-- ironic/conductor/base_manager.py 37
-rw-r--r-- ironic/conductor/manager.py 14
-rw-r--r-- ironic/conductor/rpcapi.py 22
3 files changed, 49 insertions, 24 deletions
diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py
index 2fe5217f7..081927904 100644
--- a/ironic/conductor/base_manager.py
+++ b/ironic/conductor/base_manager.py
@@ -22,6 +22,7 @@ from futurist import rejection
from oslo_db import exception as db_exception
from oslo_log import log
from oslo_utils import excutils
+from oslo_utils import versionutils
import six
from ironic.common import context as ironic_context
@@ -29,6 +30,7 @@ from ironic.common import driver_factory
from ironic.common import exception
from ironic.common import hash_ring
from ironic.common.i18n import _
+from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.common import states
from ironic.conductor import notification_utils as notify_utils
@@ -107,7 +109,10 @@ class BaseConductorManager(object):
check_and_reject=rejection_func)
"""Executor for performing tasks async."""
- self.ring_manager = hash_ring.HashRingManager()
+ # TODO(jroll) delete the use_groups argument and use the default
+ # in Stein.
+ self.ring_manager = hash_ring.HashRingManager(
+ use_groups=self._use_groups())
"""Consistent hash ring which maps drivers to conductors."""
# TODO(dtantsur): remove in Stein
@@ -217,6 +222,14 @@ class BaseConductorManager(object):
self._started = True
+ def _use_groups(self):
+ release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
+ # NOTE(jroll) self.RPC_API_VERSION is actually defined in a subclass,
+ # but we only use this class from there.
+ version_cap = (release_ver['rpc'] if release_ver
+ else self.RPC_API_VERSION)
+ return versionutils.is_compatible('1.47', version_cap)
+
def _fail_transient_state(self, state, last_error):
"""Apply "fail" transition to nodes in a transient state.
@@ -362,21 +375,23 @@ class BaseConductorManager(object):
Requests node set from and filters out nodes that are not
mapped to this conductor.
- Yields tuples (node_uuid, driver, ...) where ... is derived from
- fields argument, e.g.: fields=None means yielding ('uuid', 'driver'),
- fields=['foo'] means yielding ('uuid', 'driver', 'foo').
+ Yields tuples (node_uuid, driver, conductor_group, ...) where ... is
+ derived from fields argument, e.g.: fields=None means yielding ('uuid',
+ 'driver', 'conductor_group'), fields=['foo'] means yielding ('uuid',
+ 'driver', 'conductor_group', 'foo').
- :param fields: list of fields to fetch in addition to uuid and driver
+ :param fields: list of fields to fetch in addition to uuid, driver,
+ and conductor_group
:param kwargs: additional arguments to pass to dbapi when looking for
nodes
:return: generator yielding tuples of requested fields
"""
- columns = ['uuid', 'driver'] + list(fields or ())
+ columns = ['uuid', 'driver', 'conductor_group'] + list(fields or ())
node_list = self.dbapi.get_nodeinfo_list(columns=columns, **kwargs)
for result in node_list:
if self._shutdown:
break
- if self._mapped_to_this_conductor(*result[:2]):
+ if self._mapped_to_this_conductor(*result[:3]):
yield result
def _spawn_worker(self, func, *args, **kwargs):
@@ -407,7 +422,7 @@ class BaseConductorManager(object):
{'err': e})
self._keepalive_evt.wait(CONF.conductor.heartbeat_interval)
- def _mapped_to_this_conductor(self, node_uuid, driver):
+ def _mapped_to_this_conductor(self, node_uuid, driver, conductor_group):
"""Check that node is mapped to this conductor.
Note that because mappings are eventually consistent, it is possible
@@ -416,7 +431,7 @@ class BaseConductorManager(object):
take out a lock.
"""
try:
- ring = self.ring_manager[driver]
+ ring = self.ring_manager.get_ring(driver, conductor_group)
except exception.DriverNotFound:
return False
@@ -468,7 +483,7 @@ class BaseConductorManager(object):
sort_dir='asc')
workers_count = 0
- for node_uuid, driver in node_iter:
+ for node_uuid, driver, conductor_group in node_iter:
try:
with task_manager.acquire(context, node_uuid,
purpose='node state check') as task:
@@ -507,7 +522,7 @@ class BaseConductorManager(object):
node_iter = self.iter_nodes(filters=filters)
- for node_uuid, driver in node_iter:
+ for node_uuid, driver, conductor_group in node_iter:
try:
with task_manager.acquire(context, node_uuid, shared=False,
purpose='start console') as task:
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index f760848e0..1497201f4 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -100,7 +100,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
# NOTE(pas-ha): This also must be in sync with
# ironic.common.release_mappings.RELEASE_MAPPING['master']
- RPC_API_VERSION = '1.46'
+ RPC_API_VERSION = '1.47'
target = messaging.Target(version=RPC_API_VERSION)
@@ -1596,7 +1596,7 @@ class ConductorManager(base_manager.BaseConductorManager):
filters = {'maintenance': False}
node_iter = self.iter_nodes(fields=['id'], filters=filters)
- for (node_uuid, driver, node_id) in node_iter:
+ for (node_uuid, driver, conductor_group, node_id) in node_iter:
try:
# NOTE(dtantsur): start with a shared lock, upgrade if needed
with task_manager.acquire(context, node_uuid,
@@ -1685,7 +1685,7 @@ class ConductorManager(base_manager.BaseConductorManager):
filters = {'maintenance': True,
'fault': faults.POWER_FAILURE}
node_iter = self.iter_nodes(fields=['id'], filters=filters)
- for (node_uuid, driver, node_id) in node_iter:
+ for (node_uuid, driver, conductor_group, node_id) in node_iter:
try:
with task_manager.acquire(context, node_uuid,
purpose='power failure recovery',
@@ -1777,7 +1777,7 @@ class ConductorManager(base_manager.BaseConductorManager):
state_cleanup_required = []
- for (node_uuid, driver, node_id, conductor_hostname,
+ for (node_uuid, driver, conductor_group, node_id, conductor_hostname,
maintenance, provision_state, target_power_state) in node_iter:
# NOTE(lucasagomes): Although very rare, this may lead to a
# race condition. By the time we release the lock the conductor
@@ -1994,7 +1994,8 @@ class ConductorManager(base_manager.BaseConductorManager):
filters=filters)
workers_count = 0
- for node_uuid, driver, node_id, conductor_affinity in node_iter:
+ for (node_uuid, driver, conductor_group, node_id,
+ conductor_affinity) in node_iter:
if conductor_affinity == self.conductor.id:
continue
@@ -2661,7 +2662,8 @@ class ConductorManager(base_manager.BaseConductorManager):
"""Sends sensors data for nodes from synchronized queue."""
while not self._shutdown:
try:
- node_uuid, driver, instance_uuid = nodes.get_nowait()
+ (node_uuid, driver, conductor_group,
+ instance_uuid) = nodes.get_nowait()
except queue.Empty:
break
# populate the message which will be sent to ceilometer
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index b644d8566..95f6445a5 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -95,13 +95,14 @@ class ConductorAPI(object):
| 1.44 - Added add_node_traits and remove_node_traits.
| 1.45 - Added continue_node_deploy
| 1.46 - Added reset_interfaces to update_node
+ | 1.47 - Added support for conductor groups
"""
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
# NOTE(pas-ha): This also must be in sync with
# ironic.common.release_mappings.RELEASE_MAPPING['master']
- RPC_API_VERSION = '1.46'
+ RPC_API_VERSION = '1.47'
def __init__(self, topic=None):
super(ConductorAPI, self).__init__()
@@ -117,8 +118,10 @@ class ConductorAPI(object):
else self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap=version_cap,
serializer=serializer)
+
+ use_groups = self.client.can_send_version('1.47')
# NOTE(deva): this is going to be buggy
- self.ring_manager = hash_ring.HashRingManager()
+ self.ring_manager = hash_ring.HashRingManager(use_groups=use_groups)
def get_topic_for(self, node):
"""Get the RPC topic for the conductor service the node is mapped to.
@@ -135,13 +138,15 @@ class ConductorAPI(object):
raise exception.TemporaryFailure()
try:
- ring = self.ring_manager[node.driver]
+ ring = self.ring_manager.get_ring(node.driver,
+ node.conductor_group)
dest = ring.get_nodes(node.uuid.encode('utf-8'),
replicas=CONF.hash_distribution_replicas)
return '%s.%s' % (self.topic, dest.pop())
except exception.DriverNotFound:
reason = (_('No conductor service registered which supports '
- 'driver %s.') % node.driver)
+ 'driver %(driver)s for conductor group "%(group)s".') %
+ {'driver': node.driver, 'group': node.conductor_group})
raise exception.NoValidHost(reason=reason)
def get_topic_for_driver(self, driver_name):
@@ -156,9 +161,12 @@ class ConductorAPI(object):
:raises: DriverNotFound
"""
- self.ring_manager.reset()
-
- ring = self.ring_manager[driver_name]
+ # NOTE(jroll) we want to be able to route this to any conductor,
+ # regardless of groupings. We use a fresh, uncached hash ring that
+ # does not take groups into account.
+ local_ring_manager = hash_ring.HashRingManager(use_groups=False,
+ cache=False)
+ ring = local_ring_manager.get_ring(driver_name, '')
host = random.choice(list(ring.nodes))
return self.topic + "." + host