summaryrefslogtreecommitdiff
path: root/designate/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'designate/service.py')
-rw-r--r--designate/service.py316
1 files changed, 113 insertions, 203 deletions
diff --git a/designate/service.py b/designate/service.py
index 0b499799..e4e590b5 100644
--- a/designate/service.py
+++ b/designate/service.py
@@ -17,241 +17,161 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import abc
+import errno
import socket
import struct
-import errno
-import six
-import eventlet.wsgi
import eventlet.debug
-from oslo_config import cfg
-import oslo_messaging as messaging
from oslo_log import log as logging
+import oslo_messaging as messaging
from oslo_service import service
from oslo_service import sslutils
+from oslo_service import wsgi
from oslo_utils import netutils
-import designate.conf
-from designate.i18n import _
-from designate.metrics import metrics
from designate import policy
from designate import rpc
from designate import service_status
-from designate import version
from designate import utils
-
-
-# TODO(kiall): These options have been cut+paste from the old WSGI code, and
-# should be moved into service:api etc..
-
+from designate import version
+import designate.conf
+from designate.i18n import _
+from designate.metrics import metrics
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
- """
- Service class to be shared among the diverse service inside of Designate.
- """
- def __init__(self, threads=None):
+ def __init__(self, name, threads=None):
threads = threads or 1000
-
super(Service, self).__init__(threads)
-
- self._host = CONF.host
- self._service_config = CONF['service:%s' % self.service_name]
+ self.name = name
+ self.host = CONF.host
policy.init()
- # NOTE(kiall): All services need RPC initialized, as this is used
- # for clients AND servers. Hence, this is common to
- # all Designate services.
if not rpc.initialized():
rpc.init(CONF)
- @abc.abstractproperty
- def service_name(self):
- pass
-
def start(self):
- super(Service, self).start()
-
LOG.info('Starting %(name)s service (version: %(version)s)',
{
- 'name': self.service_name,
+ 'name': self.name,
'version': version.version_info.version_string()
})
+ super(Service, self).start()
- def stop(self):
- LOG.info('Stopping %(name)s service', {'name': self.service_name})
-
- super(Service, self).stop()
-
- def _get_listen_on_addresses(self, default_port):
- """
- Helper Method to handle migration from singular host/port to
- multiple binds
- """
- try:
- # The API service uses "api_host", and "api_port", others use
- # just host and port.
- host = self._service_config.api_host
- port = self._service_config.api_port
-
- except cfg.NoSuchOptError:
- host = self._service_config.host
- port = self._service_config.port
-
- if host or port is not None:
- LOG.warning("host and port config options used, the 'listen' "
- "option has been ignored")
-
- host = host or "0.0.0.0"
- # "port" might be 0 to pick a free port, usually during testing
- port = default_port if port is None else port
-
- return [(host, port)]
-
- else:
-
- return map(
- netutils.parse_host_port,
- set(self._service_config.listen)
- )
+ def stop(self, graceful=True):
+ LOG.info('Stopping %(name)s service', {'name': self.name})
+ super(Service, self).stop(graceful)
-class RPCService(object):
- """
- RPC Service mixin used by all Designate RPC Services
- """
- def __init__(self, *args, **kwargs):
- super(RPCService, self).__init__(*args, **kwargs)
+class Heartbeat(object):
+ def __init__(self, name, tg, rpc_api=None):
+ self.name = name
+ self.tg = tg
- LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic)
- self._rpc_server = rpc.get_server(
- messaging.Target(topic=self._rpc_topic, server=self._host),
- self._rpc_endpoints)
+ self._status = 'UP'
+ self._stats = {}
+ self._capabilities = {}
emitter_cls = service_status.HeartBeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
- self.service_name, self.tg, status_factory=self._get_status
+ self.name, self.tg,
+ status_factory=self.get_status, rpc_api=rpc_api
)
- def _get_status(self):
- status = "UP"
- stats = {}
- capabilities = {}
- return status, stats, capabilities
-
- @property
- def _rpc_endpoints(self):
- return [self]
-
- @property
- def _rpc_topic(self):
- return CONF['service:%s' % self.service_name].topic
+ def get_status(self):
+ return self._status, self._stats, self._capabilities
def start(self):
- super(RPCService, self).start()
-
- LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic)
- self._rpc_server.start()
-
- # TODO(kiall): This probably belongs somewhere else, maybe the base
- # Service class?
- self.notifier = rpc.get_notifier(self.service_name)
-
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'start'):
- e.start()
-
self.heartbeat_emitter.start()
def stop(self):
- LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic)
self.heartbeat_emitter.stop()
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'stop'):
- e.stop()
- # Try to shut the connection down, but if we get any sort of
- # errors, go ahead and ignore them.. as we're shutting down anyway
- try:
- self._rpc_server.stop()
- except Exception:
- pass
+class RPCService(Service):
+ def __init__(self, name, rpc_topic, threads=None):
+ super(RPCService, self).__init__(name, threads)
+ LOG.debug("Creating RPC Server on topic '%s' for %s",
+ rpc_topic, self.name)
- super(RPCService, self).stop()
+ self.endpoints = [self]
+ self.notifier = None
+ self.rpc_server = None
+ self.rpc_topic = rpc_topic
- def wait(self):
- for e in self._rpc_endpoints:
- if e != self and hasattr(e, 'wait'):
- e.wait()
+ def override_endpoints(self, endpoints):
+ self.endpoints = endpoints
- super(RPCService, self).wait()
-
-
-@six.add_metaclass(abc.ABCMeta)
-class WSGIService(object):
- """
- WSGI Service mixin used by all Designate WSGI Services
- """
- def __init__(self, *args, **kwargs):
- super(WSGIService, self).__init__(*args, **kwargs)
+ def start(self):
+ super(RPCService, self).start()
+ target = messaging.Target(topic=self.rpc_topic, server=self.host)
+ self.rpc_server = rpc.get_server(target, self.endpoints)
+ self.rpc_server.start()
+ self.notifier = rpc.get_notifier(self.name)
- self._use_ssl = sslutils.is_enabled(CONF)
- self._wsgi_socks = []
+ def stop(self, graceful=True):
+ if self.rpc_server:
+ self.rpc_server.stop()
+ super(RPCService, self).stop(graceful)
- @abc.abstractproperty
- def _wsgi_application(self):
- pass
+ def wait(self):
+ super(RPCService, self).wait()
- def start(self):
- super(WSGIService, self).start()
- addresses = self._get_listen_on_addresses(9001)
+class WSGIService(Service):
+ def __init__(self, app, name, listen, max_url_len=None):
+ super(WSGIService, self).__init__(name)
+ self.app = app
+ self.name = name
- for address in addresses:
- self._start(address[0], address[1])
+ self.listen = listen
- def _start(self, host, port):
- wsgi_sock = utils.bind_tcp(
- host, port, CONF.backlog, CONF.tcp_keepidle)
+ self.servers = []
- if self._use_ssl:
- wsgi_sock = sslutils.wrap(CONF, wsgi_sock)
+ for address in self.listen:
+ host, port = netutils.parse_host_port(address)
+ server = wsgi.Server(
+ CONF, name, app,
+ host=host,
+ port=port,
+ pool_size=CONF['service:api'].threads,
+ use_ssl=sslutils.is_enabled(CONF),
+ max_url_len=max_url_len
+ )
- self._wsgi_socks.append(wsgi_sock)
+ self.servers.append(server)
- self.tg.add_thread(self._wsgi_handle, wsgi_sock)
+ def start(self):
+ for server in self.servers:
+ server.start()
+ super(WSGIService, self).start()
- def _wsgi_handle(self, wsgi_sock):
- logger = logging.getLogger('eventlet.wsgi')
- # Adjust wsgi MAX_HEADER_LINE to accept large tokens.
- eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line
+ def stop(self, graceful=True):
+ for server in self.servers:
+ server.stop()
+ super(WSGIService, self).stop(graceful)
- eventlet.wsgi.server(wsgi_sock,
- self._wsgi_application,
- custom_pool=self.tg.pool,
- log=logger)
+ def wait(self):
+ for server in self.servers:
+ server.wait()
+ super(WSGIService, self).wait()
-@six.add_metaclass(abc.ABCMeta)
class DNSService(object):
- """
- DNS Service mixin used by all Designate DNS Services
- """
-
_TCP_RECV_MAX_SIZE = 65535
- def __init__(self, *args, **kwargs):
- super(DNSService, self).__init__(*args, **kwargs)
-
+ def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout):
+ self.app = app
+ self.tg = tg
+ self.tcp_backlog = tcp_backlog
+ self.tcp_recv_timeout = tcp_recv_timeout
+ self.listen = listen
metrics.init()
# Eventet will complain loudly about our use of multiple greentheads
@@ -261,21 +181,18 @@ class DNSService(object):
self._dns_socks_tcp = []
self._dns_socks_udp = []
- @abc.abstractproperty
- def _dns_application(self):
- pass
-
def start(self):
- super(DNSService, self).start()
-
- addresses = self._get_listen_on_addresses(self._dns_default_port)
+ addresses = map(
+ netutils.parse_host_port,
+ set(self.listen)
+ )
for address in addresses:
self._start(address[0], address[1])
def _start(self, host, port):
sock_tcp = utils.bind_tcp(
- host, port, self._service_config.tcp_backlog)
+ host, port, self.tcp_backlog)
sock_udp = utils.bind_udp(
host, port)
@@ -286,14 +203,7 @@ class DNSService(object):
self.tg.add_thread(self._dns_handle_tcp, sock_tcp)
self.tg.add_thread(self._dns_handle_udp, sock_udp)
- def wait(self):
- super(DNSService, self).wait()
-
def stop(self):
- # When the service is stopped, the threads for _handle_tcp and
- # _handle_udp are stopped too.
- super(DNSService, self).stop()
-
for sock_tcp in self._dns_socks_tcp:
sock_tcp.close()
@@ -301,7 +211,7 @@ class DNSService(object):
sock_udp.close()
def _dns_handle_tcp(self, sock_tcp):
- LOG.info("_handle_tcp thread started")
+ LOG.info('_handle_tcp thread started')
client = None
while True:
@@ -309,13 +219,13 @@ class DNSService(object):
# handle a new TCP connection
client, addr = sock_tcp.accept()
- if self._service_config.tcp_recv_timeout:
- client.settimeout(self._service_config.tcp_recv_timeout)
+ if self.tcp_recv_timeout:
+ client.settimeout(self.tcp_recv_timeout)
- LOG.debug("Handling TCP Request from: %(host)s:%(port)d",
+ LOG.debug('Handling TCP Request from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
if len(addr) == 4:
- LOG.debug("Flow info: %(host)s scope: %(port)d",
+ LOG.debug('Flow info: %(host)s scope: %(port)d',
{'host': addr[2], 'port': addr[3]})
# Dispatch a thread to handle the connection
@@ -327,21 +237,21 @@ class DNSService(object):
except socket.timeout:
if client:
client.close()
- LOG.warning("TCP Timeout from: %(host)s:%(port)d",
+ LOG.warning('TCP Timeout from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
except socket.error as e:
if client:
client.close()
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
if client:
client.close()
- LOG.exception("Unknown exception handling TCP request from: "
- "%(host)s:%(port)d",
+ LOG.exception('Unknown exception handling TCP request from: '
+ '%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_tcp_conn(self, addr, client):
@@ -369,7 +279,7 @@ class DNSService(object):
expected_length_raw = client.recv(2)
if len(expected_length_raw) == 0:
break
- (expected_length, ) = struct.unpack('!H', expected_length_raw)
+ (expected_length,) = struct.unpack('!H', expected_length_raw)
# Keep receiving data until we've got all the data we expect
# The buffer contains only one query at a time
@@ -385,7 +295,7 @@ class DNSService(object):
query = buf
# Call into the DNS Application itself with payload and addr
- for response in self._dns_application(
+ for response in self.app(
{'payload': query, 'addr': addr}):
# Send back a response only if present
@@ -398,20 +308,20 @@ class DNSService(object):
client.sendall(tcp_response)
except socket.timeout:
- LOG.info("TCP Timeout from: %(host)s:%(port)d",
+ LOG.info('TCP Timeout from: %(host)s:%(port)d',
{'host': host, 'port': port})
except socket.error as e:
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': host, 'port': port, 'err': errname})
except struct.error:
- LOG.warning("Invalid packet from: %(host)s:%(port)d",
+ LOG.warning('Invalid packet from: %(host)s:%(port)d',
{'host': host, 'port': port})
except Exception:
- LOG.exception("Unknown exception handling TCP request from: "
+ LOG.exception('Unknown exception handling TCP request from: '
"%(host)s:%(port)d", {'host': host, 'port': port})
finally:
if client:
@@ -424,7 +334,7 @@ class DNSService(object):
:type sock_udp: socket
:raises: None
"""
- LOG.info("_handle_udp thread started")
+ LOG.info('_handle_udp thread started')
while True:
try:
@@ -432,8 +342,8 @@ class DNSService(object):
# UDP recvfrom.
payload, addr = sock_udp.recvfrom(8192)
- LOG.debug("Handling UDP Request from: %(host)s:%(port)d",
- {'host': addr[0], 'port': addr[1]})
+ LOG.debug('Handling UDP Request from: %(host)s:%(port)d',
+ {'host': addr[0], 'port': addr[1]})
# Dispatch a thread to handle the query
self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr,
@@ -441,12 +351,12 @@ class DNSService(object):
except socket.error as e:
errname = errno.errorcode[e.args[0]]
- LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
+ LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
- LOG.exception("Unknown exception handling UDP request from: "
- "%(host)s:%(port)d",
+ LOG.exception('Unknown exception handling UDP request from: '
+ '%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_udp_query(self, sock, addr, payload):
@@ -463,7 +373,7 @@ class DNSService(object):
"""
try:
# Call into the DNS Application itself with the payload and addr
- for response in self._dns_application(
+ for response in self.app(
{'payload': payload, 'addr': addr}):
# Send back a response only if present
@@ -471,7 +381,7 @@ class DNSService(object):
sock.sendto(response, addr)
except Exception:
- LOG.exception("Unhandled exception while processing request from "
+ LOG.exception('Unhandled exception while processing request from '
"%(host)s:%(port)d",
{'host': addr[0], 'port': addr[1]})