summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2020-01-30 13:09:25 +0000
committerGerrit Code Review <review@openstack.org>2020-01-30 13:09:26 +0000
commit4b0a872bbebc4c816fd461b77d2fe157fd9607d6 (patch)
tree6f3c6c7fe18179a77bb839ff2a00435942cba79d
parenta5daaa9a0416f33d7d6df15520ea7ec35430458b (diff)
parent136a9f79fafaf94d18b81618943c0f73b38bfdfe (diff)
downloaddesignate-4b0a872bbebc4c816fd461b77d2fe157fd9607d6.tar.gz
Merge "Fixing services getting stuck on shutdown"
-rw-r--r--designate/api/wsgi.py5
-rw-r--r--designate/cmd/agent.py2
-rw-r--r--designate/cmd/api.py2
-rw-r--r--designate/cmd/central.py3
-rw-r--r--designate/cmd/mdns.py2
-rw-r--r--designate/cmd/producer.py2
-rw-r--r--designate/cmd/sink.py2
-rw-r--r--designate/cmd/worker.py2
-rw-r--r--designate/service.py5
-rw-r--r--designate/service_status.py28
-rw-r--r--designate/tests/unit/test_heartbeat.py18
-rw-r--r--designate/tests/unit/test_service_status.py48
12 files changed, 63 insertions, 56 deletions
diff --git a/designate/api/wsgi.py b/designate/api/wsgi.py
index 46b80593..0103c9aa 100644
--- a/designate/api/wsgi.py
+++ b/designate/api/wsgi.py
@@ -15,7 +15,6 @@ import os
from oslo_config import cfg
from oslo_log import log as logging
-from oslo_service import threadgroup
from paste import deploy
from designate import conf
@@ -48,9 +47,7 @@ def init_application():
if not rpc.initialized():
rpc.init(CONF)
- heartbeat = service.Heartbeat(
- 'api', threadgroup.ThreadGroup(thread_pool_size=1)
- )
+ heartbeat = service.Heartbeat('api')
heartbeat.start()
conf = conf_files[0]
diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py
index e96a5d59..edc7f46d 100644
--- a/designate/cmd/agent.py
+++ b/designate/cmd/agent.py
@@ -38,7 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = agent_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/api.py b/designate/cmd/api.py
index 6ac6d558..61b07911 100644
--- a/designate/cmd/api.py
+++ b/designate/cmd/api.py
@@ -40,7 +40,7 @@ def main():
hookpoints.log_hook_setup()
server = api_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:api'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/central.py b/designate/cmd/central.py
index 80ee8b93..6ae0524a 100644
--- a/designate/cmd/central.py
+++ b/designate/cmd/central.py
@@ -38,8 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = central_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg,
- rpc_api=server)
+ heartbeat = service.Heartbeat(server.service_name, rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py
index e79586dc..b8993d9c 100644
--- a/designate/cmd/mdns.py
+++ b/designate/cmd/mdns.py
@@ -38,7 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = mdns_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:mdns'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py
index 4bf18732..89808f27 100644
--- a/designate/cmd/producer.py
+++ b/designate/cmd/producer.py
@@ -38,7 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = producer_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:producer'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py
index 7242b198..30725712 100644
--- a/designate/cmd/sink.py
+++ b/designate/cmd/sink.py
@@ -38,7 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = sink_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py
index e0046ac2..2e0f07af 100644
--- a/designate/cmd/worker.py
+++ b/designate/cmd/worker.py
@@ -38,7 +38,7 @@ def main():
hookpoints.log_hook_setup()
server = worker_service.Service()
- heartbeat = service.Heartbeat(server.service_name, server.tg)
+ heartbeat = service.Heartbeat(server.service_name)
service.serve(server, workers=CONF['service:worker'].workers)
heartbeat.start()
service.wait()
diff --git a/designate/service.py b/designate/service.py
index 3ad8e9b3..22f92532 100644
--- a/designate/service.py
+++ b/designate/service.py
@@ -69,9 +69,8 @@ class Service(service.Service):
class Heartbeat(object):
- def __init__(self, name, tg, rpc_api=None):
+ def __init__(self, name, rpc_api=None):
self.name = name
- self.tg = tg
self._status = 'UP'
self._stats = {}
@@ -81,7 +80,7 @@ class Heartbeat(object):
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
- self.name, self.tg,
+ self.name,
status_factory=self.get_status, rpc_api=rpc_api
)
diff --git a/designate/service_status.py b/designate/service_status.py
index b6f0ba24..22d4410e 100644
--- a/designate/service_status.py
+++ b/designate/service_status.py
@@ -14,6 +14,7 @@
import abc
from oslo_log import log as logging
+from oslo_service import loopingcall
from oslo_utils import timeutils
import designate.conf
@@ -31,19 +32,15 @@ class HeartBeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'
- def __init__(self, service, thread_group, status_factory=None,
- *args, **kwargs):
+ def __init__(self, service, status_factory=None, *args, **kwargs):
super(HeartBeatEmitter, self).__init__()
self._service = service
self._hostname = CONF.host
- self._running = False
- self._tg = thread_group
- self._tg.add_timer(
- CONF.heartbeat_emitter.heartbeat_interval,
- self._emit_heartbeat)
-
+ self._timer = loopingcall.FixedIntervalLoopingCall(
+ self._emit_heartbeat
+ )
self._status_factory = status_factory
def _get_status(self):
@@ -56,9 +53,6 @@ class HeartBeatEmitter(plugin.DriverPlugin):
"""
Returns Status, Stats, Capabilities
"""
- if not self._running:
- return
-
status, stats, capabilities = self._get_status()
service_status = objects.ServiceStatus(
@@ -79,10 +73,13 @@ class HeartBeatEmitter(plugin.DriverPlugin):
pass
def start(self):
- self._running = True
+ self._timer.start(
+ CONF.heartbeat_emitter.heartbeat_interval,
+ stop_on_exception=False
+ )
def stop(self):
- self._running = False
+ self._timer.stop()
class NoopEmitter(HeartBeatEmitter):
@@ -95,9 +92,8 @@ class NoopEmitter(HeartBeatEmitter):
class RpcEmitter(HeartBeatEmitter):
__plugin_name__ = 'rpc'
- def __init__(self, service, thread_group, rpc_api=None, *args, **kwargs):
- super(RpcEmitter, self).__init__(
- service, thread_group, *args, **kwargs)
+ def __init__(self, service, rpc_api=None, *args, **kwargs):
+ super(RpcEmitter, self).__init__(service, *args, **kwargs)
self.rpc_api = rpc_api
def _transmit(self, status):
diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py
index fbb01862..a531d5b0 100644
--- a/designate/tests/unit/test_heartbeat.py
+++ b/designate/tests/unit/test_heartbeat.py
@@ -13,6 +13,7 @@ import mock
import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
+from oslo_service import loopingcall
from designate import service
@@ -20,14 +21,18 @@ CONF = cfg.CONF
class HeartbeatTest(oslotest.base.BaseTestCase):
- def setUp(self):
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def setUp(self, mock_looping):
super(HeartbeatTest, self).setUp()
+
+ self.mock_timer = mock.Mock()
+ mock_looping.return_value = self.mock_timer
+
self.useFixture(cfg_fixture.Config(CONF))
CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
- self.mock_tg = mock.Mock()
- self.heartbeat = service.Heartbeat('test', self.mock_tg)
+ self.heartbeat = service.Heartbeat('test')
def test_get_status(self):
self.assertEqual(('UP', {}, {},), self.heartbeat.get_status())
@@ -38,16 +43,13 @@ class HeartbeatTest(oslotest.base.BaseTestCase):
)
def test_start_heartbeat(self):
- self.assertFalse(self.heartbeat.heartbeat_emitter._running)
-
self.heartbeat.start()
- self.assertTrue(self.heartbeat.heartbeat_emitter._running)
+ self.mock_timer.start.assert_called_once()
def test_stop_heartbeat(self):
- self.assertFalse(self.heartbeat.heartbeat_emitter._running)
self.heartbeat.start()
self.heartbeat.stop()
- self.assertFalse(self.heartbeat.heartbeat_emitter._running)
+ self.mock_timer.stop.assert_called_once()
diff --git a/designate/tests/unit/test_service_status.py b/designate/tests/unit/test_service_status.py
index a8fe3880..c7827289 100644
--- a/designate/tests/unit/test_service_status.py
+++ b/designate/tests/unit/test_service_status.py
@@ -14,6 +14,7 @@
import mock
import oslotest.base
from oslo_config import cfg
+from oslo_service import loopingcall
from designate import objects
from designate import service_status
@@ -22,40 +23,46 @@ from designate import service_status
class NoopEmitterTest(oslotest.base.BaseTestCase):
def setUp(self):
super(NoopEmitterTest, self).setUp()
- self.mock_tg = mock.Mock()
- def test_init(self):
- service_status.NoopEmitter("svc", self.mock_tg)
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_start(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
- def test_start(self):
- emitter = service_status.NoopEmitter("svc", self.mock_tg)
+ emitter = service_status.NoopEmitter("svc")
emitter.start()
- self.mock_tg.add_timer.assert_called_once_with(
- 10.0, emitter._emit_heartbeat)
+ mock_timer.start.assert_called_once()
- def test_stop(self):
- mock_pulse = mock.Mock()
- self.mock_tg.add_timer.return_value = mock_pulse
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_stop(self, mock_looping):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
- emitter = service_status.NoopEmitter("svc", self.mock_tg)
+ emitter = service_status.NoopEmitter("svc")
emitter.start()
emitter.stop()
- self.assertFalse(emitter._running)
+ mock_timer.stop.assert_called_once()
class RpcEmitterTest(oslotest.base.BaseTestCase):
def setUp(self):
super(RpcEmitterTest, self).setUp()
- self.mock_tg = mock.Mock()
@mock.patch.object(objects, "ServiceStatus")
@mock.patch("designate.context.DesignateContext.get_admin_context")
- def test_emit_no_status_factory(self, mock_context, mock_service_status):
- emitter = service_status.RpcEmitter("svc", self.mock_tg)
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_emit_no_status_factory(self, mock_looping, mock_context,
+ mock_service_status):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
+ emitter = service_status.RpcEmitter("svc")
emitter.start()
+ mock_timer.start.assert_called_once()
+
central = mock.Mock()
with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
return_value=central):
@@ -76,16 +83,23 @@ class RpcEmitterTest(oslotest.base.BaseTestCase):
@mock.patch.object(objects, "ServiceStatus")
@mock.patch("designate.context.DesignateContext.get_admin_context")
- def test_emit_status_factory(self, mock_context, mock_service_status):
+ @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')
+ def test_emit_status_factory(self, mock_looping, mock_context,
+ mock_service_status):
+ mock_timer = mock.Mock()
+ mock_looping.return_value = mock_timer
+
status = False
stats = {"a": 1}
capabilities = {"b": 2}
status_factory = mock.Mock(return_value=(status, stats, capabilities,))
- emitter = service_status.RpcEmitter("svc", self.mock_tg,
+ emitter = service_status.RpcEmitter("svc",
status_factory=status_factory)
emitter.start()
+ mock_timer.start.assert_called_once()
+
central = mock.Mock()
with mock.patch("designate.central.rpcapi.CentralAPI.get_instance",
return_value=central):