summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-09-25 18:11:31 +0000
committerGerrit Code Review <review@openstack.org>2019-09-25 18:11:31 +0000
commit4d47719b2638a96f0f2b5f71ddb0e94619fa3980 (patch)
tree37bb2878ff9905b504b9ebb884019d8f640e4773
parent9fbdc779daf7dd23f1cb175197659418b1014f35 (diff)
parenta09064a5d15859703b97d61a1f014681a17799c6 (diff)
downloaddesignate-4d47719b2638a96f0f2b5f71ddb0e94619fa3980.tar.gz
Merge "Refactored service layer"
-rw-r--r--designate/__init__.py1
-rw-r--r--designate/agent/service.py36
-rw-r--r--designate/api/service.py29
-rw-r--r--designate/central/service.py29
-rw-r--r--designate/cmd/agent.py5
-rw-r--r--designate/cmd/api.py6
-rw-r--r--designate/cmd/central.py9
-rw-r--r--designate/cmd/mdns.py6
-rw-r--r--designate/cmd/pool_manager.py9
-rw-r--r--designate/cmd/producer.py6
-rw-r--r--designate/cmd/sink.py5
-rw-r--r--designate/cmd/worker.py6
-rw-r--r--designate/cmd/zone_manager.py8
-rw-r--r--designate/conf/agent.py8
-rw-r--r--designate/conf/api.py8
-rw-r--r--designate/conf/mdns.py8
-rw-r--r--designate/coordination.py45
-rw-r--r--designate/mdns/service.py42
-rw-r--r--designate/pool_manager/service.py60
-rw-r--r--designate/producer/service.py34
-rw-r--r--designate/service.py316
-rw-r--r--designate/service_status.py5
-rw-r--r--designate/sink/service.py13
-rw-r--r--designate/tests/test_api/test_service.py3
-rw-r--r--designate/tests/test_coordination.py44
-rw-r--r--designate/tests/test_mdns/test_service.py26
-rw-r--r--designate/tests/unit/agent/backends/test_bind9.py2
-rw-r--r--designate/tests/unit/agent/backends/test_denominator.py2
-rw-r--r--designate/tests/unit/agent/backends/test_fake.py2
-rw-r--r--designate/tests/unit/agent/test_service.py34
-rw-r--r--designate/tests/unit/mdns/test_service.py53
-rw-r--r--designate/tests/unit/pool_manager/test_service.py2
-rw-r--r--designate/tests/unit/producer/test_service.py51
-rw-r--r--designate/tests/unit/sink/test_service.py8
-rw-r--r--designate/tests/unit/test_heartbeat.py53
-rw-r--r--designate/tests/unit/workers/test_service.py15
-rw-r--r--designate/worker/service.py30
-rw-r--r--etc/designate/designate-config-generator.conf1
-rw-r--r--releasenotes/notes/new-service-layer-8023c242de89075a.yaml18
39 files changed, 564 insertions, 474 deletions
diff --git a/designate/__init__.py b/designate/__init__.py
index 45dfced4..8df65523 100644
--- a/designate/__init__.py
+++ b/designate/__init__.py
@@ -24,7 +24,6 @@ from oslo_concurrency import lockutils
import oslo_messaging as messaging
_EXTRA_DEFAULT_LOG_LEVELS = [
- 'eventlet.wsgi.server=WARN',
'kazoo.client=WARN',
'keystone=INFO',
'oslo_service.loopingcall=WARN',
diff --git a/designate/agent/service.py b/designate/agent/service.py
index 74acaa47..1763712e 100644
--- a/designate/agent/service.py
+++ b/designate/agent/service.py
@@ -37,22 +37,41 @@ from designate.utils import DEFAULT_AGENT_PORT
CONF = cfg.CONF
-class Service(service.DNSService, service.Service):
+class Service(service.Service):
_dns_default_port = DEFAULT_AGENT_PORT
- def __init__(self, threads=None):
- super(Service, self).__init__(threads=threads)
+ def __init__(self):
+ super(Service, self).__init__(
+ self.service_name, threads=cfg.CONF['service:agent'].threads
+ )
+
+ self.dns_service = service.DNSService(
+ self.dns_application, self.tg,
+ cfg.CONF['service:agent'].listen,
+ cfg.CONF['service:agent'].tcp_backlog,
+ cfg.CONF['service:agent'].tcp_recv_timeout,
+ )
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
+ def start(self):
+ super(Service, self).start()
+ self.dns_service.start()
+ self.backend.start()
+
+ def stop(self, graceful=False):
+ self.dns_service.stop()
+ self.backend.stop()
+ super(Service, self).stop(graceful)
+
@property
def service_name(self):
return 'agent'
@property
@utils.cache_result
- def _dns_application(self):
+ def dns_application(self):
# Create an instance of the RequestHandler class
application = handler.RequestHandler()
if cfg.CONF['service:agent'].notify_delay > 0.0:
@@ -60,12 +79,3 @@ class Service(service.DNSService, service.Service):
application = dnsutils.SerializationMiddleware(application)
return application
-
- def start(self):
- super(Service, self).start()
- self.backend.start()
-
- def stop(self):
- super(Service, self).stop()
- # TODO(kiall): Shouldn't we be stppping the backend here too? To fix
- # in another review.
diff --git a/designate/api/service.py b/designate/api/service.py
index ca04b088..224145eb 100644
--- a/designate/api/service.py
+++ b/designate/api/service.py
@@ -18,41 +18,32 @@ from oslo_log import log as logging
from paste import deploy
from designate import exceptions
-from designate import utils
from designate import service
-from designate import service_status
-
+from designate import utils
LOG = logging.getLogger(__name__)
-class Service(service.WSGIService, service.Service):
- def __init__(self, threads=None):
- super(Service, self).__init__(threads=threads)
-
- emitter_cls = service_status.HeartBeatEmitter.get_driver(
- cfg.CONF.heartbeat_emitter.emitter_type
- )
- self.heartbeat_emitter = emitter_cls(
- self.service_name, self.tg, status_factory=self._get_status
+class Service(service.WSGIService):
+ def __init__(self):
+ super(Service, self).__init__(
+ self.wsgi_application,
+ self.service_name,
+ cfg.CONF['service:api'].listen,
)
def start(self):
super(Service, self).start()
- self.heartbeat_emitter.start()
- def _get_status(self):
- status = "UP"
- stats = {}
- capabilities = {}
- return status, stats, capabilities
+ def stop(self, graceful=True):
+ super(Service, self).stop(graceful)
@property
def service_name(self):
return 'api'
@property
- def _wsgi_application(self):
+ def wsgi_application(self):
api_paste_config = cfg.CONF['service:api'].api_paste_config
config_paths = utils.find_config(api_paste_config)
diff --git a/designate/central/service.py b/designate/central/service.py
index 169f7fbe..3843de88 100644
--- a/designate/central/service.py
+++ b/designate/central/service.py
@@ -184,37 +184,40 @@ def notification(notification_type):
return outer
-class Service(service.RPCService, service.Service):
+class Service(service.RPCService):
RPC_API_VERSION = '6.2'
target = messaging.Target(version=RPC_API_VERSION)
- def __init__(self, threads=None):
- super(Service, self).__init__(threads=threads)
+ def __init__(self):
+ self._scheduler = None
+ self._storage = None
+ self._quota = None
- self.network_api = network_api.get_network_api(cfg.CONF.network_api)
+ super(Service, self).__init__(
+ self.service_name, cfg.CONF['service:central'].topic,
+ threads=cfg.CONF['service:central'].threads,
+ )
- # update_service_status needs is called by the emitter so we pass
- # ourselves as the rpc_api.
- self.heartbeat_emitter.rpc_api = self
+ self.network_api = network_api.get_network_api(cfg.CONF.network_api)
@property
def scheduler(self):
- if not hasattr(self, '_scheduler'):
+ if not self._scheduler:
# Get a scheduler instance
self._scheduler = scheduler.get_scheduler(storage=self.storage)
return self._scheduler
@property
def quota(self):
- if not hasattr(self, '_quota'):
+ if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def storage(self):
- if not hasattr(self, '_storage'):
+ if not self._storage:
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
@@ -232,8 +235,8 @@ class Service(service.RPCService, service.Service):
super(Service, self).start()
- def stop(self):
- super(Service, self).stop()
+ def stop(self, graceful=True):
+ super(Service, self).stop(graceful)
@property
def mdns_api(self):
@@ -251,7 +254,7 @@ class Service(service.RPCService, service.Service):
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
- return self.worker_api
+ return self.worker_api
return self.pool_manager_api
def _is_valid_zone_name(self, context, zone_name):
diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py
index ffe0c775..e96a5d59 100644
--- a/designate/cmd/agent.py
+++ b/designate/cmd/agent.py
@@ -28,7 +28,6 @@ from designate.agent import service as agent_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.agent', group='service:agent')
-CONF.import_opt('threads', 'designate.agent', group='service:agent')
def main():
@@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
- server = agent_service.Service(threads=CONF['service:agent'].threads)
+ server = agent_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:agent'].workers)
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/api.py b/designate/cmd/api.py
index a4e83251..6ac6d558 100644
--- a/designate/cmd/api.py
+++ b/designate/cmd/api.py
@@ -29,7 +29,6 @@ from designate.api import service as api_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.api', group='service:api')
-CONF.import_opt('threads', 'designate.api', group='service:api')
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
@@ -40,7 +39,8 @@ def main():
hookpoints.log_hook_setup()
- server = api_service.Service(threads=CONF['service:api'].threads)
+ server = api_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:api'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/central.py b/designate/cmd/central.py
index 5364e354..80ee8b93 100644
--- a/designate/cmd/central.py
+++ b/designate/cmd/central.py
@@ -23,12 +23,11 @@ from designate import hookpoints
from designate import service
from designate import utils
from designate import version
-from designate.central import service as central
+from designate.central import service as central_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.central', group='service:central')
-CONF.import_opt('threads', 'designate.central', group='service:central')
def main():
@@ -38,7 +37,9 @@ def main():
hookpoints.log_hook_setup()
- server = central.Service(threads=CONF['service:central'].threads)
+ server = central_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg,
+ rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py
index 66dd9870..e79586dc 100644
--- a/designate/cmd/mdns.py
+++ b/designate/cmd/mdns.py
@@ -28,7 +28,6 @@ from designate.mdns import service as mdns_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.mdns', group='service:mdns')
-CONF.import_opt('threads', 'designate.mdns', group='service:mdns')
def main():
@@ -38,7 +37,8 @@ def main():
hookpoints.log_hook_setup()
- server = mdns_service.Service(threads=CONF['service:mdns'].threads)
+ server = mdns_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:mdns'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/pool_manager.py b/designate/cmd/pool_manager.py
index 686541ba..013af497 100644
--- a/designate/cmd/pool_manager.py
+++ b/designate/cmd/pool_manager.py
@@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
-CONF.import_opt('threads', 'designate.pool_manager',
- group='service:pool_manager')
def main():
@@ -53,12 +51,11 @@ def main():
'designate-worker', version='newton',
removal_version='rocky')
- server = pool_manager_service.Service(
- threads=CONF['service:pool_manager'].threads
- )
+ server = pool_manager_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
hookpoints.log_hook_setup()
service.serve(server, workers=CONF['service:pool_manager'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py
index 9b16a735..bd7bbe2b 100644
--- a/designate/cmd/producer.py
+++ b/designate/cmd/producer.py
@@ -28,7 +28,6 @@ from designate.producer import service as producer_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer', group='service:producer')
-CONF.import_opt('threads', 'designate.producer', group='service:producer')
def main():
@@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
- server = producer_service.Service(threads=CONF['service:producer'].threads)
+ server = producer_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:producer'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py
index 4081e146..7242b198 100644
--- a/designate/cmd/sink.py
+++ b/designate/cmd/sink.py
@@ -28,7 +28,6 @@ from designate.sink import service as sink_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.sink', group='service:sink')
-CONF.import_opt('threads', 'designate.sink', group='service:sink')
def main():
@@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
- server = sink_service.Service(threads=CONF['service:sink'].threads)
+ server = sink_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:sink'].workers)
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py
index 48a23938..0560a946 100644
--- a/designate/cmd/worker.py
+++ b/designate/cmd/worker.py
@@ -28,7 +28,6 @@ from designate.worker import service as worker_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.worker', group='service:worker')
-CONF.import_opt('threads', 'designate.worker', group='service:worker')
def main():
@@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
- server = worker_service.Service(threads=CONF['service:worker'].threads)
+ server = worker_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:worker'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/cmd/zone_manager.py b/designate/cmd/zone_manager.py
index 327cd6d9..d09433e4 100644
--- a/designate/cmd/zone_manager.py
+++ b/designate/cmd/zone_manager.py
@@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer',
group='service:zone_manager')
-CONF.import_opt('threads', 'designate.producer',
- group='service:zone_manager')
def main():
@@ -56,8 +54,8 @@ def main():
LOG.warning('Starting designate-producer under the zone-manager name')
- server = producer_service.Service(
- threads=CONF['service:zone_manager'].threads)
+ server = producer_service.Service()
+ heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:zone_manager'].workers)
- server.heartbeat_emitter.start()
+ heartbeat.start()
service.wait()
diff --git a/designate/conf/agent.py b/designate/conf/agent.py
index ffca416e..672496fc 100644
--- a/designate/conf/agent.py
+++ b/designate/conf/agent.py
@@ -27,14 +27,6 @@ AGENT_OPTS = [
help='Number of agent worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of agent greenthreads to spawn'),
- cfg.IPOpt('host',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='Agent Bind Host'),
- cfg.PortOpt('port',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='Agent Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_AGENT_PORT],
help='Agent host:port pairs to listen on'),
diff --git a/designate/conf/api.py b/designate/conf/api.py
index cb8dd02c..d9c40c26 100644
--- a/designate/conf/api.py
+++ b/designate/conf/api.py
@@ -33,14 +33,6 @@ API_OPTS = [
'the hostname, port, and any paths that are added'
'to the base of Designate is URLs,'
'For example http://dns.openstack.example.com/dns'),
- cfg.IPOpt('api_host',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='API Bind Host'),
- cfg.PortOpt('api_port',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='API Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:9001'],
help='API host:port pairs to listen on'),
diff --git a/designate/conf/mdns.py b/designate/conf/mdns.py
index 0d481fd7..c7d941b5 100644
--- a/designate/conf/mdns.py
+++ b/designate/conf/mdns.py
@@ -26,14 +26,6 @@ MDNS_OPTS = [
help='Number of mdns worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of mdns greenthreads to spawn'),
- cfg.IPOpt('host',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='mDNS Bind Host'),
- cfg.PortOpt('port',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by 'listen' option",
- help='mDNS Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_MDNS_PORT],
help='mDNS host:port pairs to listen on'),
diff --git a/designate/coordination.py b/designate/coordination.py
index f107d1da..bbc1556f 100644
--- a/designate/coordination.py
+++ b/designate/coordination.py
@@ -35,21 +35,31 @@ def _retry_if_tooz_error(exception):
return isinstance(exception, tooz.coordination.ToozError)
-class CoordinationMixin(object):
- def __init__(self, *args, **kwargs):
- super(CoordinationMixin, self).__init__(*args, **kwargs)
-
+class Coordination(object):
+ def __init__(self, name, tg):
+ self.name = name
+ self.tg = tg
+ self.coordination_id = None
self._coordinator = None
+ self._started = False
+
+ @property
+ def coordinator(self):
+ return self._coordinator
+
+ @property
+ def started(self):
+ return self._started
def start(self):
- self._coordination_id = ":".join([CONF.host, generate_uuid()])
+ self.coordination_id = ":".join([CONF.host, generate_uuid()])
if CONF.coordination.backend_url is not None:
backend_url = CONF.coordination.backend_url
self._coordinator = tooz.coordination.get_coordinator(
- backend_url, self._coordination_id.encode())
- self._coordination_started = False
+ backend_url, self.coordination_id.encode())
+ self._started = False
self.tg.add_timer(CONF.coordination.heartbeat_interval,
self._coordinator_heartbeat)
@@ -61,25 +71,22 @@ class CoordinationMixin(object):
"coordination functionality will be disabled. "
"Please configure a coordination backend.")
- super(CoordinationMixin, self).start()
-
if self._coordinator is not None:
- while not self._coordination_started:
+ while not self._started:
try:
self._coordinator.start()
try:
create_group_req = self._coordinator.create_group(
- self.service_name)
+ self.name)
create_group_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
- join_group_req = self._coordinator.join_group(
- self.service_name)
+ join_group_req = self._coordinator.join_group(self.name)
join_group_req.get()
- self._coordination_started = True
+ self._started = True
except Exception:
LOG.warning("Failed to start Coordinator:", exc_info=True)
@@ -87,18 +94,16 @@ class CoordinationMixin(object):
def stop(self):
if self._coordinator is not None:
- self._coordination_started = False
+ self._started = False
- leave_group_req = self._coordinator.leave_group(self.service_name)
+ leave_group_req = self._coordinator.leave_group(self.name)
leave_group_req.get()
self._coordinator.stop()
- super(CoordinationMixin, self).stop()
-
self._coordinator = None
def _coordinator_heartbeat(self):
- if not self._coordination_started:
+ if not self._started:
return
try:
@@ -107,7 +112,7 @@ class CoordinationMixin(object):
LOG.exception('Error sending a heartbeat to coordination backend.')
def _coordinator_run_watchers(self):
- if not self._coordination_started:
+ if not self._started:
return
self._coordinator.run_watchers()
diff --git a/designate/mdns/service.py b/designate/mdns/service.py
index 75c24682..324d354a 100644
--- a/designate/mdns/service.py
+++ b/designate/mdns/service.py
@@ -16,10 +16,10 @@
from oslo_config import cfg
from oslo_log import log as logging
-from designate import utils
+from designate import dnsutils
from designate import service
from designate import storage
-from designate import dnsutils
+from designate import utils
from designate.mdns import handler
from designate.mdns import notify
from designate.mdns import xfr
@@ -29,13 +29,38 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
-class Service(service.DNSService, service.RPCService, service.Service):
+class Service(service.RPCService):
_dns_default_port = DEFAULT_MDNS_PORT
+ def __init__(self):
+ self._storage = None
+
+ super(Service, self).__init__(
+ self.service_name, cfg.CONF['service:mdns'].topic,
+ threads=cfg.CONF['service:mdns'].threads,
+ )
+ self.override_endpoints(
+ [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
+ )
+
+ self.dns_service = service.DNSService(
+ self.dns_application, self.tg,
+ cfg.CONF['service:mdns'].listen,
+ cfg.CONF['service:mdns'].tcp_backlog,
+ cfg.CONF['service:mdns'].tcp_recv_timeout,
+ )
+
+ def start(self):
+ super(Service, self).start()
+ self.dns_service.start()
+
+ def stop(self, graceful=False):
+ self.dns_service.stop()
+ super(Service, self).stop(graceful)
+
@property
def storage(self):
- if not hasattr(self, '_storage'):
- # Get a storage connection
+ if not self._storage:
self._storage = storage.get_storage(
CONF['service:mdns'].storage_driver
)
@@ -47,12 +72,7 @@ class Service(service.DNSService, service.RPCService, service.Service):
@property
@utils.cache_result
- def _rpc_endpoints(self):
- return [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
-
- @property
- @utils.cache_result
- def _dns_application(self):
+ def dns_application(self):
# Create an instance of the RequestHandler class and wrap with
# necessary middleware.
application = handler.RequestHandler(self.storage, self.tg)
diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py
index fe854ddd..da32e969 100644
--- a/designate/pool_manager/service.py
+++ b/designate/pool_manager/service.py
@@ -76,8 +76,7 @@ def _constant_retries(num_attempts, sleep_interval):
yield True
-class Service(service.RPCService, coordination.CoordinationMixin,
- service.Service):
+class Service(service.RPCService):
"""
Service side of the Pool Manager RPC API.
@@ -91,8 +90,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
target = messaging.Target(version=RPC_API_VERSION)
- def __init__(self, threads=None):
- super(Service, self).__init__(threads=threads)
+ def __init__(self):
+ self._scheduler = None
+ self._storage = None
+ self._quota = None
+
+ self._pool_election = None
+
+ self._central_api = None
+ self._mdns_api = None
+ self._pool_manager_api = None
+
+ topic = '%s.%s' % (
+ cfg.CONF['service:pool_manager'].topic,
+ CONF['service:pool_manager'].pool_id
+ )
+
+ super(Service, self).__init__(
+ self.service_name, topic,
+ threads=cfg.CONF['service:worker'].threads,
+ )
+
+ self.coordination = coordination.Coordination(
+ self.service_name, self.tg
+ )
# Get a pool manager cache connection.
self.cache = cache.get_pool_manager_cache(
@@ -110,8 +131,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
CONF['service:pool_manager'].periodic_sync_retry_interval
# Compute a time (seconds) by which things should have propagated
- self.max_prop_time = utils.max_prop_time(self.timeout,
- self.max_retries, self.retry_interval, self.delay)
+ self.max_prop_time = utils.max_prop_time(
+ self.timeout, self.max_retries, self.retry_interval, self.delay
+ )
def _setup_target_backends(self):
self.target_backends = {}
@@ -130,17 +152,6 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def service_name(self):
return 'pool_manager'
- @property
- def _rpc_topic(self):
- # Modify the default topic so it's pool manager instance specific.
- topic = super(Service, self)._rpc_topic
-
- topic = '%s.%s' % (topic, CONF['service:pool_manager'].pool_id)
- LOG.info('Using topic %(topic)s for this pool manager instance.',
- {'topic': topic})
-
- return topic
-
def start(self):
# Build the Pool (and related) Object from Config
context = DesignateContext.get_admin_context()
@@ -182,11 +193,13 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self.target_backends[target.id].start()
super(Service, self).start()
+ self.coordination.start()
# Setup a Leader Election, use for ensuring certain tasks are executed
# on exactly one pool-manager instance at a time]
self._pool_election = coordination.LeaderElection(
- self._coordinator, '%s:%s' % (self.service_name, self.pool.id))
+ self.coordination.coordinator,
+ '%s:%s' % (self.service_name, self.pool.id))
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer:
@@ -201,29 +214,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
' %(interval)s s', {'interval': interval})
self.tg.add_timer(interval, self.periodic_sync, interval)
- def stop(self):
+ def stop(self, graceful=True):
self._pool_election.stop()
+ # self.coordination.stop()
- super(Service, self).stop()
+ super(Service, self).stop(graceful)
for target in self.pool.targets:
self.target_backends[target.id].stop()
@property
def central_api(self):
- if not hasattr(self, '_central_api'):
+ if not self._central_api:
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
@property
def mdns_api(self):
- if not hasattr(self, '_mdns_adpi'):
+ if not self._mdns_api:
self._mdns_api = mdns_api.MdnsAPI.get_instance()
return self._mdns_api
@property
def pool_manager_api(self):
- if not hasattr(self, '_pool_manager_api'):
+ if not self._pool_manager_api:
pool_mgr_api = pool_manager_rpcapi.PoolManagerAPI
self._pool_manager_api = pool_mgr_api.get_instance()
return self._pool_manager_api
diff --git a/designate/producer/service.py b/designate/producer/service.py
index b9c196ee..51af6882 100644
--- a/designate/producer/service.py
+++ b/designate/producer/service.py
@@ -31,15 +31,29 @@ CONF = cfg.CONF
NS = 'designate.periodic_tasks'
-class Service(service.RPCService, coordination.CoordinationMixin,
- service.Service):
+class Service(service.RPCService):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
+ def __init__(self):
+ self._partitioner = None
+
+ self._storage = None
+ self._quota = None
+
+ super(Service, self).__init__(
+ self.service_name, cfg.CONF['service:producer'].topic,
+ threads=cfg.CONF['service:producer'].threads,
+ )
+
+ self.coordination = coordination.Coordination(
+ self.service_name, self.tg
+ )
+
@property
def storage(self):
- if not hasattr(self, '_storage'):
+ if not self._storage:
# TODO(timsim): Remove this when zone_mgr goes away
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
if cfg.CONF['service:producer'].storage_driver != storage_driver:
@@ -49,7 +63,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
@property
def quota(self):
- if not hasattr(self, '_quota'):
+ if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@@ -64,10 +78,12 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def start(self):
super(Service, self).start()
+ self.coordination.start()
self._partitioner = coordination.Partitioner(
- self._coordinator, self.service_name,
- self._coordination_id.encode(), range(0, 4095))
+ self.coordination.coordinator, self.service_name,
+ self.coordination.coordination_id.encode(), range(0, 4095)
+ )
self._partitioner.start()
self._partitioner.watch_partition_change(self._rebalance)
@@ -76,7 +92,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks
producer_enabled_tasks = CONF['service:producer'].enabled_tasks
enabled = zmgr_enabled_tasks
- if producer_enabled_tasks != []:
+ if producer_enabled_tasks:
enabled = producer_enabled_tasks
for task in tasks.PeriodicTask.get_extensions(enabled):
@@ -91,6 +107,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
interval = CONF[task.get_canonical_name()].interval
self.tg.add_timer(interval, task)
+ def stop(self, graceful=True):
+ super(Service, self).stop(graceful)
+ self.coordination.stop()
+
def _rebalance(self, my_partitions, members, event):
LOG.info("Received rebalance event %s", event)
self.partition_range = my_partitions
diff --git a/designate/service.py b/designate/service.py
index 0b499799..e4e590b5 100644
--- a/designate/service.py
+++ b/designate/service.py
@@ -17,241 +17,161 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import abc
+import errno
import socket
import struct
-import errno
-import six
-import eventlet.wsgi
import eventlet.debug
-from oslo_config import cfg
-import oslo_messaging as messaging
from oslo_log import log as logging
+import oslo_messaging as messaging
from oslo_service import service
from oslo_service import sslutils
+from oslo_service import wsgi
from oslo_utils import netutils
-import designate.conf
-from designate.i18n import _
-from designate.metrics import metrics
from designate import policy
from designate import rpc
from designate import service_status
-from designate import version
from designate import utils
-
-
-# TODO(kiall): These options have been cut+paste from the old WSGI code, and
-# should be moved into service:api etc..
-
+from designate import version
+import designate.conf
+from designate.i18n import _
+from designate.metrics import metrics
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
- """
- Service class to be shared among the diverse service inside of Designate.
- """
- def __init__(self, threads=None):
+ def __init__(self, name, threads=None):
threads = threads or 1000
-
super(Service, self).__init__(threads)
-
- self._host = CONF.host
- self._service_config = CONF['service:%s' % self.service_name]
+ self.name = name
+ self.host = CONF.host
policy.init()
- # NOTE(kiall): All services need RPC initialized, as this is used
- # for clients AND servers. Hence, this is common to
- # all Designate services.
if not rpc.initialized():
rpc.init(CONF)
- @abc.abstractproperty
- def service_name(self):
- pass
-
def start(self):
- super(Service, self).start()
-
LOG.info('Starting %(name)s service (version: %(version)s)',
{
- 'name': self.service_name,
+ 'name': self.name,
'version': version.version_info.version_string()
})
+ super(Service, self).start()
- def stop(self):
- LOG.info('Stopping %(name)s service', {'name': self.service_name})
-
- super(Service, self).stop()
-
- def _get_listen_on_addresses(self, default_port):
- """
- Helper Method to handle migration from singular host/port to
- multiple binds
- """
- try:
- # The API service uses "api_host", and "api_port", others use
- # just host and port.
- host = self._service_config.api_host
- port = self._service_config.api_port
-
- except cfg.NoSuchOptError:
- host = self._service_config.host
- port = self._service_config.port
-
- if host or port is not None:
- LOG.warning("host and port config options used, the 'listen' "
- "option has been ignored")
-
- host = host or "0.0.0.0"
- # "port" might be 0 to pick a free port, usually during testing
- port = default_port if port is None else port
-
- return [(host, port)]
-
- else:
-
- return map(
- netutils.parse_host_port,
- set(self._service_config.listen)
- )
+ def stop(self, graceful=True):
+ LOG.info('Stopping %(name)s service', {'name': self.name})
+ super(Service, self).stop(graceful)
-class RPCService(object):
- """
- RPC Service mixin used by all Designate RPC Services
- """
- def __init__(self, *args, **kwargs):
- super(RPCService, self).__init__(*args, **kwargs)
+class Heartbeat(object):
+ def __init__(self, name, tg, rpc_api=None):
+ self.name = name
+ self.tg = tg
- LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic)
- self._rpc_server = rpc.get_server(
- messaging.Target(topic=self._rpc_topic, server=self._host),
- self._rpc_endpoints)
+ self._status = 'UP'
+ self._stats = {}
+ self._capabilities = {}
emitter_cls = service_status.HeartBeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
- self.service_name, self.tg, status_factory=self._get_status
+ self.name, self.tg,
+ status_factory=self.get_status, rpc_api=rpc_api
)
- def _get_status(self):
- status = "UP"
- stats = {}
- capabilities = {}
- return status, stats, capabilities
-
- @property
- def _rpc_endpoints(self):
- return [self]
-
- @property
- def _rpc_topic(self):
- return CONF['service:%s' % self.service_name].topic
+ def get_status(self):
+ return self._status, self._stats, self._capabilities
def start(self):
- super(RPCService, self).start()
-
- LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic)
- self._rpc_server.start()
-
- # TODO(kiall): This probably belongs somewhere else, maybe the base
- # Service class?
- self.notifier = rpc.get_notifier(self.service_name)
-
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'start'):
- e.start()
-
self.heartbeat_emitter.start()
def stop(self):
- LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic)
self.heartbeat_emitter.stop()
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'stop'):
- e.stop()
- # Try to shut the connection down, but if we get any sort of
- # errors, go ahead and ignore them.. as we're shutting down anyway
- try:
- self._rpc_server.stop()
- except Exception:
- pass
+class RPCService(Service):
+ def __init__(self, name, rpc_topic, threads=None):
+ super(RPCService, self).__init__(name, threads)
+ LOG.debug("Creating RPC Server on topic '%s' for %s",
+ rpc_topic, self.name)
- super(RPCService, self).stop()
+ self.endpoints = [self]
+ self.notifier = None
+ self.rpc_server = None
+ self.rpc_topic = rpc_topic
- def wait(self):
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'wait'):
- e.wait()
+ def override_endpoints(self, endpoints):
+ self.endpoints = endpoints
- super(RPCService, self).wait()
-
-
-@six.add_metaclass(abc.ABCMeta)
-class WSGIService(object):
- """
- WSGI Service mixin used by all Designate WSGI Services
- """
- def __init__(self, *args, **kwargs):
- super(WSGIService, self).__init__(*args, **kwargs)
+ def start(self):
+ super(RPCService, self).start()
+ target = messaging.Target(topic=self.rpc_topic, server=self.host)
+ self.rpc_server = rpc.get_server(target, self.endpoints)
+ self.rpc_server.start()
+ self.notifier = rpc.get_notifier(self.name)
- self._use_ssl = sslutils.is_enabled(CONF)
- self._wsgi_socks = []
+ def stop(self, graceful=True):
+ if self.rpc_server:
+ self.rpc_server.stop()
+ super(RPCService, self).stop(graceful)
- @abc.abstractproperty
- def _wsgi_application(self):
- pass
+ def wait(self):
+ super(RPCService, self).wait()
- def start(self):
- super(WSGIService, self).start()
- addresses = self._get_listen_on_addresses(9001)
+class WSGIService(Service):
+ def __init__(self, app, name, listen, max_url_len=None):
+ super(WSGIService, self).__init__(name)
+ self.app = app
+ self.name = name
- for address in addresses:
- self._start(address[0], address[1])
+ self.listen = listen
- def _start(self, host, port):
- wsgi_sock = utils.bind_tcp(
- host, port, CONF.backlog, CONF.tcp_keepidle)
+ self.servers = []
- if self._use_ssl:
- wsgi_sock = sslutils.wrap(CONF, wsgi_sock)
+ for address in self.listen:
+ host, port = netutils.parse_host_port(address)
+ server = wsgi.Server(
+ CONF, name, app,
+ host=host,
+ port=port,
+ pool_size=CONF['service:api'].threads,
+ use_ssl=sslutils.is_enabled(CONF),
+ max_url_len=max_url_len
+ )
- self._wsgi_socks.append(wsgi_sock)
+ self.servers.append(server)
- self.tg.add_thread(self._wsgi_handle, wsgi_sock)
+ def start(self):
+ for server in self.servers:
+ server.start()
+ super(WSGIService, self).start()
- def _wsgi_handle(self, wsgi_sock):
- logger = logging.getLogger('eventlet.wsgi')
- # Adjust wsgi MAX_HEADER_LINE to accept large tokens.
- eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line
+ def stop(self, graceful=True):
+ for server in self.servers:
+ server.stop()
+ super(WSGIService, self).stop(graceful)
- eventlet.wsgi.server(wsgi_sock,
- self._wsgi_application,
- custom_pool=self.tg.pool,
- log=logger)
+ def wait(self):
+ for server in self.servers:
+ server.wait()
+ super(WSGIService, self).wait()
-@six.add_metaclass(abc.ABCMeta)
class DNSService(object):
- """
- DNS Service mixin used by all Designate DNS Services
- """
-
_TCP_RECV_MAX_SIZE = 65535
- def __init__(self, *args, **kwargs):
- super(DNSService, self).__init__(*args, **kwargs)
-
+ def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout):
+ self.app = app
+ self.tg = tg
+ self.tcp_backlog = tcp_backlog
+ self.tcp_recv_timeout = tcp_recv_timeout
+ self.listen = listen
metrics.init()
# Eventet will complain loudly about our use of multiple greentheads
@@ -261,21 +181,18 @@ class DNSService(object):
self._dns_socks_tcp = []
self._dns_socks_udp = []
- @abc.abstractproperty
- def _dns_application(self):
- pass
-
def start(self):
- super(DNSService, self).start()
-
- addresses = self._get_listen_on_addresses(self._dns_default_port)
+ addresses = map(
+ netutils.parse_host_port,
+ set(self.listen)
+ )
for address in addresses:
self._start(address[0], address[1])
def _start(self, host, port):
sock_tcp = utils.bind_tcp(
- host, port, self._service_config.tcp_backlog)
+ host, port, self.tcp_backlog)
sock_udp = utils.bind_udp(
host, port)
@@ -286,14 +203,7 @@ class DNSService(object):
self.tg.add_thread(self._dns_handle_tcp, sock_tcp)
self.tg.add_thread(self._dns_handle_udp, sock_udp)
- def wait(self):
- super(DNSService, self).wait()
-
def stop(self):
- # When the service is stopped, the threads for _handle_tcp and
- # _handle_udp are stopped too.
- super(DNSService, self).stop()
-
for sock_tcp in self._dns_socks_tcp:
sock_tcp.close()
@@ -301,7 +211,7 @@ class DNSService(object):
sock_udp.close()
def _dns_handle_tcp(self, sock_tcp):
- LOG.info("_handle_tcp thread started")
+ LOG.info('_handle_tcp thread started')
client = None
while True:
@@ -309,13 +219,13 @@ class DNSService(object):
# handle a new TCP connection
client, addr = sock_tcp.accept()
- if self._service_config.tcp_recv_timeout:
- client.settimeout(self._service_config.tcp_recv_timeout)
+ if self.tcp_recv_timeout:
+ client.settimeout(self.tcp_recv_timeout)
- LOG.debug("Handling TCP Request from: %(host)s:%(port)d",
+ LOG.debug('Handling TCP Request from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
if len(addr) == 4:
- LOG.debug("Flow info: %(host)s scope: %(port)d",
+ LOG.debug('Flow info: %(host)s scope: %(port)d',
{'host': addr[2], 'port': addr[3]})
# Dispatch a thread to handle the connection
@@ -327,21 +237,21 @@ class DNSService(object):
except socket.timeout:
if client:
client.close()
- LOG.warning("TCP Timeout from: %(host)s:%(port)d",
+ LOG.warning('TCP Timeout from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
except socket.error as e:
if client:
client.close()
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
if client:
client.close()
- LOG.exception("Unknown exception handling TCP request from: "
- "%(host)s:%(port)d",
+ LOG.exception('Unknown exception handling TCP request from: '
+ '%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_tcp_conn(self, addr, client):
@@ -369,7 +279,7 @@ class DNSService(object):
expected_length_raw = client.recv(2)
if len(expected_length_raw) == 0:
break
- (expected_length, ) = struct.unpack('!H', expected_length_raw)
+ (expected_length,) = struct.unpack('!H', expected_length_raw)
# Keep receiving data until we've got all the data we expect
# The buffer contains only one query at a time
@@ -385,7 +295,7 @@ class DNSService(object):
query = buf
# Call into the DNS Application itself with payload and addr
- for response in self._dns_application(
+ for response in self.app(
{'payload': query, 'addr': addr}):
# Send back a response only if present
@@ -398,20 +308,20 @@ class DNSService(object):
client.sendall(tcp_response)
except socket.timeout:
- LOG.info("TCP Timeout from: %(host)s:%(port)d",
+ LOG.info('TCP Timeout from: %(host)s:%(port)d',
{'host': host, 'port': port})
except socket.error as e:
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': host, 'port': port, 'err': errname})
except struct.error:
- LOG.warning("Invalid packet from: %(host)s:%(port)d",
+ LOG.warning('Invalid packet from: %(host)s:%(port)d',
{'host': host, 'port': port})
except Exception:
- LOG.exception("Unknown exception handling TCP request from: "
+ LOG.exception('Unknown exception handling TCP request from: '
"%(host)s:%(port)d", {'host': host, 'port': port})
finally:
if client:
@@ -424,7 +334,7 @@ class DNSService(object):
:type sock_udp: socket
:raises: None
"""
- LOG.info("_handle_udp thread started")
+ LOG.info('_handle_udp thread started')
while True:
try:
@@ -432,8 +342,8 @@ class DNSService(object):
# UDP recvfrom.
payload, addr = sock_udp.recvfrom(8192)
- LOG.debug("Handling UDP Request from: %(host)s:%(port)d",
- {'host': addr[0], 'port': addr[1]})
+ LOG.debug('Handling UDP Request from: %(host)s:%(port)d',
+ {'host': addr[0], 'port': addr[1]})
# Dispatch a thread to handle the query
self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr,
@@ -441,12 +351,12 @@ class DNSService(object):
except socket.error as e:
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
- LOG.exception("Unknown exception handling UDP request from: "
- "%(host)s:%(port)d",
+ LOG.exception('Unknown exception handling UDP request from: '
+ '%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_udp_query(self, sock, addr, payload):
@@ -463,7 +373,7 @@ class DNSService(object):
"""
try:
# Call into the DNS Application itself with the payload and addr
- for response in self._dns_application(
+ for response in self.app(
{'payload': payload, 'addr': addr}):
# Send back a response only if present
@@ -471,7 +381,7 @@ class DNSService(object):
sock.sendto(response, addr)
except Exception:
- LOG.exception("Unhandled exception while processing request from "
+ LOG.exception('Unhandled exception while processing request from '
"%(host)s:%(port)d",
{'host': addr[0], 'port': addr[1]})
diff --git a/designate/service_status.py b/designate/service_status.py
index bf4a2044..b6f0ba24 100644
--- a/designate/service_status.py
+++ b/designate/service_status.py
@@ -31,14 +31,15 @@ class HeartBeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'
- def __init__(self, service, threadgroup, status_factory=None):
+ def __init__(self, service, thread_group, status_factory=None,
+ *args, **kwargs):
super(HeartBeatEmitter, self).__init__()
self._service = service
self._hostname = CONF.host
self._running = False
- self._tg = threadgroup
+ self._tg = thread_group
self._tg.add_timer(
CONF.heartbeat_emitter.heartbeat_interval,
self._emit_heartbeat)
diff --git a/designate/sink/service.py b/designate/sink/service.py
index 11f3e156..ea7ddfbc 100644
--- a/designate/sink/service.py
+++ b/designate/sink/service.py
@@ -27,8 +27,10 @@ LOG = logging.getLogger(__name__)
class Service(service.Service):
- def __init__(self, threads=None):
- super(Service, self).__init__(threads=threads)
+ def __init__(self):
+ super(Service, self).__init__(
+ self.service_name, threads=cfg.CONF['service:sink'].threads
+ )
# Initialize extensions
self.handlers = self._init_extensions()
@@ -38,7 +40,8 @@ class Service(service.Service):
def service_name(self):
return 'sink'
- def _init_extensions(self):
+ @staticmethod
+ def _init_extensions():
"""Loads and prepares all enabled extensions"""
enabled_notification_handlers = \
@@ -75,7 +78,7 @@ class Service(service.Service):
if targets:
self._server.start()
- def stop(self):
+ def stop(self, graceful=True):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
@@ -83,7 +86,7 @@ class Service(service.Service):
except Exception:
pass
- super(Service, self).stop()
+ super(Service, self).stop(graceful)
def _get_targets(self):
"""
diff --git a/designate/tests/test_api/test_service.py b/designate/tests/test_api/test_service.py
index 12eafc07..e8e054d4 100644
--- a/designate/tests/test_api/test_service.py
+++ b/designate/tests/test_api/test_service.py
@@ -21,8 +21,7 @@ class ApiServiceTest(ApiTestCase):
def setUp(self):
super(ApiServiceTest, self).setUp()
- # Use a random port for the API
- self.config(api_port=0, group='service:api')
+ self.config(listen=['0.0.0.0:0'], group='service:api')
self.service = service.Service()
diff --git a/designate/tests/test_coordination.py b/designate/tests/test_coordination.py
index 7a5125d6..67fbe149 100644
--- a/designate/tests/test_coordination.py
+++ b/designate/tests/test_coordination.py
@@ -27,45 +27,57 @@ cfg.CONF.register_opts([
], group="service:dummy")
-class CoordinatedService(coordination.CoordinationMixin, service.Service):
+class CoordinatedService(service.Service):
+ def __init__(self):
+ super(CoordinatedService, self).__init__()
+ self.coordination = coordination.Coordination(
+ self.service_name, self.tg
+ )
+
+ def start(self):
+ super(CoordinatedService, self).start()
+ self.coordination.start()
+
@property
def service_name(self):
return "dummy"
-class TestCoordinationMixin(TestCase):
+class TestCoordination(TestCase):
def setUp(self):
- super(TestCoordinationMixin, self).setUp()
+ super(TestCoordination, self).setUp()
+ self.name = 'coordination'
+ self.tg = mock.Mock()
self.config(backend_url="zake://", group="coordination")
def test_start(self):
- service = CoordinatedService()
+ service = coordination.Coordination(self.name, self.tg)
service.start()
- self.assertTrue(service._coordination_started)
- self.assertIn(service.service_name.encode('utf-8'),
- service._coordinator.get_groups().get())
- self.assertIn(service._coordination_id.encode('utf-8'),
- service._coordinator.get_members(
- service.service_name).get())
+ self.assertTrue(service.started)
+ self.assertIn(self.name.encode('utf-8'),
+ service.coordinator.get_groups().get())
+ self.assertIn(service.coordination_id.encode('utf-8'),
+ service.coordinator.get_members(
+ self.name.encode('utf-8')).get())
service.stop()
def test_stop(self):
- service = CoordinatedService()
+ service = coordination.Coordination(self.name, self.tg)
service.start()
service.stop()
- self.assertFalse(service._coordination_started)
+ self.assertFalse(service.started)
def test_start_no_coordination(self):
self.config(backend_url=None, group="coordination")
- service = CoordinatedService()
+ service = coordination.Coordination(self.name, self.tg)
service.start()
- self.assertIsNone(service._coordinator)
+ self.assertIsNone(service.coordinator)
def test_stop_no_coordination(self):
self.config(backend_url=None, group="coordination")
- service = CoordinatedService()
- self.assertIsNone(service._coordinator)
+ service = coordination.Coordination(self.name, self.tg)
+ self.assertIsNone(service.coordinator)
service.start()
service.stop()
diff --git a/designate/tests/test_mdns/test_service.py b/designate/tests/test_mdns/test_service.py
index 6a7e763e..f2804525 100644
--- a/designate/tests/test_mdns/test_service.py
+++ b/designate/tests/test_mdns/test_service.py
@@ -34,7 +34,6 @@ def hex_wire(response):
class MdnsServiceTest(MdnsTestCase):
-
# DNS packet with IQUERY opcode
query_payload = binascii.a2b_hex(
"271209000001000000000000076578616d706c6503636f6d0000010001"
@@ -42,6 +41,7 @@ class MdnsServiceTest(MdnsTestCase):
expected_response = binascii.a2b_hex(
b"271289050001000000000000076578616d706c6503636f6d0000010001"
)
+
# expected response is an error code REFUSED. The other fields are
# id 10002
# opcode IQUERY
@@ -58,10 +58,10 @@ class MdnsServiceTest(MdnsTestCase):
def setUp(self):
super(MdnsServiceTest, self).setUp()
- # Use a random port for MDNS
- self.config(port=0, group='service:mdns')
+ self.config(listen=['0.0.0.0:0'], group='service:mdns')
self.service = self.start_service('mdns')
+ self.dns_service = self.service.dns_service
self.addr = ['0.0.0.0', 5556]
@staticmethod
@@ -77,14 +77,14 @@ class MdnsServiceTest(MdnsTestCase):
@mock.patch.object(dns.message, 'make_query')
def test_handle_empty_payload(self, query_mock):
mock_socket = mock.Mock()
- self.service._dns_handle_udp_query(mock_socket, self.addr,
- ' '.encode('utf-8'))
+ self.dns_service._dns_handle_udp_query(mock_socket, self.addr,
+ ' '.encode('utf-8'))
query_mock.assert_called_once_with('unknown', dns.rdatatype.A)
def test_handle_udp_payload(self):
mock_socket = mock.Mock()
- self.service._dns_handle_udp_query(mock_socket, self.addr,
- self.query_payload)
+ self.dns_service._dns_handle_udp_query(mock_socket, self.addr,
+ self.query_payload)
mock_socket.sendto.assert_called_once_with(self.expected_response,
self.addr)
@@ -93,7 +93,7 @@ class MdnsServiceTest(MdnsTestCase):
mock_socket = mock.Mock()
mock_socket.recv.side_effect = ['X', 'boo'] # X will fail unpack
- self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
+ self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(1, mock_socket.recv.call_count)
self.assertEqual(1, mock_socket.close.call_count)
@@ -103,14 +103,14 @@ class MdnsServiceTest(MdnsTestCase):
pay_len = struct.pack("!H", len(payload))
mock_socket.recv.side_effect = [pay_len, payload, socket.timeout]
- self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
+ self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(3, mock_socket.recv.call_count)
self.assertEqual(1, mock_socket.sendall.call_count)
self.assertEqual(1, mock_socket.close.call_count)
wire = mock_socket.sendall.call_args[0][0]
expected_length_raw = wire[:2]
- (expected_length, ) = struct.unpack('!H', expected_length_raw)
+ (expected_length,) = struct.unpack('!H', expected_length_raw)
self.assertEqual(len(wire), expected_length + 2)
self.assertEqual(self.expected_response, wire[2:])
@@ -130,7 +130,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
- self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
+ self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(5, mock_socket.sendall.call_count)
@@ -152,7 +152,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
- self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
+ self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(5, mock_socket.sendall.call_count)
@@ -171,7 +171,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
- self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
+ self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(4, mock_socket.sendall.call_count)
diff --git a/designate/tests/unit/agent/backends/test_bind9.py b/designate/tests/unit/agent/backends/test_bind9.py
index 91a422d1..a88ff0d8 100644
--- a/designate/tests/unit/agent/backends/test_bind9.py
+++ b/designate/tests/unit/agent/backends/test_bind9.py
@@ -27,7 +27,7 @@ class Bind9AgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(Bind9AgentBackendTestCase, self).setUp()
- self.CONF.set_override('port', 0, 'service:agent')
+ self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_bind9.Bind9Backend('foo')
diff --git a/designate/tests/unit/agent/backends/test_denominator.py b/designate/tests/unit/agent/backends/test_denominator.py
index 89c35450..36a72806 100644
--- a/designate/tests/unit/agent/backends/test_denominator.py
+++ b/designate/tests/unit/agent/backends/test_denominator.py
@@ -28,7 +28,7 @@ class DenominatorAgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(DenominatorAgentBackendTestCase, self).setUp()
- self.CONF.set_override('port', 0, 'service:agent')
+ self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_denominator.DenominatorBackend('foo')
diff --git a/designate/tests/unit/agent/backends/test_fake.py b/designate/tests/unit/agent/backends/test_fake.py
index e4b32778..00fd6414 100644
--- a/designate/tests/unit/agent/backends/test_fake.py
+++ b/designate/tests/unit/agent/backends/test_fake.py
@@ -22,7 +22,7 @@ class FakeAgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(FakeAgentBackendTestCase, self).setUp()
- self.CONF.set_override('port', 0, 'service:agent')
+ self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_fake.FakeBackend('foo')
diff --git a/designate/tests/unit/agent/test_service.py b/designate/tests/unit/agent/test_service.py
index 94cde354..f4d47c9f 100644
--- a/designate/tests/unit/agent/test_service.py
+++ b/designate/tests/unit/agent/test_service.py
@@ -21,30 +21,40 @@ from designate import utils
from designate.agent import service
from designate.backend import agent_backend
from designate.backend.agent_backend import impl_fake
+from designate.tests import fixtures
class AgentServiceTest(designate.tests.TestCase):
def setUp(self):
super(AgentServiceTest, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
- self.CONF.set_override('port', 0, 'service:agent')
+ self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.CONF.set_override('notify_delay', 0, 'service:agent')
self.service = service.Service()
- self.service._start = mock.Mock()
- self.service._rpc_server = mock.Mock()
+ self.service.dns_service._start = mock.Mock()
- def test_service_name(self):
- self.assertEqual('agent', self.service.service_name)
-
- def test_start(self):
+ def test_service_start(self):
self.service.start()
- self.assertTrue(self.service._start.called)
+ self.assertTrue(self.service.dns_service._start.called)
+
+ def test_service_stop(self):
+ self.service.dns_service.stop = mock.Mock()
+ self.service.backend.stop = mock.Mock()
- def test_stop(self):
self.service.stop()
+ self.assertTrue(self.service.dns_service.stop.called)
+ self.assertTrue(self.service.backend.stop.called)
+
+ self.assertIn('Stopping agent service', self.stdlog.logger.output)
+
+ def test_service_name(self):
+ self.assertEqual('agent', self.service.service_name)
+
def test_get_backend(self):
backend = agent_backend.get_backend('fake', agent_service=self.service)
self.assertIsInstance(backend, impl_fake.FakeBackend)
@@ -52,17 +62,15 @@ class AgentServiceTest(designate.tests.TestCase):
@mock.patch.object(utils, 'cache_result')
def test_get_dns_application(self, mock_cache_result):
self.assertIsInstance(
- self.service._dns_application, dnsutils.SerializationMiddleware
+ self.service.dns_application, dnsutils.SerializationMiddleware
)
@mock.patch.object(utils, 'cache_result')
def test_get_dns_application_with_notify_delay(self, mock_cache_result):
self.service = service.Service()
- self.service._start = mock.Mock()
- self.service._rpc_server = mock.Mock()
self.CONF.set_override('notify_delay', 1.0, 'service:agent')
self.assertIsInstance(
- self.service._dns_application, dnsutils.SerializationMiddleware
+ self.service.dns_application, dnsutils.SerializationMiddleware
)
diff --git a/designate/tests/unit/mdns/test_service.py b/designate/tests/unit/mdns/test_service.py
index e305a5b0..7c7e45de 100644
--- a/designate/tests/unit/mdns/test_service.py
+++ b/designate/tests/unit/mdns/test_service.py
@@ -21,27 +21,42 @@ from oslo_config import fixture as cfg_fixture
import designate.dnsutils
import designate.rpc
import designate.service
-import designate.storage.base
+from designate import storage
import designate.utils
from designate.mdns import handler
from designate.mdns import service
+from designate.tests import fixtures
CONF = cfg.CONF
class MdnsServiceTest(oslotest.base.BaseTestCase):
- @mock.patch.object(designate.rpc, 'get_server')
- def setUp(self, mock_rpc_server):
+ @mock.patch.object(storage, 'get_storage', mock.Mock())
+ def setUp(self):
super(MdnsServiceTest, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
+
self.useFixture(cfg_fixture.Config(CONF))
self.service = service.Service()
- @mock.patch.object(designate.service.DNSService, '_start')
- def test_service_start(self, mock_service_start):
+ @mock.patch.object(designate.service.DNSService, 'start')
+ @mock.patch.object(designate.service.RPCService, 'start')
+ def test_service_start(self, mock_rpc_start, mock_dns_start):
self.service.start()
- self.assertTrue(mock_service_start.called)
+ self.assertTrue(mock_dns_start.called)
+ self.assertTrue(mock_rpc_start.called)
+
+ def test_service_stop(self):
+ self.service.dns_service.stop = mock.Mock()
+
+ self.service.stop()
+
+ self.assertTrue(self.service.dns_service.stop.called)
+
+ self.assertIn('Stopping mdns service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('mdns', self.service.service_name)
@@ -51,17 +66,13 @@ class MdnsServiceTest(oslotest.base.BaseTestCase):
self.service = service.Service()
- self.assertEqual('test-topic', self.service._rpc_topic)
+ self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('mdns', self.service.service_name)
- def test_rpc_endpoints(self):
- endpoints = self.service._rpc_endpoints
-
- self.assertIsInstance(endpoints[0], service.notify.NotifyEndpoint)
- self.assertIsInstance(endpoints[1], service.xfr.XfrEndpoint)
-
- @mock.patch.object(designate.storage.base.Storage, 'get_driver')
+ @mock.patch.object(storage, 'get_storage')
def test_storage_driver(self, mock_get_driver):
+ self.service._storage = None
+
mock_driver = mock.MagicMock()
mock_driver.name = 'noop_driver'
mock_get_driver.return_value = mock_driver
@@ -70,16 +81,12 @@ class MdnsServiceTest(oslotest.base.BaseTestCase):
self.assertTrue(mock_get_driver.called)
- @mock.patch.object(handler, 'RequestHandler', name='reqh')
- @mock.patch.object(designate.service.DNSService, '_start')
+ @mock.patch.object(handler, 'RequestHandler')
+ @mock.patch.object(designate.service.DNSService, 'start')
@mock.patch.object(designate.utils, 'cache_result')
- @mock.patch.object(designate.storage.base.Storage, 'get_driver')
- def test_dns_application(self, mock_req_handler, mock_cache_result,
- mock_service_start, mock_get_driver):
- mock_driver = mock.MagicMock()
- mock_driver.name = 'noop_driver'
- mock_get_driver.return_value = mock_driver
+ def test_dns_application(self, mock_cache_result, mock_dns_start,
+ mock_request_handler):
- app = self.service._dns_application
+ app = self.service.dns_application
self.assertIsInstance(app, designate.dnsutils.DNSMiddleware)
diff --git a/designate/tests/unit/pool_manager/test_service.py b/designate/tests/unit/pool_manager/test_service.py
index e96324ff..9ab3f808 100644
--- a/designate/tests/unit/pool_manager/test_service.py
+++ b/designate/tests/unit/pool_manager/test_service.py
@@ -102,7 +102,7 @@ class PoolManagerInitTest(tests.TestCase):
self.service = service.Service()
self.assertEqual('test-topic.794ccc2c-d751-44fe-b57f-8894c9f5c842',
- self.service._rpc_topic)
+ self.service.rpc_topic)
self.assertEqual('pool_manager', self.service.service_name)
@mock.patch('designate.service.RPCService.start')
diff --git a/designate/tests/unit/producer/test_service.py b/designate/tests/unit/producer/test_service.py
index d5468871..1379a5ff 100644
--- a/designate/tests/unit/producer/test_service.py
+++ b/designate/tests/unit/producer/test_service.py
@@ -19,18 +19,20 @@ Unit-test Producer service
"""
import mock
+import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
-from oslotest import base as test
from designate.producer import service
+import designate.service
+from designate.tests import fixtures
from designate.tests.unit import RoObject
CONF = cfg.CONF
-@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance')
-class ProducerTest(test.BaseTestCase):
+@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance', mock.Mock())
+class ProducerTest(oslotest.base.BaseTestCase):
def setUp(self):
self.useFixture(cfg_fixture.Config(CONF))
@@ -46,28 +48,45 @@ class ProducerTest(test.BaseTestCase):
'producer_task:zone_purge': '',
})
super(ProducerTest, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
+
self.service = service.Service()
+ self.service.rpc_server = mock.Mock()
self.service._storage = mock.Mock()
- self.service._rpc_server = mock.Mock()
self.service._quota = mock.Mock()
- self.service.quota.limit_check = mock.Mock()
+ self.service._quota.limit_check = mock.Mock()
+
+ @mock.patch.object(service.tasks, 'PeriodicTask')
+ @mock.patch.object(service.coordination, 'Partitioner')
+ @mock.patch.object(designate.service.RPCService, 'start')
+ def test_service_start(self, mock_rpc_start, mock_partitioner,
+ mock_periodic_task):
+ self.service.coordination = mock.Mock()
+
+ self.service.start()
+
+ self.assertTrue(mock_rpc_start.called)
+
+ def test_service_stop(self):
+ self.service.coordination.stop = mock.Mock()
- def test_service_name(self, _):
+ self.service.stop()
+
+ self.assertTrue(self.service.coordination.stop.called)
+
+ self.assertIn('Stopping producer service', self.stdlog.logger.output)
+
+ def test_service_name(self):
self.assertEqual('producer', self.service.service_name)
- def test_producer_rpc_topic(self, _):
+ def test_producer_rpc_topic(self):
CONF.set_override('topic', 'test-topic', 'service:producer')
self.service = service.Service()
- self.assertEqual('test-topic', self.service._rpc_topic)
+ self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('producer', self.service.service_name)
- def test_central_api(self, _):
- capi = self.service.central_api
- self.assertIsInstance(capi, mock.MagicMock)
-
- @mock.patch.object(service.tasks, 'PeriodicTask')
- @mock.patch.object(service.coordination, 'Partitioner')
- def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
- self.service.start()
+ def test_central_api(self):
+ self.assertIsInstance(self.service.central_api, mock.Mock)
diff --git a/designate/tests/unit/sink/test_service.py b/designate/tests/unit/sink/test_service.py
index 13ac41bc..b792b1f2 100644
--- a/designate/tests/unit/sink/test_service.py
+++ b/designate/tests/unit/sink/test_service.py
@@ -11,14 +11,13 @@
# under the License.mport threading
import mock
+import designate.tests
import designate.rpc
-from designate import tests
from designate.sink import service
from designate.tests import fixtures
-class TestSinkService(tests.TestCase):
-
+class TestSinkService(designate.tests.TestCase):
def setUp(self):
super(TestSinkService, self).setUp()
self.stdlog = fixtures.StandardLogging()
@@ -35,8 +34,7 @@ class TestSinkService(tests.TestCase):
self.assertTrue(mock_notification_listener.called)
- @mock.patch.object(designate.rpc, 'get_notification_listener')
- def test_service_stop(self, mock_notification_listener):
+ def test_service_stop(self):
self.service.stop()
self.assertIn('Stopping sink service', self.stdlog.logger.output)
diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py
new file mode 100644
index 00000000..fbb01862
--- /dev/null
+++ b/designate/tests/unit/test_heartbeat.py
@@ -0,0 +1,53 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import mock
+import oslotest.base
+from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
+
+from designate import service
+
+CONF = cfg.CONF
+
+
+class HeartbeatTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(HeartbeatTest, self).setUp()
+ self.useFixture(cfg_fixture.Config(CONF))
+
+ CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
+
+ self.mock_tg = mock.Mock()
+ self.heartbeat = service.Heartbeat('test', self.mock_tg)
+
+ def test_get_status(self):
+ self.assertEqual(('UP', {}, {},), self.heartbeat.get_status())
+
+ def test_get_heartbeat_emitter(self):
+ self.assertEqual(
+ 'noop', self.heartbeat.heartbeat_emitter.__plugin_name__
+ )
+
+ def test_start_heartbeat(self):
+ self.assertFalse(self.heartbeat.heartbeat_emitter._running)
+
+ self.heartbeat.start()
+
+ self.assertTrue(self.heartbeat.heartbeat_emitter._running)
+
+ def test_stop_heartbeat(self):
+ self.assertFalse(self.heartbeat.heartbeat_emitter._running)
+
+ self.heartbeat.start()
+ self.heartbeat.stop()
+
+ self.assertFalse(self.heartbeat.heartbeat_emitter._running)
diff --git a/designate/tests/unit/workers/test_service.py b/designate/tests/unit/workers/test_service.py
index aca00dd4..37072f11 100644
--- a/designate/tests/unit/workers/test_service.py
+++ b/designate/tests/unit/workers/test_service.py
@@ -24,6 +24,7 @@ import designate.tests
from designate import backend
from designate import exceptions
from designate import objects
+from designate.tests import fixtures
from designate.worker import processing
from designate.worker import service
@@ -33,6 +34,8 @@ CONF = cfg.CONF
class TestService(oslotest.base.BaseTestCase):
def setUp(self):
super(TestService, self).setUp()
+ self.stdlog = fixtures.StandardLogging()
+ self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))
self.context = mock.Mock()
@@ -40,10 +43,16 @@ class TestService(oslotest.base.BaseTestCase):
self.service = service.Service()
@mock.patch.object(designate.service.RPCService, 'start')
- def test_service_start(self, mock_service_start):
+ def test_service_start(self, mock_rpc_start):
self.service.start()
- self.assertTrue(mock_service_start.called)
+ self.assertTrue(mock_rpc_start.called)
+
+ @mock.patch.object(designate.rpc, 'get_notification_listener')
+ def test_service_stop(self, mock_notification_listener):
+ self.service.stop()
+
+ self.assertIn('Stopping worker service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('worker', self.service.service_name)
@@ -53,7 +62,7 @@ class TestService(oslotest.base.BaseTestCase):
self.service = service.Service()
- self.assertEqual('test-topic', self.service._rpc_topic)
+ self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('worker', self.service.service_name)
def test_central_api(self):
diff --git a/designate/worker/service.py b/designate/worker/service.py
index b08cf923..af86cd5c 100644
--- a/designate/worker/service.py
+++ b/designate/worker/service.py
@@ -41,25 +41,38 @@ class AlsoNotifyTask(object):
pass
-class Service(service.RPCService, service.Service):
+class Service(service.RPCService):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
+ def __init__(self):
+ self._central_api = None
+ self._storage = None
+
+ self._executor = None
+ self._pools_map = None
+
+ super(Service, self).__init__(
+ self.service_name, cfg.CONF['service:worker'].topic,
+ threads=cfg.CONF['service:worker'].threads,
+ )
+
@property
def central_api(self):
- if not hasattr(self, '_central_api'):
+ if not self._central_api:
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
- def _setup_target_backends(self, pool):
+ @staticmethod
+ def _setup_target_backends(pool):
for target in pool.targets:
# Fetch an instance of the Backend class
target.backend = backend.get_backend(target)
LOG.info('%d targets setup', len(pool.targets))
- if len(pool.targets) == 0:
+ if not pool.targets:
raise exceptions.NoPoolTargetsConfigured()
return pool
@@ -97,21 +110,21 @@ class Service(service.RPCService, service.Service):
@property
def storage(self):
- if not hasattr(self, '_storage'):
+ if not self._storage:
storage_driver = cfg.CONF['service:worker'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def executor(self):
- if not hasattr(self, '_executor'):
+ if not self._executor:
# TODO(elarson): Create this based on config
self._executor = processing.Executor()
return self._executor
@property
def pools_map(self):
- if not hasattr(self, '_pools_map'):
+ if self._pools_map is None:
self._pools_map = {}
return self._pools_map
@@ -125,6 +138,9 @@ class Service(service.RPCService, service.Service):
super(Service, self).start()
LOG.info('Started worker')
+ def stop(self, graceful=True):
+ super(Service, self).stop(graceful)
+
def _do_zone_action(self, context, zone):
pool = self.get_pool(zone.pool_id)
all_tasks = []
diff --git a/etc/designate/designate-config-generator.conf b/etc/designate/designate-config-generator.conf
index d35bbceb..4ac8762d 100644
--- a/etc/designate/designate-config-generator.conf
+++ b/etc/designate/designate-config-generator.conf
@@ -8,6 +8,7 @@ namespace = oslo.policy
namespace = oslo.service.periodic_task
namespace = oslo.service.service
namespace = oslo.service.sslutils
+namespace = oslo.service.wsgi
namespace = oslo.db
namespace = oslo.middleware
namespace = oslo.concurrency
diff --git a/releasenotes/notes/new-service-layer-8023c242de89075a.yaml b/releasenotes/notes/new-service-layer-8023c242de89075a.yaml
new file mode 100644
index 00000000..06f87b53
--- /dev/null
+++ b/releasenotes/notes/new-service-layer-8023c242de89075a.yaml
@@ -0,0 +1,18 @@
+---
+upgrade:
+ - |
+ The previously deprecated options ``api_host``, ``api_port``, ``host`` and
+ ``port`` have been permanently removed and are replaced by ``listen``.
+
+ e.g.
+
+ .. code-block:: ini
+
+ [service:api]
+ listen = 0.0.0.0:9001
+
+ ..
+ - |
+ The Designate ``sink`` service will now use the heartbeat reporting system to
+ report its status. This was already the case for all other Designate
+ services.