diff options
author | Oleksii Zamiatin <ozamiatin@mirantis.com> | 2015-07-20 12:21:58 +0300 |
---|---|---|
committer | Oleksii Zamiatin <ozamiatin@mirantis.com> | 2015-07-21 12:52:31 +0300 |
commit | e2c3e36d75dedcab07c9239f544e93aabe0faed5 (patch) | |
tree | ed08ec0e425a86c20df6613ee9b51e5ebfe3250d | |
parent | 75660cedacefd1c3cf1c7d6d2e131cdb6112671a (diff) | |
download | oslo-messaging-e2c3e36d75dedcab07c9239f544e93aabe0faed5.tar.gz |
Close sockets properly
All socket-connections should properly
die after their parents being stopped.
Change-Id: I6a83ed2d5ef194e8b068c1d8bd6813f48636c5fb
13 files changed, 87 insertions, 33 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 69d2bf5..064fe7c 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -96,21 +96,20 @@ class ZmqDriver(base.BaseDriver): conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) self.conf = conf - self.server = None - self.client = None + self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', self.conf.rpc_zmq_matchmaker, ).driver(self.conf) + self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) + self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, + allowed_remote_exmods) super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): - if self.client is None: - self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, - self._allowed_remote_exmods) if wait_for_reply: return self.client.call(target, ctxt, message, timeout, retry) else: @@ -121,8 +120,6 @@ class ZmqDriver(base.BaseDriver): return None def listen(self, target): - if self.server is None: - self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) self.server.listen(target) return self.server @@ -130,4 +127,5 @@ class ZmqDriver(base.BaseDriver): return None def cleanup(self): - pass + self.client.cleanup() + self.server.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py index c13d14d..758c15c 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -110,6 +110,9 @@ class BaseTcpFrontend(object): LOG.info(_LI("Message %s received."), message) return message + def close(self): + self.frontend.close() + @six.add_metaclass(abc.ABCMeta) class BaseBackendMatcher(object): @@ -124,6 +127,11 @@ class BaseBackendMatcher(object): def redirect_to_backend(self, message): """Redirect message""" + def close(self): + if self.backends: + for backend in self.backends.values(): + backend.close() + @six.add_metaclass(abc.ABCMeta) class DirectBackendMatcher(BaseBackendMatcher): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py index 279ce3f..bf6492e 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py @@ -33,3 +33,6 @@ class PublisherBackend(base_proxy.BaseBackendMatcher): target_pos = zmq_serializer.MESSAGE_CALL_TARGET_POSITION + 1 msg = message[target_pos:] self.backend.send_multipart(msg) + + def close(self): + self.backend.close() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py index c57a60f..82d3e02 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py @@ -50,6 +50,12 @@ class UniversalProxy(base_proxy.BaseProxy): else: self.tcp_frontend.redirect_outgoing_reply(message) + def stop(self): + self.poller.close() + super(UniversalProxy, self).stop() + self.tcp_frontend.close() + self.backend_matcher.close() + class BackendMatcher(base_proxy.BaseBackendMatcher): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index 1ece3d2..834d35f 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -49,8 +49,8 @@ class RedisMatchMaker(base.MatchMakerBase): ) def register(self, target, hostname): - key = zmq_target.target_to_str(target) if hostname not in self.get_hosts(target): + key = zmq_target.target_to_str(target) self._redis.lpush(key, hostname) def get_hosts(self, target): diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index f09bb01..3e6f514 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller): def __init__(self): self.incoming_queue = six.moves.queue.Queue() self.green_pool = eventlet.GreenPool() - self.sockets = [] + self.threads = [] def register(self, socket, recv_method=None): - self.sockets.append(socket) - return self.green_pool.spawn(self._socket_receive, socket, - recv_method) + self.threads.append( + self.green_pool.spawn(self._socket_receive, socket, + recv_method)) def _socket_receive(self, socket, recv_method=None): while True: @@ -58,6 +58,10 @@ class GreenPoller(zmq_poller.ZmqPoller): return None, None return incoming[0], incoming[1] + def close(self): + for thread in self.threads: + thread.kill() + class HoldReplyPoller(GreenPoller): diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index c6be793..7719310 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -42,7 +42,13 @@ class ThreadingPoller(zmq_poller.ZmqPoller): def poll(self, timeout=None): timeout = timeout * 1000 # zmq poller waits milliseconds - sockets = dict(self.poller.poll(timeout=timeout)) + sockets = None + + try: + sockets = dict(self.poller.poll(timeout=timeout)) + except zmq.ZMQError as e: + LOG.debug("Polling terminated with error: %s" % e) + if not sockets: return None, None for socket in sockets: @@ -51,6 +57,12 @@ class ThreadingPoller(zmq_poller.ZmqPoller): else: return socket.recv_multipart(), socket + def resume_polling(self, socket): + pass # Nothing to do for threading poller + + def close(self): + pass # Nothing to do for threading poller + class ThreadingExecutor(zmq_poller.Executor): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index aff4f59..8bdd9c0 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -33,6 +33,7 @@ class CallRequest(Request): retry=None, allowed_remote_exmods=None, matchmaker=None): self.allowed_remote_exmods = allowed_remote_exmods or [] self.matchmaker = matchmaker + self.reply_poller = zmq_async.get_reply_poller() try: self.zmq_context = zmq.Context() @@ -51,13 +52,16 @@ class CallRequest(Request): LOG.error(_LE("Error connecting to socket: %s") % str(e)) raise + def close(self): + self.reply_poller.close() + self.socket.close() + def receive_reply(self): # NOTE(ozamiatin): Check for retry here (no retries now) - poller = zmq_async.get_reply_poller() - poller.register(self.socket, - recv_method=lambda socket: socket.recv_json()) + self.reply_poller.register( + self.socket, recv_method=lambda socket: socket.recv_json()) - reply, socket = poller.poll(timeout=self.timeout) + reply, socket = self.reply_poller.poll(timeout=self.timeout) if reply is None: raise oslo_messaging.MessagingTimeout( "Timeout %s seconds was reached" % self.timeout) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index 2aad145..f28ed42 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -71,7 +71,13 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): dealer_socket = self.zmq_context.socket(zmq.DEALER) LOG.info(_LI("Connecting DEALER to %s") % address) dealer_socket.connect(address) + self.outbound_sockets[address] = dealer_socket return dealer_socket except zmq.ZMQError: LOG.error(_LE("Failed connecting DEALER to %s") % address) raise + + def cleanup(self): + if self.outbound_sockets: + for socket in self.outbound_sockets.values(): + socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py index cdd291b..2bdbee1 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer @@ -27,10 +28,14 @@ class ZmqClient(object): matchmaker) def call(self, target, context, message, timeout=None, retry=None): - request = zmq_call_request.CallRequest( - self.conf, target, context, message, timeout, retry, - self.allowed_remote_exmods, self.matchmaker) - return request() + with contextlib.closing(zmq_call_request.CallRequest( + self.conf, target, context, message, timeout, retry, + self.allowed_remote_exmods, + self.matchmaker)) as request: + return request() def cast(self, target, context, message, timeout=None, retry=None): self.cast_publisher.cast(target, context, message, timeout, retry) + + def cleanup(self): + self.cast_publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py index 9fffe86..a74b7dc 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py @@ -12,7 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) class ConsumerBase(object): def __init__(self, listener, conf, zmq_poller, context): @@ -22,14 +27,11 @@ class ConsumerBase(object): self.context = context self.sockets_per_target = {} - def poll(self, timeout=None): - pass - - def stop(self): - pass - def cleanup(self): - pass + if self.sockets_per_target: + for socket in self.sockets_per_target.values(): + socket.close() + @abc.abstractmethod def listen(self, target): - pass + """Listen for target""" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index ef6e97a..9621e8c 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -47,7 +47,9 @@ class ZmqServer(base.Listener): LOG.info("[Server] Stop") def cleanup(self): - pass + self.poller.close() + self.call_resp.cleanup() + self.fanout_resp.cleanup() def listen(self, target): LOG.info("[Server] Listen to Target %s" % target) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index dcd51ad..02d4ee8 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -22,11 +22,15 @@ class ZmqPoller(object): @abc.abstractmethod def register(self, socket, recv_method=None): - 'Register socket to poll' + """Register socket to poll""" @abc.abstractmethod def poll(self, timeout=None): - 'Poll for messages' + """Poll for messages""" + + @abc.abstractmethod + def close(self): + """Terminate polling""" @six.add_metaclass(abc.ABCMeta) |