diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-08-02 01:03:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-08-02 01:03:59 +0000 |
commit | b259f88b092ebed47ce2b2138f15e70d590d3535 (patch) | |
tree | d95d5fded1780ad8ab99c3110cc56a5d52c7ebb5 | |
parent | 3a87803db99c6d137f96d7105f56e457ef2c7c3b (diff) | |
parent | 9e61efa67d2d461626f79c1937dec6c50499568f (diff) | |
download | oslo-messaging-b259f88b092ebed47ce2b2138f15e70d590d3535.tar.gz |
Merge "[zmq] Use zmq.IMMEDIATE option for round-robin"
5 files changed, 14 insertions, 9 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 27e5f6e..890d5a1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -67,7 +67,7 @@ class SocketsManager(object): socket = self._check_for_new_hosts(target) else: socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) + self.socket_type, immediate=False) self._get_hosts_and_connect(socket, target) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 2e053f5..39de566 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -90,7 +90,7 @@ class UniversalQueueProxy(object): payload.insert(0, routing_key) payload.insert(0, msg_type) return payload - except (AssertionError, zmq.ZMQError): + except (AssertionError, ValueError, zmq.ZMQError): LOG.error("Received message with wrong format") return None diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 3715b84..5f5e8ef 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -76,7 +76,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(message_type)) - except (zmq.ZMQError, AssertionError) as e: + except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 99b65ed..f9913cb 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -63,7 +63,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(msg_type)) - except (zmq.ZMQError, AssertionError) as e: + except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index c50ffe4..14061b2 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -38,7 +38,8 @@ class ZmqSocket(object): 'msgpack': msgpack_serializer.MessagePackSerializer() } - def __init__(self, conf, context, socket_type, high_watermark=0): + def __init__(self, conf, context, socket_type, immediate=True, + high_watermark=0): self.conf = conf self.context = context self.socket_type = socket_type @@ -49,6 +50,8 @@ class ZmqSocket(object): if self.conf.rpc_cast_timeout > 0: self.close_linger = self.conf.rpc_cast_timeout * 1000 self.handle.setsockopt(zmq.LINGER, self.close_linger) + # Put messages to only connected queues + self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) self.handle.identity = six.b(str(uuid.uuid4())) self.connections = set() @@ -162,8 +165,9 @@ class ZmqRandomPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host=None, high_watermark=0): - super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type, - high_watermark) + super(ZmqRandomPortSocket, self).__init__( + conf, context, socket_type, immediate=False, + high_watermark=high_watermark) self.bind_address = zmq_address.get_tcp_random_address(self.conf) if host is None: host = conf.rpc_zmq_host @@ -183,8 +187,9 @@ class ZmqFixedPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host, port, high_watermark=0): - super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type, - high_watermark) + super(ZmqFixedPortSocket, self).__init__( + conf, context, socket_type, immediate=False, + high_watermark=high_watermark) self.connect_address = zmq_address.combine_address(host, port) self.bind_address = zmq_address.get_tcp_direct_address( zmq_address.combine_address(conf.rpc_zmq_bind_address, port)) |