diff options
-rw-r--r-- | ironic/conductor/rpcapi.py | 11 | ||||
-rw-r--r-- | ironic/db/api.py | 7 | ||||
-rw-r--r-- | ironic/db/sqlalchemy/api.py | 12 | ||||
-rw-r--r-- | ironic/tests/unit/conductor/test_rpcapi.py | 14 | ||||
-rw-r--r-- | ironic/tests/unit/db/test_conductor.py | 17 |
5 files changed, 57 insertions, 4 deletions
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py index 8971360cb..6284e4034 100644 --- a/ironic/conductor/rpcapi.py +++ b/ironic/conductor/rpcapi.py @@ -29,6 +29,7 @@ from ironic.common import release_mappings as versions from ironic.common import rpc from ironic.conductor import manager from ironic.conf import CONF +from ironic.db import api as dbapi from ironic.objects import base as objects_base @@ -154,6 +155,16 @@ class ConductorAPI(object): hostname = self.get_conductor_for(node) return '%s.%s' % (self.topic, hostname) + def get_random_topic(self): + """Get an RPC topic for a random conductor service.""" + conductors = dbapi.get_instance().get_online_conductors() + try: + hostname = random.choice(conductors) + except IndexError: + # There are no conductors - return 503 Service Unavailable + raise exception.TemporaryFailure() + return '%s.%s' % (self.topic, hostname) + def get_topic_for_driver(self, driver_name): """Get RPC topic name for a conductor supporting the given driver. diff --git a/ironic/db/api.py b/ironic/db/api.py index e43dc55da..c21d7454d 100644 --- a/ironic/db/api.py +++ b/ironic/db/api.py @@ -556,6 +556,13 @@ class Connection(object): """ @abc.abstractmethod + def get_online_conductors(self): + """Get a list conductor hostnames that are online and active. + + :returns: A list of conductor hostnames. + """ + + @abc.abstractmethod def list_conductor_hardware_interfaces(self, conductor_id): """List all registered hardware interfaces for a conductor. diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py index beaca9339..65c436e96 100644 --- a/ironic/db/sqlalchemy/api.py +++ b/ironic/db/sqlalchemy/api.py @@ -916,10 +916,14 @@ class Connection(api.Connection): def get_offline_conductors(self): interval = CONF.conductor.heartbeat_timeout limit = timeutils.utcnow() - datetime.timedelta(seconds=interval) - result = (model_query(models.Conductor).filter_by() - .filter(models.Conductor.updated_at < limit) - .all()) - return [row['hostname'] for row in result] + result = (model_query(models.Conductor.hostname) + .filter(models.Conductor.updated_at < limit)) + return [row[0] for row in result] + + def get_online_conductors(self): + query = model_query(models.Conductor.hostname) + query = _filter_active_conductors(query) + return [row[0] for row in query] def list_conductor_hardware_interfaces(self, conductor_id): query = (model_query(models.ConductorHardwareInterfaces) diff --git a/ironic/tests/unit/conductor/test_rpcapi.py b/ironic/tests/unit/conductor/test_rpcapi.py index 1290cc890..7c289369f 100644 --- a/ironic/tests/unit/conductor/test_rpcapi.py +++ b/ironic/tests/unit/conductor/test_rpcapi.py @@ -168,6 +168,20 @@ class RPCAPITestCase(db_base.DbTestCase): self.assertEqual(rpcapi.get_conductor_for(self.fake_node_obj), 'fake-host') + def test_get_random_topic(self): + CONF.set_override('host', 'fake-host') + self.dbapi.register_conductor({'hostname': 'fake-host', 'drivers': []}) + + rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic') + expected_topic = 'fake-topic.fake-host' + self.assertEqual(expected_topic, rpcapi.get_random_topic()) + + def test_get_random_topic_no_conductors(self): + CONF.set_override('host', 'fake-host') + + rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic') + self.assertRaises(exception.TemporaryFailure, rpcapi.get_random_topic) + def _test_can_send_create_port(self, can_send): rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic') with mock.patch.object(rpcapi.client, diff --git a/ironic/tests/unit/db/test_conductor.py b/ironic/tests/unit/db/test_conductor.py index 23474b743..541ce25e9 100644 --- a/ironic/tests/unit/db/test_conductor.py +++ b/ironic/tests/unit/db/test_conductor.py @@ -336,6 +336,23 @@ class DbConductorTestCase(base.DbTestCase): self.assertEqual([c.hostname], self.dbapi.get_offline_conductors()) @mock.patch.object(timeutils, 'utcnow', autospec=True) + def test_get_online_conductors(self, mock_utcnow): + self.config(heartbeat_timeout=60, group='conductor') + time_ = datetime.datetime(2000, 1, 1, 0, 0) + + mock_utcnow.return_value = time_ + c = self._create_test_cdr() + + # Only 30 seconds passed since last heartbeat, it's still + # considered alive + mock_utcnow.return_value = time_ + datetime.timedelta(seconds=30) + self.assertEqual([c.hostname], self.dbapi.get_online_conductors()) + + # 61 seconds passed since last heartbeat, it's dead + mock_utcnow.return_value = time_ + datetime.timedelta(seconds=61) + self.assertEqual([], self.dbapi.get_online_conductors()) + + @mock.patch.object(timeutils, 'utcnow', autospec=True) def test_list_hardware_type_interfaces(self, mock_utcnow): self.config(heartbeat_timeout=60, group='conductor') time_ = datetime.datetime(2000, 1, 1, 0, 0) |