summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Zamiatin <ozamiatin@mirantis.com>2015-07-20 12:21:58 +0300
committerOleksii Zamiatin <ozamiatin@mirantis.com>2015-07-21 12:52:31 +0300
commite2c3e36d75dedcab07c9239f544e93aabe0faed5 (patch)
treeed08ec0e425a86c20df6613ee9b51e5ebfe3250d
parent75660cedacefd1c3cf1c7d6d2e131cdb6112671a (diff)
downloadoslo-messaging-e2c3e36d75dedcab07c9239f544e93aabe0faed5.tar.gz
Close sockets properly
All socket-connections should properly die after their parents being stopped. Change-Id: I6a83ed2d5ef194e8b068c1d8bd6813f48636c5fb
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py14
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py8
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/green_poller.py12
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py14
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py12
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py13
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py18
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_poller.py8
13 files changed, 87 insertions, 33 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 69d2bf5..064fe7c 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -96,21 +96,20 @@ class ZmqDriver(base.BaseDriver):
conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts)
self.conf = conf
- self.server = None
- self.client = None
+
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
+ self.server = zmq_server.ZmqServer(self.conf, self.matchmaker)
+ self.client = zmq_client.ZmqClient(self.conf, self.matchmaker,
+ allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
- if self.client is None:
- self.client = zmq_client.ZmqClient(self.conf, self.matchmaker,
- self._allowed_remote_exmods)
if wait_for_reply:
return self.client.call(target, ctxt, message, timeout, retry)
else:
@@ -121,8 +120,6 @@ class ZmqDriver(base.BaseDriver):
return None
def listen(self, target):
- if self.server is None:
- self.server = zmq_server.ZmqServer(self.conf, self.matchmaker)
self.server.listen(target)
return self.server
@@ -130,4 +127,5 @@ class ZmqDriver(base.BaseDriver):
return None
def cleanup(self):
- pass
+ self.client.cleanup()
+ self.server.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
index c13d14d..758c15c 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
@@ -110,6 +110,9 @@ class BaseTcpFrontend(object):
LOG.info(_LI("Message %s received."), message)
return message
+ def close(self):
+ self.frontend.close()
+
@six.add_metaclass(abc.ABCMeta)
class BaseBackendMatcher(object):
@@ -124,6 +127,11 @@ class BaseBackendMatcher(object):
def redirect_to_backend(self, message):
"""Redirect message"""
+ def close(self):
+ if self.backends:
+ for backend in self.backends.values():
+ backend.close()
+
@six.add_metaclass(abc.ABCMeta)
class DirectBackendMatcher(BaseBackendMatcher):
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py
index 279ce3f..bf6492e 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py
@@ -33,3 +33,6 @@ class PublisherBackend(base_proxy.BaseBackendMatcher):
target_pos = zmq_serializer.MESSAGE_CALL_TARGET_POSITION + 1
msg = message[target_pos:]
self.backend.send_multipart(msg)
+
+ def close(self):
+ self.backend.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py
index c57a60f..82d3e02 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py
@@ -50,6 +50,12 @@ class UniversalProxy(base_proxy.BaseProxy):
else:
self.tcp_frontend.redirect_outgoing_reply(message)
+ def stop(self):
+ self.poller.close()
+ super(UniversalProxy, self).stop()
+ self.tcp_frontend.close()
+ self.backend_matcher.close()
+
class BackendMatcher(base_proxy.BaseBackendMatcher):
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
index 1ece3d2..834d35f 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
@@ -49,8 +49,8 @@ class RedisMatchMaker(base.MatchMakerBase):
)
def register(self, target, hostname):
- key = zmq_target.target_to_str(target)
if hostname not in self.get_hosts(target):
+ key = zmq_target.target_to_str(target)
self._redis.lpush(key, hostname)
def get_hosts(self, target):
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
index f09bb01..3e6f514 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
@@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.incoming_queue = six.moves.queue.Queue()
self.green_pool = eventlet.GreenPool()
- self.sockets = []
+ self.threads = []
def register(self, socket, recv_method=None):
- self.sockets.append(socket)
- return self.green_pool.spawn(self._socket_receive, socket,
- recv_method)
+ self.threads.append(
+ self.green_pool.spawn(self._socket_receive, socket,
+ recv_method))
def _socket_receive(self, socket, recv_method=None):
while True:
@@ -58,6 +58,10 @@ class GreenPoller(zmq_poller.ZmqPoller):
return None, None
return incoming[0], incoming[1]
+ def close(self):
+ for thread in self.threads:
+ thread.kill()
+
class HoldReplyPoller(GreenPoller):
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
index c6be793..7719310 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
@@ -42,7 +42,13 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
def poll(self, timeout=None):
timeout = timeout * 1000 # zmq poller waits milliseconds
- sockets = dict(self.poller.poll(timeout=timeout))
+ sockets = None
+
+ try:
+ sockets = dict(self.poller.poll(timeout=timeout))
+ except zmq.ZMQError as e:
+ LOG.debug("Polling terminated with error: %s" % e)
+
if not sockets:
return None, None
for socket in sockets:
@@ -51,6 +57,12 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
else:
return socket.recv_multipart(), socket
+ def resume_polling(self, socket):
+ pass # Nothing to do for threading poller
+
+ def close(self):
+ pass # Nothing to do for threading poller
+
class ThreadingExecutor(zmq_poller.Executor):
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
index aff4f59..8bdd9c0 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
@@ -33,6 +33,7 @@ class CallRequest(Request):
retry=None, allowed_remote_exmods=None, matchmaker=None):
self.allowed_remote_exmods = allowed_remote_exmods or []
self.matchmaker = matchmaker
+ self.reply_poller = zmq_async.get_reply_poller()
try:
self.zmq_context = zmq.Context()
@@ -51,13 +52,16 @@ class CallRequest(Request):
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise
+ def close(self):
+ self.reply_poller.close()
+ self.socket.close()
+
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
- poller = zmq_async.get_reply_poller()
- poller.register(self.socket,
- recv_method=lambda socket: socket.recv_json())
+ self.reply_poller.register(
+ self.socket, recv_method=lambda socket: socket.recv_json())
- reply, socket = poller.poll(timeout=self.timeout)
+ reply, socket = self.reply_poller.poll(timeout=self.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % self.timeout)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
index 2aad145..f28ed42 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
@@ -71,7 +71,13 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
dealer_socket = self.zmq_context.socket(zmq.DEALER)
LOG.info(_LI("Connecting DEALER to %s") % address)
dealer_socket.connect(address)
+ self.outbound_sockets[address] = dealer_socket
return dealer_socket
except zmq.ZMQError:
LOG.error(_LE("Failed connecting DEALER to %s") % address)
raise
+
+ def cleanup(self):
+ if self.outbound_sockets:
+ for socket in self.outbound_sockets.values():
+ socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py
index cdd291b..2bdbee1 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer
@@ -27,10 +28,14 @@ class ZmqClient(object):
matchmaker)
def call(self, target, context, message, timeout=None, retry=None):
- request = zmq_call_request.CallRequest(
- self.conf, target, context, message, timeout, retry,
- self.allowed_remote_exmods, self.matchmaker)
- return request()
+ with contextlib.closing(zmq_call_request.CallRequest(
+ self.conf, target, context, message, timeout, retry,
+ self.allowed_remote_exmods,
+ self.matchmaker)) as request:
+ return request()
def cast(self, target, context, message, timeout=None, retry=None):
self.cast_publisher.cast(target, context, message, timeout, retry)
+
+ def cleanup(self):
+ self.cast_publisher.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py
index 9fffe86..a74b7dc 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py
@@ -12,7 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
+import abc
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
class ConsumerBase(object):
def __init__(self, listener, conf, zmq_poller, context):
@@ -22,14 +27,11 @@ class ConsumerBase(object):
self.context = context
self.sockets_per_target = {}
- def poll(self, timeout=None):
- pass
-
- def stop(self):
- pass
-
def cleanup(self):
- pass
+ if self.sockets_per_target:
+ for socket in self.sockets_per_target.values():
+ socket.close()
+ @abc.abstractmethod
def listen(self, target):
- pass
+ """Listen for target"""
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
index ef6e97a..9621e8c 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
@@ -47,7 +47,9 @@ class ZmqServer(base.Listener):
LOG.info("[Server] Stop")
def cleanup(self):
- pass
+ self.poller.close()
+ self.call_resp.cleanup()
+ self.fanout_resp.cleanup()
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
index dcd51ad..02d4ee8 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
@@ -22,11 +22,15 @@ class ZmqPoller(object):
@abc.abstractmethod
def register(self, socket, recv_method=None):
- 'Register socket to poll'
+ """Register socket to poll"""
@abc.abstractmethod
def poll(self, timeout=None):
- 'Poll for messages'
+ """Poll for messages"""
+
+ @abc.abstractmethod
+ def close(self):
+ """Terminate polling"""
@six.add_metaclass(abc.ABCMeta)