summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2020-03-15 11:17:15 +0000
committerGerrit Code Review <review@openstack.org>2020-03-15 11:17:15 +0000
commit46dc595f21f758b31efeedcbbfe17bc7f23485d2 (patch)
tree176796b43a2fc87c957fdd1dfaea1adf15e431f3
parent2a05203c696e9d0f8e57c29e788dd774071964a6 (diff)
parent3fccc25bf661755b6f71f7e839938c214480bd5f (diff)
downloaddesignate-46dc595f21f758b31efeedcbbfe17bc7f23485d2.tar.gz
Merge "Re-factored Heartbeat implementation"
-rw-r--r--designate/api/wsgi.py4
-rw-r--r--designate/cmd/agent.py3
-rw-r--r--designate/cmd/api.py3
-rw-r--r--designate/cmd/central.py4
-rw-r--r--designate/cmd/mdns.py3
-rw-r--r--designate/cmd/producer.py3
-rw-r--r--designate/cmd/sink.py3
-rw-r--r--designate/cmd/worker.py3
-rw-r--r--designate/heartbeat_emitter.py (renamed from designate/service_status.py)73
-rw-r--r--designate/service.py27
-rw-r--r--designate/tests/__init__.py2
-rw-r--r--designate/tests/unit/test_heartbeat.py55
-rw-r--r--designate/tests/unit/test_heartbeat_emitter.py116
-rw-r--r--designate/tests/unit/test_service_status.py119
-rw-r--r--setup.cfg4
15 files changed, 176 insertions, 246 deletions
diff --git a/designate/api/wsgi.py b/designate/api/wsgi.py
index 0103c9aa..4d2e41ec 100644
--- a/designate/api/wsgi.py
+++ b/designate/api/wsgi.py
@@ -18,9 +18,9 @@ from oslo_log import log as logging
from paste import deploy
from designate import conf
+from designate import heartbeat_emitter
from designate import policy
from designate import rpc
-from designate import service
from designate.common import config
CONF = conf.CONF
@@ -47,7 +47,7 @@ def init_application():
if not rpc.initialized():
rpc.init(CONF)
- heartbeat = service.Heartbeat('api')
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter('api')
heartbeat.start()
conf = conf_files[0]
diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py
index edc7f46d..48b661b7 100644
--- a/designate/cmd/agent.py
+++ b/designate/cmd/agent.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = agent_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/api.py b/designate/cmd/api.py
index 61b07911..001a1b64 100644
--- a/designate/cmd/api.py
+++ b/designate/cmd/api.py
@@ -20,6 +20,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -40,7 +41,7 @@ def main():
hookpoints.log_hook_setup()
server = api_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:api'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/central.py b/designate/cmd/central.py
index 6ae0524a..32ba370e 100644
--- a/designate/cmd/central.py
+++ b/designate/cmd/central.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,8 @@ def main():
hookpoints.log_hook_setup()
server = central_service.Service()
- heartbeat = service.Heartbeat(server.service_name, rpc_api=server)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name,
+ rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py
index b8993d9c..5aed6335 100644
--- a/designate/cmd/mdns.py
+++ b/designate/cmd/mdns.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = mdns_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:mdns'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py
index 89808f27..5f79b7ae 100644
--- a/designate/cmd/producer.py
+++ b/designate/cmd/producer.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = producer_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:producer'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py
index 30725712..6f1d1e8a 100644
--- a/designate/cmd/sink.py
+++ b/designate/cmd/sink.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = sink_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py
index 2e0f07af..fe0eca1a 100644
--- a/designate/cmd/worker.py
+++ b/designate/cmd/worker.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import designate.conf
+from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
@@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()
server = worker_service.Service()
- heartbeat = service.Heartbeat(server.service_name)
+ heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:worker'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/service_status.py b/designate/heartbeat_emitter.py
index 22d4410e..fbcbf1c8 100644
--- a/designate/service_status.py
+++ b/designate/heartbeat_emitter.py
@@ -28,35 +28,55 @@ CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
-class HeartBeatEmitter(plugin.DriverPlugin):
+def get_heartbeat_emitter(service_name, **kwargs):
+ cls = HeartbeatEmitter.get_driver(
+ CONF.heartbeat_emitter.emitter_type
+ )
+ return cls(service_name, **kwargs)
+
+
+class HeartbeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'
- def __init__(self, service, status_factory=None, *args, **kwargs):
- super(HeartBeatEmitter, self).__init__()
+ def __init__(self, service_name, **kwargs):
+ super(HeartbeatEmitter, self).__init__()
- self._service = service
+ self._status = 'UP'
+ self._stats = {}
+ self._capabilities = {}
+
+ self._service_name = service_name
self._hostname = CONF.host
self._timer = loopingcall.FixedIntervalLoopingCall(
self._emit_heartbeat
)
- self._status_factory = status_factory
- def _get_status(self):
- if self._status_factory is not None:
- return self._status_factory()
+ def start(self):
+ self._timer.start(
+ CONF.heartbeat_emitter.heartbeat_interval,
+ stop_on_exception=False
+ )
+
+ def stop(self):
+ self._timer.stop()
+
+ def get_status(self):
+ return self._status, self._stats, self._capabilities
- return True, {}, {}
+ @abc.abstractmethod
+ def transmit(self, status):
+ pass
def _emit_heartbeat(self):
"""
Returns Status, Stats, Capabilities
"""
- status, stats, capabilities = self._get_status()
+ status, stats, capabilities = self.get_status()
service_status = objects.ServiceStatus(
- service_name=self._service,
+ service_name=self._service_name,
hostname=self._hostname,
status=status,
stats=stats,
@@ -64,39 +84,26 @@ class HeartBeatEmitter(plugin.DriverPlugin):
heartbeated_at=timeutils.utcnow()
)
- LOG.trace("Emitting %s", service_status)
-
- self._transmit(service_status)
+ LOG.trace('Emitting %s', service_status)
- @abc.abstractmethod
- def _transmit(self, status):
- pass
-
- def start(self):
- self._timer.start(
- CONF.heartbeat_emitter.heartbeat_interval,
- stop_on_exception=False
- )
-
- def stop(self):
- self._timer.stop()
+ self.transmit(service_status)
-class NoopEmitter(HeartBeatEmitter):
+class NoopEmitter(HeartbeatEmitter):
__plugin_name__ = 'noop'
- def _transmit(self, status):
- LOG.debug(status)
+ def transmit(self, status):
+ LOG.info(status)
-class RpcEmitter(HeartBeatEmitter):
+class RpcEmitter(HeartbeatEmitter):
__plugin_name__ = 'rpc'
- def __init__(self, service, rpc_api=None, *args, **kwargs):
- super(RpcEmitter, self).__init__(service, *args, **kwargs)
+ def __init__(self, service_name, rpc_api=None, **kwargs):
+ super(RpcEmitter, self).__init__(service_name, **kwargs)
self.rpc_api = rpc_api
- def _transmit(self, status):
+ def transmit(self, status):
admin_context = context.DesignateContext.get_admin_context()
api = self.rpc_api or central_rpcapi.CentralAPI.get_instance()
api.update_service_status(admin_context, status)
diff --git a/designate/service.py b/designate/service.py
index 22f92532..9f5c68b3 100644
--- a/designate/service.py
+++ b/designate/service.py
@@ -32,7 +32,6 @@ from oslo_utils import netutils
from designate import policy
from designate import rpc
-from designate import service_status
from designate import utils
from designate import version
import designate.conf
@@ -68,32 +67,6 @@ class Service(service.Service):
super(Service, self).stop(graceful)
-class Heartbeat(object):
- def __init__(self, name, rpc_api=None):
- self.name = name
-
- self._status = 'UP'
- self._stats = {}
- self._capabilities = {}
-
- emitter_cls = service_status.HeartBeatEmitter.get_driver(
- CONF.heartbeat_emitter.emitter_type
- )
- self.heartbeat_emitter = emitter_cls(
- self.name,
- status_factory=self.get_status, rpc_api=rpc_api
- )
-
- def get_status(self):
- return self._status, self._stats, self._capabilities
-
- def start(self):
- self.heartbeat_emitter.start()
-
- def stop(self):
- self.heartbeat_emitter.stop()
-
-
class RPCService(Service):
def __init__(self, name, rpc_topic, threads=None):
super(RPCService, self).__init__(name, threads)
diff --git a/designate/tests/__init__.py b/designate/tests/__init__.py
index 3ed5412e..67fc3a49 100644
--- a/designate/tests/__init__.py
+++ b/designate/tests/__init__.py
@@ -50,7 +50,7 @@ CONF.import_opt('auth_strategy', 'designate.api',
group='service:api')
CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy',
group='storage:sqlalchemy')
-CONF.import_opt('emitter_type', 'designate.service_status',
+CONF.import_opt('emitter_type', 'designate.heartbeat_emitter',
group="heartbeat_emitter")
CONF.import_opt('scheduler_filters', 'designate.scheduler',
group="service:central")
diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py
deleted file mode 100644
index a531d5b0..00000000
--- a/designate/tests/unit/test_heartbeat.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import mock
-import oslotest.base
-from oslo_config import cfg
-from oslo_config import fixture as cfg_fixture
-from oslo_service import loopingcall
-
-from designate import service
-
-CONF = cfg.CONF
-
-
-class HeartbeatTest(oslotest.base.BaseTestCase):
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def setUp(self, mock_looping):
- super(HeartbeatTest, self).setUp()
-
- self.mock_timer = mock.Mock()
- mock_looping.return_value = self.mock_timer
-
- self.useFixture(cfg_fixture.Config(CONF))
-
- CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
-
- self.heartbeat = service.Heartbeat('test')
-
- def test_get_status(self):
- self.assertEqual(('UP', {}, {},), self.heartbeat.get_status())
-
- def test_get_heartbeat_emitter(self):
- self.assertEqual(
- 'noop', self.heartbeat.heartbeat_emitter.__plugin_name__
- )
-
- def test_start_heartbeat(self):
- self.heartbeat.start()
-
- self.mock_timer.start.assert_called_once()
-
- def test_stop_heartbeat(self):
-
- self.heartbeat.start()
- self.heartbeat.stop()
-
- self.mock_timer.stop.assert_called_once()
diff --git a/designate/tests/unit/test_heartbeat_emitter.py b/designate/tests/unit/test_heartbeat_emitter.py
new file mode 100644
index 00000000..8b9ba457
--- /dev/null
+++ b/designate/tests/unit/test_heartbeat_emitter.py
@@ -0,0 +1,116 @@
+# Copyright 2016 Hewlett Packard Enterprise Development Company LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import mock
+import oslotest.base
+from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
+from oslo_service import loopingcall
+import time
+
+from designate import heartbeat_emitter
+from designate import objects
+from designate.tests import fixtures
+
+CONF = cfg.CONF
+
+
+class HeartbeatEmitterTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(HeartbeatEmitterTest, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
+ self.useFixture(cfg_fixture.Config(CONF))
+
+ CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
+ CONF.set_override('heartbeat_interval', 0.1, 'heartbeat_emitter')
+ CONF.set_override('host', 'localhost')
+
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_start(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+
+ mock_timer.start.assert_called_once()
+
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_stop(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+ noop_emitter.stop()
+
+ mock_timer.stop.assert_called_once()
+
+ def test_get_status(self):
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ self.assertEqual(('UP', {}, {},), noop_emitter.get_status())
+
+ def test_emit(self):
+ noop_emitter = heartbeat_emitter.get_heartbeat_emitter('svc')
+
+ noop_emitter.start()
+
+ time.sleep(0.125)
+
+ noop_emitter.stop()
+
+ self.assertIn(
+ "<ServiceStatus service_name:'svc' hostname:'localhost' "
+ "status:'UP'>",
+ self.stdlog.logger.output
+ )
+
+
+class RpcEmitterTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(RpcEmitterTest, self).setUp()
+
+ @mock.patch.object(objects, 'ServiceStatus')
+ @mock.patch('designate.context.DesignateContext.get_admin_context')
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_emit_heartbeat(self, mock_looping, mock_context,
+ mock_service_status):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ emitter = heartbeat_emitter.RpcEmitter('svc')
+ emitter.start()
+
+ mock_timer.start.assert_called_once()
+
+ central = mock.Mock()
+ with mock.patch('designate.central.rpcapi.CentralAPI.get_instance',
+ return_value=central):
+ emitter._emit_heartbeat()
+
+ mock_service_status.assert_called_once_with(
+ service_name='svc',
+ hostname=cfg.CONF.host,
+ status='UP',
+ stats={},
+ capabilities={},
+ heartbeated_at=mock.ANY
+ )
+
+ central.update_service_status.assert_called_once_with(
+ mock_context.return_value, mock_service_status.return_value
+ )
diff --git a/designate/tests/unit/test_service_status.py b/designate/tests/unit/test_service_status.py
deleted file mode 100644
index c7827289..00000000
--- a/designate/tests/unit/test_service_status.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright 2016 Hewlett Packard Enterprise Development Company LP
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import mock
-import oslotest.base
-from oslo_config import cfg
-from oslo_service import loopingcall
-
-from designate import objects
-from designate import service_status
-
-
-class NoopEmitterTest(oslotest.base.BaseTestCase):
- def setUp(self):
- super(NoopEmitterTest, self).setUp()
-
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_start(self, mock_looping):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.NoopEmitter("svc")
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_stop(self, mock_looping):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.NoopEmitter("svc")
- emitter.start()
- emitter.stop()
-
- mock_timer.stop.assert_called_once()
-
-
-class RpcEmitterTest(oslotest.base.BaseTestCase):
- def setUp(self):
- super(RpcEmitterTest, self).setUp()
-
- @mock.patch.object(objects, "ServiceStatus")
- @mock.patch("designate.context.DesignateContext.get_admin_context")
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_emit_no_status_factory(self, mock_looping, mock_context,
- mock_service_status):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- emitter = service_status.RpcEmitter("svc")
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- central = mock.Mock()
- with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
- return_value=central):
- emitter._emit_heartbeat()
-
- mock_service_status.assert_called_once_with(
- service_name="svc",
- hostname=cfg.CONF.host,
- status=True,
- stats={},
- capabilities={},
- heartbeated_at=mock.ANY
- )
-
- central.update_service_status.assert_called_once_with(
- mock_context.return_value, mock_service_status.return_value
- )
-
- @mock.patch.object(objects, "ServiceStatus")
- @mock.patch("designate.context.DesignateContext.get_admin_context")
- @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
- def test_emit_status_factory(self, mock_looping, mock_context,
- mock_service_status):
- mock_timer = mock.Mock()
- mock_looping.return_value = mock_timer
-
- status = False
- stats = {"a": 1}
- capabilities = {"b": 2}
-
- status_factory = mock.Mock(return_value=(status, stats, capabilities,))
- emitter = service_status.RpcEmitter("svc",
- status_factory=status_factory)
- emitter.start()
-
- mock_timer.start.assert_called_once()
-
- central = mock.Mock()
- with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
- return_value=central):
- emitter._emit_heartbeat()
-
- mock_service_status.assert_called_once_with(
- service_name="svc",
- hostname=cfg.CONF.host,
- status=status,
- stats=stats,
- capabilities=capabilities,
- heartbeated_at=mock.ANY
- )
-
- central.update_service_status.assert_called_once_with(
- mock_context.return_value, mock_service_status.return_value
- )
diff --git a/setup.cfg b/setup.cfg
index 94519a3e..c930da27 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -119,8 +119,8 @@ designate.producer_tasks =
worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
designate.heartbeat_emitter =
- noop = designate.service_status:NoopEmitter
- rpc = designate.service_status:RpcEmitter
+ noop = designate.heartbeat_emitter:NoopEmitter
+ rpc = designate.heartbeat_emitter:RpcEmitter
designate.notification.plugin =
default = designate.notifications:Default