summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ironic/conductor/rpcapi.py11
-rw-r--r--ironic/db/api.py7
-rw-r--r--ironic/db/sqlalchemy/api.py12
-rw-r--r--ironic/tests/unit/conductor/test_rpcapi.py14
-rw-r--r--ironic/tests/unit/db/test_conductor.py17
5 files changed, 57 insertions, 4 deletions
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index 8971360cb..6284e4034 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -29,6 +29,7 @@ from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.conductor import manager
from ironic.conf import CONF
+from ironic.db import api as dbapi
from ironic.objects import base as objects_base
@@ -154,6 +155,16 @@ class ConductorAPI(object):
hostname = self.get_conductor_for(node)
return '%s.%s' % (self.topic, hostname)
+ def get_random_topic(self):
+ """Get an RPC topic for a random conductor service."""
+ conductors = dbapi.get_instance().get_online_conductors()
+ try:
+ hostname = random.choice(conductors)
+ except IndexError:
+ # There are no conductors - return 503 Service Unavailable
+ raise exception.TemporaryFailure()
+ return '%s.%s' % (self.topic, hostname)
+
def get_topic_for_driver(self, driver_name):
"""Get RPC topic name for a conductor supporting the given driver.
diff --git a/ironic/db/api.py b/ironic/db/api.py
index e43dc55da..c21d7454d 100644
--- a/ironic/db/api.py
+++ b/ironic/db/api.py
@@ -556,6 +556,13 @@ class Connection(object):
"""
@abc.abstractmethod
+ def get_online_conductors(self):
+ """Get a list conductor hostnames that are online and active.
+
+ :returns: A list of conductor hostnames.
+ """
+
+ @abc.abstractmethod
def list_conductor_hardware_interfaces(self, conductor_id):
"""List all registered hardware interfaces for a conductor.
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py
index beaca9339..65c436e96 100644
--- a/ironic/db/sqlalchemy/api.py
+++ b/ironic/db/sqlalchemy/api.py
@@ -916,10 +916,14 @@ class Connection(api.Connection):
def get_offline_conductors(self):
interval = CONF.conductor.heartbeat_timeout
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
- result = (model_query(models.Conductor).filter_by()
- .filter(models.Conductor.updated_at < limit)
- .all())
- return [row['hostname'] for row in result]
+ result = (model_query(models.Conductor.hostname)
+ .filter(models.Conductor.updated_at < limit))
+ return [row[0] for row in result]
+
+ def get_online_conductors(self):
+ query = model_query(models.Conductor.hostname)
+ query = _filter_active_conductors(query)
+ return [row[0] for row in query]
def list_conductor_hardware_interfaces(self, conductor_id):
query = (model_query(models.ConductorHardwareInterfaces)
diff --git a/ironic/tests/unit/conductor/test_rpcapi.py b/ironic/tests/unit/conductor/test_rpcapi.py
index 1290cc890..7c289369f 100644
--- a/ironic/tests/unit/conductor/test_rpcapi.py
+++ b/ironic/tests/unit/conductor/test_rpcapi.py
@@ -168,6 +168,20 @@ class RPCAPITestCase(db_base.DbTestCase):
self.assertEqual(rpcapi.get_conductor_for(self.fake_node_obj),
'fake-host')
+ def test_get_random_topic(self):
+ CONF.set_override('host', 'fake-host')
+ self.dbapi.register_conductor({'hostname': 'fake-host', 'drivers': []})
+
+ rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
+ expected_topic = 'fake-topic.fake-host'
+ self.assertEqual(expected_topic, rpcapi.get_random_topic())
+
+ def test_get_random_topic_no_conductors(self):
+ CONF.set_override('host', 'fake-host')
+
+ rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
+ self.assertRaises(exception.TemporaryFailure, rpcapi.get_random_topic)
+
def _test_can_send_create_port(self, can_send):
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
with mock.patch.object(rpcapi.client,
diff --git a/ironic/tests/unit/db/test_conductor.py b/ironic/tests/unit/db/test_conductor.py
index 23474b743..541ce25e9 100644
--- a/ironic/tests/unit/db/test_conductor.py
+++ b/ironic/tests/unit/db/test_conductor.py
@@ -336,6 +336,23 @@ class DbConductorTestCase(base.DbTestCase):
self.assertEqual([c.hostname], self.dbapi.get_offline_conductors())
@mock.patch.object(timeutils, 'utcnow', autospec=True)
+ def test_get_online_conductors(self, mock_utcnow):
+ self.config(heartbeat_timeout=60, group='conductor')
+ time_ = datetime.datetime(2000, 1, 1, 0, 0)
+
+ mock_utcnow.return_value = time_
+ c = self._create_test_cdr()
+
+ # Only 30 seconds passed since last heartbeat, it's still
+ # considered alive
+ mock_utcnow.return_value = time_ + datetime.timedelta(seconds=30)
+ self.assertEqual([c.hostname], self.dbapi.get_online_conductors())
+
+ # 61 seconds passed since last heartbeat, it's dead
+ mock_utcnow.return_value = time_ + datetime.timedelta(seconds=61)
+ self.assertEqual([], self.dbapi.get_online_conductors())
+
+ @mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_list_hardware_type_interfaces(self, mock_utcnow):
self.config(heartbeat_timeout=60, group='conductor')
time_ = datetime.datetime(2000, 1, 1, 0, 0)