summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErik Olof Gunnar Andersson <eandersson@blizzard.com>2019-12-27 20:57:43 -0800
committerErik Olof Gunnar Andersson <eandersson@blizzard.com>2020-03-09 03:15:21 +0000
commit3fccc25bf661755b6f71f7e839938c214480bd5f (patch)
tree798e2baf63e0dd30ddce7d05e192e753c50a4155
parent21f94dea86abecd0f238f01c4d39d2fafb84d832 (diff)
downloaddesignate-3fccc25bf661755b6f71f7e839938c214480bd5f.tar.gz
Re-factored Heartbeat implementation
The use of this code path is very limited at the moment, and this patch further simplifies the code to make it easier to either deprecate or extend it in the future. * Renamed service_status.py -> heartbeat_emitter.py. * Merged HeartbeatEmitter and Heartbeat. Change-Id: I68a4a49dd3e6bbc123c3ed5d3d3661aa08ad0bbb
-rw-r--r--designate/api/wsgi.py4
-rw-r--r--designate/cmd/agent.py3
-rw-r--r--designate/cmd/api.py3
-rw-r--r--designate/cmd/central.py4
-rw-r--r--designate/cmd/mdns.py3
-rw-r--r--designate/cmd/producer.py3
-rw-r--r--designate/cmd/sink.py3
-rw-r--r--designate/cmd/worker.py3
-rw-r--r--designate/heartbeat_emitter.py (renamed from designate/service_status.py)73
-rw-r--r--designate/service.py27
-rw-r--r--designate/tests/__init__.py2
-rw-r--r--designate/tests/unit/test_heartbeat.py55
-rw-r--r--designate/tests/unit/test_heartbeat_emitter.py116
-rw-r--r--designate/tests/unit/test_service_status.py119
-rw-r--r--setup.cfg4
15 files changed, 176 insertions, 246 deletions
diff --git a/designate/api/wsgi.py b/designate/api/wsgi.py
index 0103c9aa..4d2e41ec 100644
--- a/designate/api/wsgi.py
+++ b/designate/api/wsgi.py
@@ -18,9 +18,9 @@ from oslo_log import log as logging
from paste import deploy
from designate import conf
+from designate import heartbeat_emitter
from designate import policy
from designate import rpc
-from designate import service
from designate.common import config
CONF = conf.CONF
@@ -47,7 +47,7 @@ def init_application():
if not rpc.initialized():
rpc.init(CONF)
- heartbeat = service.Heartbeat('api')
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter('api')
heartbeat.start()
conf = conf_files[0]
diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py
index edc7f46d..48b661b7 100644
--- a/designate/cmd/agent.py
+++ b/designate/cmd/agent.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = agent_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/api.py b/designate/cmd/api.py
index 61b07911..001a1b64 100644
--- a/designate/cmd/api.py
+++ b/designate/cmd/api.py
@@ -20,6 +20,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -40,7 +41,7 @@ def main():
hookpoints.log_hook_setup()
server = api_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:api'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/central.py b/designate/cmd/central.py
index 6ae0524a..32ba370e 100644
--- a/designate/cmd/central.py
+++ b/designate/cmd/central.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,8 @@ def main():
hookpoints.log_hook_setup()
server = central_service.Service()
- heartbeat = service.Heartbeat(server.service_name, rpc_api=server)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name,
+ rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py
index b8993d9c..5aed6335 100644
--- a/designate/cmd/mdns.py
+++ b/designate/cmd/mdns.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = mdns_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:mdns'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py
index 89808f27..5f79b7ae 100644
--- a/designate/cmd/producer.py
+++ b/designate/cmd/producer.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = producer_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:producer'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py
index 30725712..6f1d1e8a 100644
--- a/designate/cmd/sink.py
+++ b/designate/cmd/sink.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = sink_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py
index 2e0f07af..fe0eca1a 100644
--- a/designate/cmd/worker.py
+++ b/designate/cmd/worker.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = worker_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:worker'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/service_status.py b/designate/heartbeat_emitter.py
index 22d4410e..fbcbf1c8 100644
--- a/designate/service_status.py
+++ b/designate/heartbeat_emitter.py
@@ -28,35 +28,55 @@ CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
-class HeartBeatEmitter(plugin.DriverPlugin):
+def get_heartbeat_emitter(service_name, **kwargs):
+ cls = HeartbeatEmitter.get_driver(
+ CONF.heartbeat_emitter.emitter_type
+ )
+ return cls(service_name, **kwargs)
+
+
+class HeartbeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'
- def __init__(self, service, status_factory=None, *args, **kwargs):
- super(HeartBeatEmitter, self).__init__()
+ def __init__(self, service_name, **kwargs):
+ super(HeartbeatEmitter, self).__init__()
- self._service = service
+ self._status = 'UP'
+ self._stats = {}
+ self._capabilities = {}
+
+ self._service_name = service_name
self._hostname = CONF.host
self._timer = loopingcall.FixedIntervalLoopingCall(
self._emit_heartbeat
)
- self._status_factory = status_factory
- def _get_status(self):
- if self._status_factory is not None:
- return self._status_factory()
+ def start(self):
+ self._timer.start(
+ CONF.heartbeat_emitter.heartbeat_interval,
+ stop_on_exception=False
+ )
+
+ def stop(self):
+ self._timer.stop()
+
+ def get_status(self):
+ return self._status, self._stats, self._capabilities
- return True, {}, {}
+ @abc.abstractmethod
+ def transmit(self, status):
+ pass
def _emit_heartbeat(self):
"""
Returns Status, Stats, Capabilities
"""
- status, stats, capabilities = self._get_status()
+ status, stats, capabilities = self.get_status()
service_status = objects.ServiceStatus(
- service_name=self._service,
+ service_name=self._service_name,
hostname=self._hostname,
status=status,
stats=stats,
@@ -64,39 +84,26 @@ class HeartBeatEmitter(plugin.DriverPlugin):
heartbeated_at=timeutils.utcnow()
)
- LOG.trace("Emitting %s", service_status)
-
- self._transmit(service_status)
+ LOG.trace('Emitting %s', service_status)
- @abc.abstractmethod
- def _transmit(self, status):
- pass
-
- def start(self):
- self._timer.start(
- CONF.heartbeat_emitter.heartbeat_interval,
- stop_on_exception=False
- )
-
- def stop(self):
- self._timer.stop()
+ self.transmit(service_status)
-class NoopEmitter(HeartBeatEmitter):
+class NoopEmitter(HeartbeatEmitter):
__plugin_name__ = 'noop'
- def _transmit(self, status):
- LOG.debug(status)
+ def transmit(self, status):
+ LOG.info(status)
-class RpcEmitter(HeartBeatEmitter):
+class RpcEmitter(HeartbeatEmitter):
__plugin_name__ = 'rpc'
- def __init__(self, service, rpc_api=None, *args, **kwargs):
- super(RpcEmitter, self).__init__(service, *args, **kwargs)
+ def __init__(self, service_name, rpc_api=None, **kwargs):
+ super(RpcEmitter, self).__init__(service_name, **kwargs)
self.rpc_api = rpc_api
- def _transmit(self, status):
+ def transmit(self, status):
admin_context = context.DesignateContext.get_admin_context()
api = self.rpc_api or central_rpcapi.CentralAPI.get_instance()
api.update_service_status(admin_context, status)
diff --git a/designate/service.py b/designate/service.py
index 22f92532..9f5c68b3 100644
--- a/designate/service.py
+++ b/designate/service.py
@@ -32,7 +32,6 @@ from oslo_utils import netutils
from designate import policy
from designate import rpc
-from designate import service_status
from designate import utils
from designate import version
import designate.conf
@@ -68,32 +67,6 @@ class Service(service.Service):
super(Service, self).stop(graceful)
-class Heartbeat(object):
- def __init__(self, name, rpc_api=None):
- self.name = name
-
- self._status = 'UP'
- self._stats = {}
- self._capabilities = {}
-
- emitter_cls = service_status.HeartBeatEmitter.get_driver(
- CONF.heartbeat_emitter.emitter_type
- )
- self.heartbeat_emitter = emitter_cls(
- self.name,
- status_factory=self.get_status, rpc_api=rpc_api
- )
-
- def get_status(self):
- return self._status, self._stats, self._capabilities
-
- def start(self):
- self.heartbeat_emitter.start()
-
- def stop(self):
- self.heartbeat_emitter.stop()
-
-
class RPCService(Service):
def __init__(self, name, rpc_topic, threads=None):
super(RPCService, self).__init__(name, threads)
diff --git a/designate/tests/__init__.py b/designate/tests/__init__.py
index 3ed5412e..67fc3a49 100644
--- a/designate/tests/__init__.py
+++ b/designate/tests/__init__.py
@@ -50,7 +50,7 @@ CONF.import_opt('auth_strategy', 'designate.api',
group='service:api')
CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy',
group='storage:sqlalchemy')
-CONF.import_opt('emitter_type', 'designate.service_status',
+CONF.import_opt('emitter_type', 'designate.heartbeat_emitter',
group="heartbeat_emitter")
CONF.import_opt('scheduler_filters', 'designate.scheduler',
group="service:central")
diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py
deleted file mode 100644
index a531d5b0..00000000
--- a/designate/tests/unit/test_heartbeat.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import mock
-import oslotest.base
-from oslo_config import cfg
-from oslo_config import fixture as cfg_fixture
-from oslo_service import loopingcall
-
-from designate import service
-
-CONF = cfg.CONF
-
-
-class HeartbeatTest(oslotest.base.BaseTestCase):
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def setUp(self, mock_looping):
- super(HeartbeatTest, self).setUp()
-
- self.mock_timer = mock.Mock()
- mock_looping.return_value = self.mock_timer
-
- self.useFixture(cfg_fixture.Config(CONF))
-
- CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
-
- self.heartbeat = service.Heartbeat('test')
-
- def test_get_status(self):
- self.assertEqual(('UP', {}, {},), self.heartbeat.get_status())
-
- def test_get_heartbeat_emitter(self):
- self.assertEqual(
- 'noop', self.heartbeat.heartbeat_emitter.__plugin_name__
- )
-
- def test_start_heartbeat(self):
- self.heartbeat.start()
-
- self.mock_timer.start.assert_called_once()
-
- def test_stop_heartbeat(self):
-
- self.heartbeat.start()
- self.heartbeat.stop()
-
- self.mock_timer.stop.assert_called_once()
diff --git a/designate/tests/unit/test_heartbeat_emitter.py b/designate/tests/unit/test_heartbeat_emitter.py
new file mode 100644
index 00000000..8b9ba457
--- /dev/null
+++ b/designate/tests/unit/test_heartbeat_emitter.py
@@ -0,0 +1,116 @@
+# Copyright 2016 Hewlett Packard Enterprise Development Company LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import mock
+import oslotest.base
+from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
+from oslo_service import loopingcall
+import time
+
+from designate import heartbeat_emitter
+from designate import objects
+from designate.tests import fixtures
+
+CONF = cfg.CONF
+
+
+class HeartbeatEmitterTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(HeartbeatEmitterTest, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
+ self.useFixture(cfg_fixture.Config(CONF))
+
+ CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
+ CONF.set_override('heartbeat_interval', 0.1, 'heartbeat_emitter')
+ CONF.set_override('host', 'localhost')
+
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_start(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+
+ mock_timer.start.assert_called_once()
+
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_stop(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+ noop_emitter.stop()
+
+ mock_timer.stop.assert_called_once()
+
+ def test_get_status(self):
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ self.assertEqual(('UP', {}, {},), noop_emitter.get_status())
+
+ def test_emit(self):
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+
+ time.sleep(0.125)
+
+ noop_emitter.stop()
+
+ self.assertIn(
+ "<ServiceStatus service_name:'svc' hostname:'localhost' "
+ "status:'UP'>",
+ self.stdlog.logger.output
+ )
+
+
+class RpcEmitterTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(RpcEmitterTest, self).setUp()
+
+ @mock.patch.object(objects, 'ServiceStatus')
+ @mock.patch('designate.context.DesignateContext.get_admin_context')
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_emit_heartbeat(self, mock_looping, mock_context,
+ mock_service_status):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ emitter = heartbeat_emitter.RpcEmitter('svc')
+ emitter.start()
+
+ mock_timer.start.assert_called_once()
+
+ central = mock.Mock()
+ with mock.patch('designate.central.rpcapi.CentralAPI.get_instance',
+ return_value=central):
+ emitter._emit_heartbeat()
+
+ mock_service_status.assert_called_once_with(
+ service_name='svc',
+ hostname=cfg.CONF.host,
+ status='UP',
+ stats={},
+ capabilities={},
+ heartbeated_at=mock.ANY
+ )
+
+ central.update_service_status.assert_called_once_with(
+ mock_context.return_value, mock_service_status.return_value
+ )
diff --git a/designate/tests/unit/test_service_status.py b/designate/tests/unit/test_service_status.py
deleted file mode 100644
index c7827289..00000000
--- a/designate/tests/unit/test_service_status.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright 2016 Hewlett Packard Enterprise Development Company LP
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import mock
-import oslotest.base
-from oslo_config import cfg
-from oslo_service import loopingcall
-
-from designate import objects
-from designate import service_status
-
-
-class NoopEmitterTest(oslotest.base.BaseTestCase):
- def setUp(self):
- super(NoopEmitterTest, self).setUp()
-
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_start(self, mock_looping):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.NoopEmitter("svc")
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_stop(self, mock_looping):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.NoopEmitter("svc")
- emitter.start()
- emitter.stop()
-
- mock_timer.stop.assert_called_once()
-
-
-class RpcEmitterTest(oslotest.base.BaseTestCase):
- def setUp(self):
- super(RpcEmitterTest, self).setUp()
-
- @mock.patch.object(objects, "ServiceStatus")
- @mock.patch("designate.context.DesignateContext.get_admin_context")
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_emit_no_status_factory(self, mock_looping, mock_context,
- mock_service_status):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.RpcEmitter("svc")
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- central = mock.Mock()
- with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
- return_value=central):
- emitter._emit_heartbeat()
-
- mock_service_status.assert_called_once_with(
- service_name="svc",
- hostname=cfg.CONF.host,
- status=True,
- stats={},
- capabilities={},
- heartbeated_at=mock.ANY
- )
-
- central.update_service_status.assert_called_once_with(
- mock_context.return_value, mock_service_status.return_value
- )
-
- @mock.patch.object(objects, "ServiceStatus")
- @mock.patch("designate.context.DesignateContext.get_admin_context")
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_emit_status_factory(self, mock_looping, mock_context,
- mock_service_status):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- status = False
- stats = {"a": 1}
- capabilities = {"b": 2}
-
- status_factory = mock.Mock(return_value=(status, stats, capabilities,))
- emitter = service_status.RpcEmitter("svc",
- status_factory=status_factory)
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- central = mock.Mock()
- with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
- return_value=central):
- emitter._emit_heartbeat()
-
- mock_service_status.assert_called_once_with(
- service_name="svc",
- hostname=cfg.CONF.host,
- status=status,
- stats=stats,
- capabilities=capabilities,
- heartbeated_at=mock.ANY
- )
-
- central.update_service_status.assert_called_once_with(
- mock_context.return_value, mock_service_status.return_value
- )
diff --git a/setup.cfg b/setup.cfg
index 94519a3e..c930da27 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -119,8 +119,8 @@ designate.producer_tasks =
worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
designate.heartbeat_emitter =
- noop = designate.service_status:NoopEmitter
- rpc = designate.service_status:RpcEmitter
+ noop = designate.heartbeat_emitter:NoopEmitter
+ rpc = designate.heartbeat_emitter:RpcEmitter
designate.notification.plugin =
default = designate.notifications:Default