diff options
Diffstat (limited to 'designate/service.py')
-rw-r--r-- | designate/service.py | 316 |
1 files changed, 113 insertions, 203 deletions
diff --git a/designate/service.py b/designate/service.py index 0b499799..e4e590b5 100644 --- a/designate/service.py +++ b/designate/service.py @@ -17,241 +17,161 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import abc +import errno import socket import struct -import errno -import six -import eventlet.wsgi import eventlet.debug -from oslo_config import cfg -import oslo_messaging as messaging from oslo_log import log as logging +import oslo_messaging as messaging from oslo_service import service from oslo_service import sslutils +from oslo_service import wsgi from oslo_utils import netutils -import designate.conf -from designate.i18n import _ -from designate.metrics import metrics from designate import policy from designate import rpc from designate import service_status -from designate import version from designate import utils - - -# TODO(kiall): These options have been cut+paste from the old WSGI code, and -# should be moved into service:api etc.. - +from designate import version +import designate.conf +from designate.i18n import _ +from designate.metrics import metrics CONF = designate.conf.CONF LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) class Service(service.Service): - """ - Service class to be shared among the diverse service inside of Designate. - """ - def __init__(self, threads=None): + def __init__(self, name, threads=None): threads = threads or 1000 - super(Service, self).__init__(threads) - - self._host = CONF.host - self._service_config = CONF['service:%s' % self.service_name] + self.name = name + self.host = CONF.host policy.init() - # NOTE(kiall): All services need RPC initialized, as this is used - # for clients AND servers. Hence, this is common to - # all Designate services. if not rpc.initialized(): rpc.init(CONF) - @abc.abstractproperty - def service_name(self): - pass - def start(self): - super(Service, self).start() - LOG.info('Starting %(name)s service (version: %(version)s)', { - 'name': self.service_name, + 'name': self.name, 'version': version.version_info.version_string() }) + super(Service, self).start() - def stop(self): - LOG.info('Stopping %(name)s service', {'name': self.service_name}) - - super(Service, self).stop() - - def _get_listen_on_addresses(self, default_port): - """ - Helper Method to handle migration from singular host/port to - multiple binds - """ - try: - # The API service uses "api_host", and "api_port", others use - # just host and port. - host = self._service_config.api_host - port = self._service_config.api_port - - except cfg.NoSuchOptError: - host = self._service_config.host - port = self._service_config.port - - if host or port is not None: - LOG.warning("host and port config options used, the 'listen' " - "option has been ignored") - - host = host or "0.0.0.0" - # "port" might be 0 to pick a free port, usually during testing - port = default_port if port is None else port - - return [(host, port)] - - else: - - return map( - netutils.parse_host_port, - set(self._service_config.listen) - ) + def stop(self, graceful=True): + LOG.info('Stopping %(name)s service', {'name': self.name}) + super(Service, self).stop(graceful) -class RPCService(object): - """ - RPC Service mixin used by all Designate RPC Services - """ - def __init__(self, *args, **kwargs): - super(RPCService, self).__init__(*args, **kwargs) +class Heartbeat(object): + def __init__(self, name, tg, rpc_api=None): + self.name = name + self.tg = tg - LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic) - self._rpc_server = rpc.get_server( - messaging.Target(topic=self._rpc_topic, server=self._host), - self._rpc_endpoints) + self._status = 'UP' + self._stats = {} + self._capabilities = {} emitter_cls = service_status.HeartBeatEmitter.get_driver( CONF.heartbeat_emitter.emitter_type ) self.heartbeat_emitter = emitter_cls( - self.service_name, self.tg, status_factory=self._get_status + self.name, self.tg, + status_factory=self.get_status, rpc_api=rpc_api ) - def _get_status(self): - status = "UP" - stats = {} - capabilities = {} - return status, stats, capabilities - - @property - def _rpc_endpoints(self): - return [self] - - @property - def _rpc_topic(self): - return CONF['service:%s' % self.service_name].topic + def get_status(self): + return self._status, self._stats, self._capabilities def start(self): - super(RPCService, self).start() - - LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic) - self._rpc_server.start() - - # TODO(kiall): This probably belongs somewhere else, maybe the base - # Service class? - self.notifier = rpc.get_notifier(self.service_name) - - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'start'): - e.start() - self.heartbeat_emitter.start() def stop(self): - LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic) self.heartbeat_emitter.stop() - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'stop'): - e.stop() - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway - try: - self._rpc_server.stop() - except Exception: - pass +class RPCService(Service): + def __init__(self, name, rpc_topic, threads=None): + super(RPCService, self).__init__(name, threads) + LOG.debug("Creating RPC Server on topic '%s' for %s", + rpc_topic, self.name) - super(RPCService, self).stop() + self.endpoints = [self] + self.notifier = None + self.rpc_server = None + self.rpc_topic = rpc_topic - def wait(self): - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'wait'): - e.wait() + def override_endpoints(self, endpoints): + self.endpoints = endpoints - super(RPCService, self).wait() - - -@six.add_metaclass(abc.ABCMeta) -class WSGIService(object): - """ - WSGI Service mixin used by all Designate WSGI Services - """ - def __init__(self, *args, **kwargs): - super(WSGIService, self).__init__(*args, **kwargs) + def start(self): + super(RPCService, self).start() + target = messaging.Target(topic=self.rpc_topic, server=self.host) + self.rpc_server = rpc.get_server(target, self.endpoints) + self.rpc_server.start() + self.notifier = rpc.get_notifier(self.name) - self._use_ssl = sslutils.is_enabled(CONF) - self._wsgi_socks = [] + def stop(self, graceful=True): + if self.rpc_server: + self.rpc_server.stop() + super(RPCService, self).stop(graceful) - @abc.abstractproperty - def _wsgi_application(self): - pass + def wait(self): + super(RPCService, self).wait() - def start(self): - super(WSGIService, self).start() - addresses = self._get_listen_on_addresses(9001) +class WSGIService(Service): + def __init__(self, app, name, listen, max_url_len=None): + super(WSGIService, self).__init__(name) + self.app = app + self.name = name - for address in addresses: - self._start(address[0], address[1]) + self.listen = listen - def _start(self, host, port): - wsgi_sock = utils.bind_tcp( - host, port, CONF.backlog, CONF.tcp_keepidle) + self.servers = [] - if self._use_ssl: - wsgi_sock = sslutils.wrap(CONF, wsgi_sock) + for address in self.listen: + host, port = netutils.parse_host_port(address) + server = wsgi.Server( + CONF, name, app, + host=host, + port=port, + pool_size=CONF['service:api'].threads, + use_ssl=sslutils.is_enabled(CONF), + max_url_len=max_url_len + ) - self._wsgi_socks.append(wsgi_sock) + self.servers.append(server) - self.tg.add_thread(self._wsgi_handle, wsgi_sock) + def start(self): + for server in self.servers: + server.start() + super(WSGIService, self).start() - def _wsgi_handle(self, wsgi_sock): - logger = logging.getLogger('eventlet.wsgi') - # Adjust wsgi MAX_HEADER_LINE to accept large tokens. - eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line + def stop(self, graceful=True): + for server in self.servers: + server.stop() + super(WSGIService, self).stop(graceful) - eventlet.wsgi.server(wsgi_sock, - self._wsgi_application, - custom_pool=self.tg.pool, - log=logger) + def wait(self): + for server in self.servers: + server.wait() + super(WSGIService, self).wait() -@six.add_metaclass(abc.ABCMeta) class DNSService(object): - """ - DNS Service mixin used by all Designate DNS Services - """ - _TCP_RECV_MAX_SIZE = 65535 - def __init__(self, *args, **kwargs): - super(DNSService, self).__init__(*args, **kwargs) - + def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout): + self.app = app + self.tg = tg + self.tcp_backlog = tcp_backlog + self.tcp_recv_timeout = tcp_recv_timeout + self.listen = listen metrics.init() # Eventet will complain loudly about our use of multiple greentheads @@ -261,21 +181,18 @@ class DNSService(object): self._dns_socks_tcp = [] self._dns_socks_udp = [] - @abc.abstractproperty - def _dns_application(self): - pass - def start(self): - super(DNSService, self).start() - - addresses = self._get_listen_on_addresses(self._dns_default_port) + addresses = map( + netutils.parse_host_port, + set(self.listen) + ) for address in addresses: self._start(address[0], address[1]) def _start(self, host, port): sock_tcp = utils.bind_tcp( - host, port, self._service_config.tcp_backlog) + host, port, self.tcp_backlog) sock_udp = utils.bind_udp( host, port) @@ -286,14 +203,7 @@ class DNSService(object): self.tg.add_thread(self._dns_handle_tcp, sock_tcp) self.tg.add_thread(self._dns_handle_udp, sock_udp) - def wait(self): - super(DNSService, self).wait() - def stop(self): - # When the service is stopped, the threads for _handle_tcp and - # _handle_udp are stopped too. - super(DNSService, self).stop() - for sock_tcp in self._dns_socks_tcp: sock_tcp.close() @@ -301,7 +211,7 @@ class DNSService(object): sock_udp.close() def _dns_handle_tcp(self, sock_tcp): - LOG.info("_handle_tcp thread started") + LOG.info('_handle_tcp thread started') client = None while True: @@ -309,13 +219,13 @@ class DNSService(object): # handle a new TCP connection client, addr = sock_tcp.accept() - if self._service_config.tcp_recv_timeout: - client.settimeout(self._service_config.tcp_recv_timeout) + if self.tcp_recv_timeout: + client.settimeout(self.tcp_recv_timeout) - LOG.debug("Handling TCP Request from: %(host)s:%(port)d", + LOG.debug('Handling TCP Request from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) if len(addr) == 4: - LOG.debug("Flow info: %(host)s scope: %(port)d", + LOG.debug('Flow info: %(host)s scope: %(port)d', {'host': addr[2], 'port': addr[3]}) # Dispatch a thread to handle the connection @@ -327,21 +237,21 @@ class DNSService(object): except socket.timeout: if client: client.close() - LOG.warning("TCP Timeout from: %(host)s:%(port)d", + LOG.warning('TCP Timeout from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) except socket.error as e: if client: client.close() errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1], 'err': errname}) except Exception: if client: client.close() - LOG.exception("Unknown exception handling TCP request from: " - "%(host)s:%(port)d", + LOG.exception('Unknown exception handling TCP request from: ' + '%(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) def _dns_handle_tcp_conn(self, addr, client): @@ -369,7 +279,7 @@ class DNSService(object): expected_length_raw = client.recv(2) if len(expected_length_raw) == 0: break - (expected_length, ) = struct.unpack('!H', expected_length_raw) + (expected_length,) = struct.unpack('!H', expected_length_raw) # Keep receiving data until we've got all the data we expect # The buffer contains only one query at a time @@ -385,7 +295,7 @@ class DNSService(object): query = buf # Call into the DNS Application itself with payload and addr - for response in self._dns_application( + for response in self.app( {'payload': query, 'addr': addr}): # Send back a response only if present @@ -398,20 +308,20 @@ class DNSService(object): client.sendall(tcp_response) except socket.timeout: - LOG.info("TCP Timeout from: %(host)s:%(port)d", + LOG.info('TCP Timeout from: %(host)s:%(port)d', {'host': host, 'port': port}) except socket.error as e: errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': host, 'port': port, 'err': errname}) except struct.error: - LOG.warning("Invalid packet from: %(host)s:%(port)d", + LOG.warning('Invalid packet from: %(host)s:%(port)d', {'host': host, 'port': port}) except Exception: - LOG.exception("Unknown exception handling TCP request from: " + LOG.exception('Unknown exception handling TCP request from: ' "%(host)s:%(port)d", {'host': host, 'port': port}) finally: if client: @@ -424,7 +334,7 @@ class DNSService(object): :type sock_udp: socket :raises: None """ - LOG.info("_handle_udp thread started") + LOG.info('_handle_udp thread started') while True: try: @@ -432,8 +342,8 @@ class DNSService(object): # UDP recvfrom. payload, addr = sock_udp.recvfrom(8192) - LOG.debug("Handling UDP Request from: %(host)s:%(port)d", - {'host': addr[0], 'port': addr[1]}) + LOG.debug('Handling UDP Request from: %(host)s:%(port)d', + {'host': addr[0], 'port': addr[1]}) # Dispatch a thread to handle the query self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr, @@ -441,12 +351,12 @@ class DNSService(object): except socket.error as e: errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1], 'err': errname}) except Exception: - LOG.exception("Unknown exception handling UDP request from: " - "%(host)s:%(port)d", + LOG.exception('Unknown exception handling UDP request from: ' + '%(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) def _dns_handle_udp_query(self, sock, addr, payload): @@ -463,7 +373,7 @@ class DNSService(object): """ try: # Call into the DNS Application itself with the payload and addr - for response in self._dns_application( + for response in self.app( {'payload': payload, 'addr': addr}): # Send back a response only if present @@ -471,7 +381,7 @@ class DNSService(object): sock.sendto(response, addr) except Exception: - LOG.exception("Unhandled exception while processing request from " + LOG.exception('Unhandled exception while processing request from ' "%(host)s:%(port)d", {'host': addr[0], 'port': addr[1]}) |