summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-08-02 01:03:59 +0000
committerGerrit Code Review <review@openstack.org>2016-08-02 01:03:59 +0000
commitb259f88b092ebed47ce2b2138f15e70d590d3535 (patch)
treed95d5fded1780ad8ab99c3110cc56a5d52c7ebb5
parent3a87803db99c6d137f96d7105f56e457ef2c7c3b (diff)
parent9e61efa67d2d461626f79c1937dec6c50499568f (diff)
downloadoslo-messaging-b259f88b092ebed47ce2b2138f15e70d590d3535.tar.gz
Merge "[zmq] Use zmq.IMMEDIATE option for round-robin"
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py15
5 files changed, 14 insertions, 9 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
index 27e5f6e..890d5a1 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
@@ -67,7 +67,7 @@ class SocketsManager(object):
socket = self._check_for_new_hosts(target)
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
- self.socket_type)
+ self.socket_type, immediate=False)
self._get_hosts_and_connect(socket, target)
return socket
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
index 2e053f5..39de566 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
@@ -90,7 +90,7 @@ class UniversalQueueProxy(object):
payload.insert(0, routing_key)
payload.insert(0, msg_type)
return payload
- except (AssertionError, zmq.ZMQError):
+ except (AssertionError, ValueError, zmq.ZMQError):
LOG.error("Received message with wrong format")
return None
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index 3715b84..5f5e8ef 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -76,7 +76,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
- except (zmq.ZMQError, AssertionError) as e:
+ except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
def cleanup(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
index 99b65ed..f9913cb 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
@@ -63,7 +63,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(msg_type))
- except (zmq.ZMQError, AssertionError) as e:
+ except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
def cleanup(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index c50ffe4..14061b2 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -38,7 +38,8 @@ class ZmqSocket(object):
'msgpack': msgpack_serializer.MessagePackSerializer()
}
- def __init__(self, conf, context, socket_type, high_watermark=0):
+ def __init__(self, conf, context, socket_type, immediate=True,
+ high_watermark=0):
self.conf = conf
self.context = context
self.socket_type = socket_type
@@ -49,6 +50,8 @@ class ZmqSocket(object):
if self.conf.rpc_cast_timeout > 0:
self.close_linger = self.conf.rpc_cast_timeout * 1000
self.handle.setsockopt(zmq.LINGER, self.close_linger)
+ # Put messages to only connected queues
+ self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
self.handle.identity = six.b(str(uuid.uuid4()))
self.connections = set()
@@ -162,8 +165,9 @@ class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host=None,
high_watermark=0):
- super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
- high_watermark)
+ super(ZmqRandomPortSocket, self).__init__(
+ conf, context, socket_type, immediate=False,
+ high_watermark=high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
if host is None:
host = conf.rpc_zmq_host
@@ -183,8 +187,9 @@ class ZmqFixedPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host, port,
high_watermark=0):
- super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type,
- high_watermark)
+ super(ZmqFixedPortSocket, self).__init__(
+ conf, context, socket_type, immediate=False,
+ high_watermark=high_watermark)
self.connect_address = zmq_address.combine_address(host, port)
self.bind_address = zmq_address.get_tcp_direct_address(
zmq_address.combine_address(conf.rpc_zmq_bind_address, port))