diff options
author | Zuul <zuul@review.opendev.org> | 2020-03-15 11:17:15 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2020-03-15 11:17:15 +0000 |
commit | 46dc595f21f758b31efeedcbbfe17bc7f23485d2 (patch) | |
tree | 176796b43a2fc87c957fdd1dfaea1adf15e431f3 | |
parent | 2a05203c696e9d0f8e57c29e788dd774071964a6 (diff) | |
parent | 3fccc25bf661755b6f71f7e839938c214480bd5f (diff) | |
download | designate-46dc595f21f758b31efeedcbbfe17bc7f23485d2.tar.gz |
Merge "Re-factored Heartbeat implementation"
-rw-r--r-- | designate/api/wsgi.py | 4 | ||||
-rw-r--r-- | designate/cmd/agent.py | 3 | ||||
-rw-r--r-- | designate/cmd/api.py | 3 | ||||
-rw-r--r-- | designate/cmd/central.py | 4 | ||||
-rw-r--r-- | designate/cmd/mdns.py | 3 | ||||
-rw-r--r-- | designate/cmd/producer.py | 3 | ||||
-rw-r--r-- | designate/cmd/sink.py | 3 | ||||
-rw-r--r-- | designate/cmd/worker.py | 3 | ||||
-rw-r--r-- | designate/heartbeat_emitter.py (renamed from designate/service_status.py) | 73 | ||||
-rw-r--r-- | designate/service.py | 27 | ||||
-rw-r--r-- | designate/tests/__init__.py | 2 | ||||
-rw-r--r-- | designate/tests/unit/test_heartbeat.py | 55 | ||||
-rw-r--r-- | designate/tests/unit/test_heartbeat_emitter.py | 116 | ||||
-rw-r--r-- | designate/tests/unit/test_service_status.py | 119 | ||||
-rw-r--r-- | setup.cfg | 4 |
15 files changed, 176 insertions, 246 deletions
diff --git a/designate/api/wsgi.py b/designate/api/wsgi.py index 0103c9aa..4d2e41ec 100644 --- a/designate/api/wsgi.py +++ b/designate/api/wsgi.py @@ -18,9 +18,9 @@ from oslo_log import log as logging from paste import deploy from designate import conf +from designate import heartbeat_emitter from designate import policy from designate import rpc -from designate import service from designate.common import config CONF = conf.CONF @@ -47,7 +47,7 @@ def init_application(): if not rpc.initialized(): rpc.init(CONF) - heartbeat = service.Heartbeat('api') + heartbeat = heartbeat_emitter.get_heartbeat_emitter('api') heartbeat.start() conf = conf_files[0] diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py index edc7f46d..48b661b7 100644 --- a/designate/cmd/agent.py +++ b/designate/cmd/agent.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,7 @@ def main(): hookpoints.log_hook_setup() server = agent_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:agent'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/api.py b/designate/cmd/api.py index 61b07911..001a1b64 100644 --- a/designate/cmd/api.py +++ b/designate/cmd/api.py @@ -20,6 +20,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -40,7 +41,7 @@ def main(): hookpoints.log_hook_setup() server = api_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:api'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/central.py b/designate/cmd/central.py index 6ae0524a..32ba370e 100644 --- a/designate/cmd/central.py +++ b/designate/cmd/central.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,8 @@ def main(): hookpoints.log_hook_setup() server = central_service.Service() - heartbeat = service.Heartbeat(server.service_name, rpc_api=server) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name, + rpc_api=server) service.serve(server, workers=CONF['service:central'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py index b8993d9c..5aed6335 100644 --- a/designate/cmd/mdns.py +++ b/designate/cmd/mdns.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,7 @@ def main(): hookpoints.log_hook_setup() server = mdns_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:mdns'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py index 89808f27..5f79b7ae 100644 --- a/designate/cmd/producer.py +++ b/designate/cmd/producer.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,7 @@ def main(): hookpoints.log_hook_setup() server = producer_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:producer'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py index 30725712..6f1d1e8a 100644 --- a/designate/cmd/sink.py +++ b/designate/cmd/sink.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,7 @@ def main(): hookpoints.log_hook_setup() server = sink_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:sink'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py index 2e0f07af..fe0eca1a 100644 --- a/designate/cmd/worker.py +++ b/designate/cmd/worker.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr import designate.conf +from designate import heartbeat_emitter from designate import hookpoints from designate import service from designate import utils @@ -38,7 +39,7 @@ def main(): hookpoints.log_hook_setup() server = worker_service.Service() - heartbeat = service.Heartbeat(server.service_name) + heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name) service.serve(server, workers=CONF['service:worker'].workers) heartbeat.start() service.wait() diff --git a/designate/service_status.py b/designate/heartbeat_emitter.py index 22d4410e..fbcbf1c8 100644 --- a/designate/service_status.py +++ b/designate/heartbeat_emitter.py @@ -28,35 +28,55 @@ CONF = designate.conf.CONF LOG = logging.getLogger(__name__) -class HeartBeatEmitter(plugin.DriverPlugin): +def get_heartbeat_emitter(service_name, **kwargs): + cls = HeartbeatEmitter.get_driver( + CONF.heartbeat_emitter.emitter_type + ) + return cls(service_name, **kwargs) + + +class HeartbeatEmitter(plugin.DriverPlugin): __plugin_ns__ = 'designate.heartbeat_emitter' __plugin_type__ = 'heartbeat_emitter' - def __init__(self, service, status_factory=None, *args, **kwargs): - super(HeartBeatEmitter, self).__init__() + def __init__(self, service_name, **kwargs): + super(HeartbeatEmitter, self).__init__() - self._service = service + self._status = 'UP' + self._stats = {} + self._capabilities = {} + + self._service_name = service_name self._hostname = CONF.host self._timer = loopingcall.FixedIntervalLoopingCall( self._emit_heartbeat ) - self._status_factory = status_factory - def _get_status(self): - if self._status_factory is not None: - return self._status_factory() + def start(self): + self._timer.start( + CONF.heartbeat_emitter.heartbeat_interval, + stop_on_exception=False + ) + + def stop(self): + self._timer.stop() + + def get_status(self): + return self._status, self._stats, self._capabilities - return True, {}, {} + @abc.abstractmethod + def transmit(self, status): + pass def _emit_heartbeat(self): """ Returns Status, Stats, Capabilities """ - status, stats, capabilities = self._get_status() + status, stats, capabilities = self.get_status() service_status = objects.ServiceStatus( - service_name=self._service, + service_name=self._service_name, hostname=self._hostname, status=status, stats=stats, @@ -64,39 +84,26 @@ class HeartBeatEmitter(plugin.DriverPlugin): heartbeated_at=timeutils.utcnow() ) - LOG.trace("Emitting %s", service_status) - - self._transmit(service_status) + LOG.trace('Emitting %s', service_status) - @abc.abstractmethod - def _transmit(self, status): - pass - - def start(self): - self._timer.start( - CONF.heartbeat_emitter.heartbeat_interval, - stop_on_exception=False - ) - - def stop(self): - self._timer.stop() + self.transmit(service_status) -class NoopEmitter(HeartBeatEmitter): +class NoopEmitter(HeartbeatEmitter): __plugin_name__ = 'noop' - def _transmit(self, status): - LOG.debug(status) + def transmit(self, status): + LOG.info(status) -class RpcEmitter(HeartBeatEmitter): +class RpcEmitter(HeartbeatEmitter): __plugin_name__ = 'rpc' - def __init__(self, service, rpc_api=None, *args, **kwargs): - super(RpcEmitter, self).__init__(service, *args, **kwargs) + def __init__(self, service_name, rpc_api=None, **kwargs): + super(RpcEmitter, self).__init__(service_name, **kwargs) self.rpc_api = rpc_api - def _transmit(self, status): + def transmit(self, status): admin_context = context.DesignateContext.get_admin_context() api = self.rpc_api or central_rpcapi.CentralAPI.get_instance() api.update_service_status(admin_context, status) diff --git a/designate/service.py b/designate/service.py index 22f92532..9f5c68b3 100644 --- a/designate/service.py +++ b/designate/service.py @@ -32,7 +32,6 @@ from oslo_utils import netutils from designate import policy from designate import rpc -from designate import service_status from designate import utils from designate import version import designate.conf @@ -68,32 +67,6 @@ class Service(service.Service): super(Service, self).stop(graceful) -class Heartbeat(object): - def __init__(self, name, rpc_api=None): - self.name = name - - self._status = 'UP' - self._stats = {} - self._capabilities = {} - - emitter_cls = service_status.HeartBeatEmitter.get_driver( - CONF.heartbeat_emitter.emitter_type - ) - self.heartbeat_emitter = emitter_cls( - self.name, - status_factory=self.get_status, rpc_api=rpc_api - ) - - def get_status(self): - return self._status, self._stats, self._capabilities - - def start(self): - self.heartbeat_emitter.start() - - def stop(self): - self.heartbeat_emitter.stop() - - class RPCService(Service): def __init__(self, name, rpc_topic, threads=None): super(RPCService, self).__init__(name, threads) diff --git a/designate/tests/__init__.py b/designate/tests/__init__.py index 3ed5412e..67fc3a49 100644 --- a/designate/tests/__init__.py +++ b/designate/tests/__init__.py @@ -50,7 +50,7 @@ CONF.import_opt('auth_strategy', 'designate.api', group='service:api') CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy', group='storage:sqlalchemy') -CONF.import_opt('emitter_type', 'designate.service_status', +CONF.import_opt('emitter_type', 'designate.heartbeat_emitter', group="heartbeat_emitter") CONF.import_opt('scheduler_filters', 'designate.scheduler', group="service:central") diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py deleted file mode 100644 index a531d5b0..00000000 --- a/designate/tests/unit/test_heartbeat.py +++ /dev/null @@ -1,55 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import mock -import oslotest.base -from oslo_config import cfg -from oslo_config import fixture as cfg_fixture -from oslo_service import loopingcall - -from designate import service - -CONF = cfg.CONF - - -class HeartbeatTest(oslotest.base.BaseTestCase): - @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') - def setUp(self, mock_looping): - super(HeartbeatTest, self).setUp() - - self.mock_timer = mock.Mock() - mock_looping.return_value = self.mock_timer - - self.useFixture(cfg_fixture.Config(CONF)) - - CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter') - - self.heartbeat = service.Heartbeat('test') - - def test_get_status(self): - self.assertEqual(('UP', {}, {},), self.heartbeat.get_status()) - - def test_get_heartbeat_emitter(self): - self.assertEqual( - 'noop', self.heartbeat.heartbeat_emitter.__plugin_name__ - ) - - def test_start_heartbeat(self): - self.heartbeat.start() - - self.mock_timer.start.assert_called_once() - - def test_stop_heartbeat(self): - - self.heartbeat.start() - self.heartbeat.stop() - - self.mock_timer.stop.assert_called_once() diff --git a/designate/tests/unit/test_heartbeat_emitter.py b/designate/tests/unit/test_heartbeat_emitter.py new file mode 100644 index 00000000..8b9ba457 --- /dev/null +++ b/designate/tests/unit/test_heartbeat_emitter.py @@ -0,0 +1,116 @@ +# Copyright 2016 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock +import oslotest.base +from oslo_config import cfg +from oslo_config import fixture as cfg_fixture +from oslo_service import loopingcall +import time + +from designate import heartbeat_emitter +from designate import objects +from designate.tests import fixtures + +CONF = cfg.CONF + + +class HeartbeatEmitterTest(oslotest.base.BaseTestCase): + def setUp(self): + super(HeartbeatEmitterTest, self).setUp() + self.stdlog = fixtures.StandardLogging() + self.useFixture(self.stdlog) + self.useFixture(cfg_fixture.Config(CONF)) + + CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter') + CONF.set_override('heartbeat_interval', 0.1, 'heartbeat_emitter') + CONF.set_override('host', 'localhost') + + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_start(self, mock_looping): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer + + noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc') + + noop_emitter.start() + + mock_timer.start.assert_called_once() + + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_stop(self, mock_looping): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer + + noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc') + + noop_emitter.start() + noop_emitter.stop() + + mock_timer.stop.assert_called_once() + + def test_get_status(self): + noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc') + + self.assertEqual(('UP', {}, {},), noop_emitter.get_status()) + + def test_emit(self): + noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc') + + noop_emitter.start() + + time.sleep(0.125) + + noop_emitter.stop() + + self.assertIn( + "<ServiceStatus service_name:'svc' hostname:'localhost' " + "status:'UP'>", + self.stdlog.logger.output + ) + + +class RpcEmitterTest(oslotest.base.BaseTestCase): + def setUp(self): + super(RpcEmitterTest, self).setUp() + + @mock.patch.object(objects, 'ServiceStatus') + @mock.patch('designate.context.DesignateContext.get_admin_context') + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_emit_heartbeat(self, mock_looping, mock_context, + mock_service_status): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer + + emitter = heartbeat_emitter.RpcEmitter('svc') + emitter.start() + + mock_timer.start.assert_called_once() + + central = mock.Mock() + with mock.patch('designate.central.rpcapi.CentralAPI.get_instance', + return_value=central): + emitter._emit_heartbeat() + + mock_service_status.assert_called_once_with( + service_name='svc', + hostname=cfg.CONF.host, + status='UP', + stats={}, + capabilities={}, + heartbeated_at=mock.ANY + ) + + central.update_service_status.assert_called_once_with( + mock_context.return_value, mock_service_status.return_value + ) diff --git a/designate/tests/unit/test_service_status.py b/designate/tests/unit/test_service_status.py deleted file mode 100644 index c7827289..00000000 --- a/designate/tests/unit/test_service_status.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright 2016 Hewlett Packard Enterprise Development Company LP -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import mock -import oslotest.base -from oslo_config import cfg -from oslo_service import loopingcall - -from designate import objects -from designate import service_status - - -class NoopEmitterTest(oslotest.base.BaseTestCase): - def setUp(self): - super(NoopEmitterTest, self).setUp() - - @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') - def test_start(self, mock_looping): - mock_timer = mock.Mock() - mock_looping.return_value = mock_timer - - emitter = service_status.NoopEmitter("svc") - emitter.start() - - mock_timer.start.assert_called_once() - - @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') - def test_stop(self, mock_looping): - mock_timer = mock.Mock() - mock_looping.return_value = mock_timer - - emitter = service_status.NoopEmitter("svc") - emitter.start() - emitter.stop() - - mock_timer.stop.assert_called_once() - - -class RpcEmitterTest(oslotest.base.BaseTestCase): - def setUp(self): - super(RpcEmitterTest, self).setUp() - - @mock.patch.object(objects, "ServiceStatus") - @mock.patch("designate.context.DesignateContext.get_admin_context") - @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') - def test_emit_no_status_factory(self, mock_looping, mock_context, - mock_service_status): - mock_timer = mock.Mock() - mock_looping.return_value = mock_timer - - emitter = service_status.RpcEmitter("svc") - emitter.start() - - mock_timer.start.assert_called_once() - - central = mock.Mock() - with mock.patch("designate.central.rpcapi.CentralAPI.get_instance", - return_value=central): - emitter._emit_heartbeat() - - mock_service_status.assert_called_once_with( - service_name="svc", - hostname=cfg.CONF.host, - status=True, - stats={}, - capabilities={}, - heartbeated_at=mock.ANY - ) - - central.update_service_status.assert_called_once_with( - mock_context.return_value, mock_service_status.return_value - ) - - @mock.patch.object(objects, "ServiceStatus") - @mock.patch("designate.context.DesignateContext.get_admin_context") - @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') - def test_emit_status_factory(self, mock_looping, mock_context, - mock_service_status): - mock_timer = mock.Mock() - mock_looping.return_value = mock_timer - - status = False - stats = {"a": 1} - capabilities = {"b": 2} - - status_factory = mock.Mock(return_value=(status, stats, capabilities,)) - emitter = service_status.RpcEmitter("svc", - status_factory=status_factory) - emitter.start() - - mock_timer.start.assert_called_once() - - central = mock.Mock() - with mock.patch("designate.central.rpcapi.CentralAPI.get_instance", - return_value=central): - emitter._emit_heartbeat() - - mock_service_status.assert_called_once_with( - service_name="svc", - hostname=cfg.CONF.host, - status=status, - stats=stats, - capabilities=capabilities, - heartbeated_at=mock.ANY - ) - - central.update_service_status.assert_called_once_with( - mock_context.return_value, mock_service_status.return_value - ) @@ -119,8 +119,8 @@ designate.producer_tasks = worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery designate.heartbeat_emitter = - noop = designate.service_status:NoopEmitter - rpc = designate.service_status:RpcEmitter + noop = designate.heartbeat_emitter:NoopEmitter + rpc = designate.heartbeat_emitter:RpcEmitter designate.notification.plugin = default = designate.notifications:Default |