summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark McLoughlin <markmc@redhat.com>2013-08-28 13:48:19 +0100
committerMark McLoughlin <markmc@redhat.com>2013-08-28 14:06:36 +0100
commitb6d808774683bb0dab13fddddf49543f6bf00627 (patch)
tree4518f6f645d2d9184e2b324274f35264d9730746
parenta2d113198c684e585c5bab9e313b4ede8a6aee4f (diff)
downloadoslo-messaging-b6d808774683bb0dab13fddddf49543f6bf00627.tar.gz
Implement the server side of ZmqDriver
This implements the server side of the driver without modifying the existing code by allowing the driver to spawn off multiple greenthreads as before, but queueing any dispatched messages so that the executor can still do listener.poll() to dispatch messages itself. This is a hack, but it's a starting point. Change-Id: Ie299c2695d81d0473cea81d40114326b89de0011
-rw-r--r--oslo/messaging/_drivers/impl_zmq.py90
1 files changed, 82 insertions, 8 deletions
diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index 71d29ea..6dcadf6 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -14,12 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import logging
import os
import pprint
+import Queue
import re
import socket
import sys
+import threading
import types
import uuid
@@ -77,7 +80,12 @@ zmq_opts = [
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.')
+ 'IP address. Must match "host" option, if running Nova.'),
+
+ cfg.IntOpt('rpc_cast_timeout',
+ default=30,
+ help='Seconds to wait before a cast expires (TTL). '
+ 'Only supported by impl_zmq.'),
]
CONF = cfg.CONF
@@ -621,7 +629,7 @@ class Connection(rpc_common.Connection):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
- _msg_id=None):
+ _msg_id=None, allowed_remote_exmods=[]):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -639,7 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
def _call(addr, context, topic, msg, timeout=None,
- envelope=False):
+ envelope=False, allowed_remote_exmods=[]):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -709,13 +717,14 @@ def _call(addr, context, topic, msg, timeout=None,
# responses for Exceptions.
for resp in responses:
if isinstance(resp, types.DictType) and 'exc' in resp:
- raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
+ raise rpc_common.deserialize_remote_exception(
+ resp['exc'], allowed_remote_exmods)
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None,
- envelope=False, _msg_id=None):
+ envelope=False, _msg_id=None, allowed_remote_exmods=[]):
"""Wraps the sending of messages.
Dispatches to the matchmaker and sends message to all relevant hosts.
@@ -744,7 +753,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
_msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- envelope)
+ envelope, allowed_remote_exmods)
def create_connection(conf, new=True):
@@ -820,6 +829,59 @@ def _get_matchmaker(*args, **kwargs):
return matchmaker
+class ZmqIncomingMessage(base.IncomingMessage):
+
+ ReceivedReply = collections.namedtuple(
+ 'ReceivedReply', ['reply', 'failure', 'log_failure'])
+
+ def __init__(self, listener, ctxt, message):
+ super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
+ self.condition = threading.Condition()
+ self.received = None
+
+ def reply(self, reply=None, failure=None, log_failure=True):
+ self.received = self.ReceivedReply(reply, failure, log_failure)
+ with self.condition:
+ self.condition.notify()
+
+
+class ZmqListener(base.Listener):
+
+ def __init__(self, driver, target):
+ super(ZmqListener, self).__init__(driver, target)
+ self.incoming_queue = Queue.Queue()
+
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
+ message = {
+ 'method': method,
+ 'args': kwargs
+ }
+ if version:
+ message['version'] = version
+ if namespace:
+ message['namespace'] = namespace
+
+ incoming = ZmqIncomingMessage(self,
+ ctxt.to_dict(),
+ message)
+
+ self.incoming_queue.put(incoming)
+
+ with incoming.condition:
+ incoming.condition.wait()
+
+ assert incoming.received
+
+ if incoming.received.failure:
+ raise incoming.received.failure
+ else:
+ return incoming.received.reply
+
+ def poll(self):
+ while True:
+ return self.incoming_queue.get()
+
+
class ZmqDriver(base.BaseDriver):
# FIXME(markmc): allow this driver to be used without eventlet
@@ -869,7 +931,8 @@ class ZmqDriver(base.BaseDriver):
topic = 'fanout~' + topic
reply = _multi_send(method, context, topic, message,
- envelope=envelope)
+ envelope=envelope,
+ allowed_remote_exmods=self._allowed_remote_exmods)
if wait_for_reply:
return reply[-1]
@@ -884,7 +947,18 @@ class ZmqDriver(base.BaseDriver):
return self._send(target, ctxt, message, envelope=(version == 2.0))
def listen(self, target):
- raise NotImplementedError()
+ conn = create_connection(self.conf)
+
+ listener = ZmqListener(self, target)
+
+ conn.create_consumer(target.topic, listener)
+ conn.create_consumer('%s.%s' % (target.topic, target.server),
+ listener)
+ conn.create_consumer(target.topic, listener, fanout=True)
+
+ conn.consume_in_thread()
+
+ return listener
def cleanup(self):
cleanup()