summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Zamiatin <ozamiatin@mirantis.com>2015-06-19 14:29:18 +0300
committerOleksii Zamiatin <ozamiatin@mirantis.com>2015-06-30 18:16:02 +0300
commit73cd49129f0ce2a799938b4fec9dcd847b0a77ad (patch)
tree1a06b091be5a4a923917b5b5362f32ad1fcbfabc
parent76ec03c8f9fcc33cf6b407c05bea6fcbb5c74126 (diff)
downloadoslo-messaging-73cd49129f0ce2a799938b4fec9dcd847b0a77ad.tar.gz
Initial commit for new zmq driver implementation
- Minimal RPC (CALL + direct CAST) implementation - Has up and running oslo_messaging/tests/drivers/test_impl_zmq - Pep8 fixed. - Works over REQ/REP pipeline according to [1] - Has a beginning of eventlet/threading behavior differentiation Fanout and Notifier are not yet supported Devstack not yet fixed Functional tests not yet fixed ..[1] - https://review.openstack.org/#/c/171131/ Change-Id: I44cd48070bf7c7f46152fdf0e54664a7dee97de9
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py1069
-rw-r--r--oslo_messaging/_drivers/zmq_driver/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/__init__.py1
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py92
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py71
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py110
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py79
-rw-r--r--oslo_messaging/_drivers/zmq_driver/notifier/__init__.py1
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/green_poller.py111
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py50
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py49
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py72
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py40
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py33
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py76
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py35
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py96
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py49
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_async.py59
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_context.py33
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_poller.py48
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_serializer.py54
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_topic.py61
-rw-r--r--oslo_messaging/tests/drivers/test_impl_zmq.py481
-rw-r--r--tests/drivers/test_impl_zmq.py458
29 files changed, 1503 insertions, 1725 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index f673b9c..7357aa3 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -12,39 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import logging
-import os
import pprint
-import re
import socket
-import sys
-import threading
-import types
-import uuid
-import eventlet
-import greenlet
from oslo_config import cfg
-from oslo_serialization import jsonutils
-from oslo_utils import excutils
-from oslo_utils import importutils
-import six
-from six import moves
-from stevedore import driver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._executors import base as executor_base # FIXME(markmc)
-from oslo_messaging._i18n import _, _LE, _LW
-from oslo_messaging._drivers import pool
+from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_client
+from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_server
+from oslo_messaging._executors import base as executor_base
-zmq = importutils.try_import('eventlet.green.zmq')
-
-# for convenience, are not modified.
pformat = pprint.pformat
-Timeout = eventlet.timeout.Timeout
LOG = logging.getLogger(__name__)
RPCException = rpc_common.RPCException
@@ -62,6 +43,11 @@ zmq_opts = [
help='MatchMaker driver.',
),
+ cfg.BoolOpt('rpc_zmq_all_req_rep',
+ default=True,
+ deprecated_group='DEFAULT',
+ help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'),
+
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port.'),
@@ -87,905 +73,6 @@ zmq_opts = [
'Only supported by impl_zmq.'),
]
-CONF = cfg.CONF
-
-matchmaker = None # memoized matchmaker object
-
-
-def _serialize(data):
- """Serialization wrapper.
-
- We prefer using JSON, but it cannot encode all types.
- Error if a developer passes us bad data.
- """
- try:
- return jsonutils.dumps(data, ensure_ascii=True)
- except TypeError:
- with excutils.save_and_reraise_exception():
- LOG.error(_("JSON serialization failed."))
-
-
-def _deserialize(data):
- """Deserialization wrapper."""
- LOG.debug("Deserializing: %r", data)
- return jsonutils.loads(data)
-
-
-class ZmqSocket(object):
- """A tiny wrapper around ZeroMQ.
-
- Simplifies the send/recv protocol and connection management.
- Can be used as a Context (supports the 'with' statement).
- """
-
- def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None):
- self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts)
- self.sock = self.ctxt.socket(zmq_type)
-
- # Enable IPv6-support in libzmq.
- # When IPv6 is enabled, a socket will connect to, or accept
- # connections from, both IPv4 and IPv6 hosts.
- try:
- self.sock.ipv6 = True
- except AttributeError:
- # NOTE(dhellmann): Sometimes the underlying library does
- # not recognize the IPV6 option. There's nothing we can
- # really do in that case, so ignore the error and keep
- # trying to work.
- pass
-
- self.addr = addr
- self.type = zmq_type
- self.subscriptions = []
-
- # Support failures on sending/receiving on wrong socket type.
- self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
- self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
- self.can_sub = zmq_type in (zmq.SUB, )
-
- # Support list, str, & None for subscribe arg (cast to list)
- do_sub = {
- list: subscribe,
- str: [subscribe],
- type(None): []
- }[type(subscribe)]
-
- for f in do_sub:
- self.subscribe(f)
-
- str_data = {'addr': addr, 'type': self.socket_s(),
- 'subscribe': subscribe, 'bind': bind}
-
- LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
- LOG.debug("-> Subscribed to %(subscribe)s", str_data)
- LOG.debug("-> bind: %(bind)s", str_data)
-
- try:
- if bind:
- self.sock.bind(addr)
- else:
- self.sock.connect(addr)
- except Exception:
- raise RPCException(_("Could not open socket."))
-
- def socket_s(self):
- """Get socket type as string."""
- t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
- 'DEALER')
- return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
-
- def subscribe(self, msg_filter):
- """Subscribe."""
- if not self.can_sub:
- raise RPCException("Cannot subscribe on this socket.")
- LOG.debug("Subscribing to %s", msg_filter)
-
- try:
- arg = msg_filter
- if six.PY3:
- arg = arg.encode('utf-8')
- self.sock.setsockopt(zmq.SUBSCRIBE, arg)
- except Exception:
- return
-
- self.subscriptions.append(msg_filter)
-
- def unsubscribe(self, msg_filter):
- """Unsubscribe."""
- if msg_filter not in self.subscriptions:
- return
- arg = msg_filter
- if six.PY3:
- arg = arg.encode('utf-8')
- self.sock.setsockopt(zmq.UNSUBSCRIBE, arg)
- self.subscriptions.remove(msg_filter)
-
- @property
- def closed(self):
- return self.sock is None or self.sock.closed
-
- def close(self):
- if self.sock is None or self.sock.closed:
- return
-
- # We must unsubscribe, or we'll leak descriptors.
- if self.subscriptions:
- for f in self.subscriptions:
- try:
- self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
- except Exception:
- pass
- self.subscriptions = []
-
- try:
- # Default is to linger
- self.sock.close()
- self.ctxt.term()
- except Exception:
- # While this is a bad thing to happen,
- # it would be much worse if some of the code calling this
- # were to fail. For now, lets log, and later evaluate
- # if we can safely raise here.
- LOG.error("ZeroMQ socket could not be closed.")
- self.sock = None
-
- def recv(self, **kwargs):
- if not self.can_recv:
- raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart(**kwargs)
-
- def send(self, data, **kwargs):
- if not self.can_send:
- raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data, **kwargs)
-
-
-class ZmqClient(object):
- """Client for ZMQ sockets."""
-
- def __init__(self, addr, ctxt=None):
- self.address = addr
- self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt)
-
- def cast(self, msg_id, topic, data, envelope):
- msg_id = msg_id or '0'
-
- if six.PY3:
- msg_id = msg_id.encode('utf-8')
-
- if not envelope:
- data = _serialize(data)
- if six.PY3:
- data = data.encode('utf-8')
- data = (msg_id, topic, b'cast', data)
- self.outq.send([bytes(item) for item in data])
- return
-
- rpc_envelope = rpc_common.serialize_msg(data[1])
- zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
- data = (msg_id, topic, b'impl_zmq_v2', data[0]) + zmq_msg
- self.outq.send([bytes(item) for item in data])
-
- def close(self):
- self.outq.close()
-
-
-class ZmqClientContext(object):
- """This is essentially a wrapper around ZmqClient that supports 'with'.
- It can also return a new ZmqClient, or one from a pool.
-
- The function will also catch when an instance of this class is to be
- deleted. With that we can return ZmqClients to the pool on exceptions
- and so forth without making the caller be responsible for catching them.
- If possible the function makes sure to return a client to the pool.
-
- Based on amqp.ConnectionContext.
- """
-
- def __init__(self, address, connection_pool=None, pooled=False):
- self.connection = None
- self.connection_pool = connection_pool
- self.pooled = pooled
- if self.pooled and self.connection_pool is not None:
- self.connection = self.connection_pool.get(address)
- else:
- self.connection = ZmqClient(address)
-
- def __enter__(self):
- """When with ZmqClientContext() is used, return self."""
- return self
-
- def _done(self):
- """If the client came from a pool, clean it up and put it back.
- If it did not come from a pool, close it.
- """
- if self.connection:
- if self.pooled and self.connection_pool is not None:
- # Reset the connection so it's ready for the next caller
- # to grab from the pool
- self.connection_pool.put(self.connection)
- else:
- try:
- self.connection.close()
- except Exception:
- pass
- self.connection = None
-
- def __exit__(self, exc_type, exc_value, tb):
- """End of 'with' statement. We're done here."""
- self._done()
-
- def __del__(self):
- """Caller is done with this client. Make sure we cleaned up."""
- self._done()
-
- def close(self):
- """Caller is done with this client."""
- self._done()
-
- def __getattr__(self, key):
- """Proxy all other calls to the ZmqClient instance."""
- if self.connection:
- return getattr(self.connection, key)
- else:
- raise rpc_common.InvalidRPCConnectionReuse()
-
-
-class RpcContext(rpc_common.CommonRpcContext):
- """Context that supports replying to a rpc.call."""
- def __init__(self, **kwargs):
- self.replies = []
- super(RpcContext, self).__init__(**kwargs)
-
- def deepcopy(self):
- values = self.to_dict()
- values['replies'] = self.replies
- return self.__class__(**values)
-
- def reply(self, reply=None, failure=None, ending=False):
- if ending:
- return
- self.replies.append(reply)
-
- @classmethod
- def marshal(self, ctx):
- if not isinstance(ctx, dict):
- ctx_data = ctx.to_dict()
- else:
- ctx_data = ctx
- return _serialize(ctx_data)
-
- @classmethod
- def unmarshal(self, data):
- return RpcContext.from_dict(_deserialize(data))
-
-
-class InternalContext(object):
- """Used by ConsumerBase as a private context for - methods."""
-
- def __init__(self, proxy):
- self.proxy = proxy
- self.msg_waiter = None
-
- def _get_response(self, ctx, proxy, topic, data):
- """Process a curried message and cast the result to topic."""
- LOG.debug("Running func with context: %s", ctx.to_dict())
- data.setdefault('version', None)
- data.setdefault('args', {})
-
- try:
- if not data.get("method"):
- raise KeyError
- result = proxy.dispatch(ctx, data)
- return ConsumerBase.normalize_reply(result, ctx.replies)
- except greenlet.GreenletExit:
- # ignore these since they are just from shutdowns
- pass
- except rpc_common.ClientException as e:
- LOG.debug("Expected exception during message handling (%s)",
- e._exc_info[1])
- return {'exc':
- rpc_common.serialize_remote_exception(e._exc_info,
- log_failure=False)}
- except Exception:
- LOG.error(_("Exception during message handling"))
- return {'exc':
- rpc_common.serialize_remote_exception(sys.exc_info())}
-
- def reply(self, driver, ctx, proxy,
- msg_id=None, context=None, topic=None, msg=None):
- """Reply to a casted call."""
- # NOTE(ewindisch): context kwarg exists for Grizzly compat.
- # this may be able to be removed earlier than
- # 'I' if ConsumerBase.process were refactored.
- if type(msg) is list:
- payload = msg[-1]
- else:
- payload = msg
-
- response = ConsumerBase.normalize_reply(
- self._get_response(ctx, proxy, topic, payload),
- ctx.replies)
-
- LOG.debug("Sending reply")
- _multi_send(driver, _cast, ctx, topic, {
- 'method': '-process_reply',
- 'args': {
- 'msg_id': msg_id, # Include for Folsom compat.
- 'response': response
- }
- }, _msg_id=msg_id, pooled=True)
-
-
-class ConsumerBase(object):
- """Base Consumer."""
-
- def __init__(self, driver):
- self.driver = driver
- self.private_ctx = InternalContext(None)
-
- @classmethod
- def normalize_reply(self, result, replies):
- # TODO(ewindisch): re-evaluate and document this method.
- if isinstance(result, types.GeneratorType):
- return list(result)
- elif replies:
- return replies
- else:
- return [result]
-
- def process(self, proxy, ctx, data):
- data.setdefault('version', None)
- data.setdefault('args', {})
-
- # Method starting with - are
- # processed internally. (non-valid method name)
- method = data.get('method')
- # Internal method
- # uses internal context for safety.
- if method == '-reply':
- self.private_ctx.reply(self.driver, ctx, proxy, **data['args'])
- return
-
- proxy.dispatch(ctx, data)
-
-
-class ZmqBaseReactor(ConsumerBase):
- """A consumer class implementing a centralized casting broker (PULL-PUSH).
-
- Used for RoundRobin requests.
- """
-
- def __init__(self, conf, driver=None):
- super(ZmqBaseReactor, self).__init__(driver)
-
- self.driver = driver
- self.proxies = {}
- self.threads = []
- self.sockets = []
- self.subscribe = {}
-
- self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
-
- def register(self, proxy, in_addr, zmq_type_in,
- in_bind=True, subscribe=None):
-
- LOG.info(_("Registering reactor"))
-
- if zmq_type_in not in (zmq.PULL, zmq.SUB):
- raise RPCException("Bad input socktype")
-
- # Items push in.
- inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
- subscribe=subscribe)
-
- self.proxies[inq] = proxy
- self.sockets.append(inq)
-
- LOG.info(_("In reactor registered"))
-
- def consume_in_thread(self):
- def _consume(sock):
- LOG.info(_("Consuming socket"))
- while not sock.closed:
- self.consume(sock)
-
- for k in self.proxies.keys():
- self.threads.append(
- self.pool.spawn(_consume, k)
- )
-
- def wait(self):
- for t in self.threads:
- t.wait()
-
- def close(self):
- for t in self.threads:
- t.kill()
-
- for s in self.sockets:
- s.close()
-
-
-class ZmqProxy(ZmqBaseReactor):
- """A consumer class implementing a topic-based proxy.
-
- Forwards to IPC sockets.
- """
-
- def __init__(self, conf):
- super(ZmqProxy, self).__init__(conf)
- pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
- self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
-
- self.topic_proxy = {}
-
- def consume(self, sock):
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- data = sock.recv(copy=False)
- topic = data[1].bytes
- if six.PY3:
- topic = topic.decode('utf-8')
-
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- topic = topic.split('.', 1)[0]
- elif topic.startswith('zmq_replies'):
- sock_type = zmq.PUB
- else:
- sock_type = zmq.PUSH
-
- if topic not in self.topic_proxy:
- def publisher(waiter):
- LOG.info(_("Creating proxy for topic: %s"), topic)
-
- try:
- # The topic is received over the network,
- # don't trust this input.
- if self.badchars.search(topic) is not None:
- emsg = _("Topic contained dangerous characters.")
- LOG.warn(emsg)
- raise RPCException(emsg)
-
- out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
- (ipc_dir, topic),
- sock_type, bind=True)
- except RPCException:
- waiter.send_exception(*sys.exc_info())
- return
-
- self.topic_proxy[topic] = eventlet.queue.LightQueue(
- CONF.rpc_zmq_topic_backlog)
- self.sockets.append(out_sock)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
-
- waiter.send(True)
-
- while(True):
- data = self.topic_proxy[topic].get()
- out_sock.send(data, copy=False)
-
- wait_sock_creation = eventlet.event.Event()
- eventlet.spawn(publisher, wait_sock_creation)
-
- try:
- wait_sock_creation.wait()
- except RPCException:
- LOG.error(_("Topic socket file creation failed."))
- return
-
- try:
- self.topic_proxy[topic].put_nowait(data)
- except eventlet.queue.Full:
- LOG.error(_("Local per-topic backlog buffer full for topic "
- "%s. Dropping message."), topic)
-
- def consume_in_thread(self):
- """Runs the ZmqProxy service."""
- ipc_dir = CONF.rpc_zmq_ipc_dir
- consume_in = "tcp://%s:%s" % \
- (CONF.rpc_zmq_bind_address,
- CONF.rpc_zmq_port)
- consumption_proxy = InternalContext(None)
-
- try:
- os.makedirs(ipc_dir)
- except os.error:
- if not os.path.isdir(ipc_dir):
- with excutils.save_and_reraise_exception():
- LOG.error(_("Required IPC directory does not exist at"
- " %s"), ipc_dir)
- try:
- self.register(consumption_proxy,
- consume_in,
- zmq.PULL)
- except zmq.ZMQError:
- if os.access(ipc_dir, os.X_OK):
- with excutils.save_and_reraise_exception():
- LOG.error(_("Permission denied to IPC directory at"
- " %s"), ipc_dir)
- with excutils.save_and_reraise_exception():
- LOG.error(_("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
-
- super(ZmqProxy, self).consume_in_thread()
-
-
-def unflatten_envelope(packenv):
- """Unflattens the RPC envelope.
-
- Takes a list and returns a dictionary.
- i.e. [1,2,3,4] => {1: 2, 3: 4}
- """
- i = iter(packenv)
- h = {}
- try:
- while True:
- k = six.next(i)
- h[k] = six.next(i)
- except StopIteration:
- return h
-
-
-class ZmqReactor(ZmqBaseReactor):
- """A consumer class implementing a consumer for messages.
-
- Can also be used as a 1:1 proxy
- """
-
- def __init__(self, conf, driver):
- super(ZmqReactor, self).__init__(conf, driver)
-
- def consume(self, sock):
- # TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- LOG.debug("CONSUMER RECEIVED DATA: %s", data)
-
- proxy = self.proxies[sock]
-
- if data[2] == b'cast': # Legacy protocol
- packenv = data[3]
-
- ctx, msg = _deserialize(packenv)
- request = rpc_common.deserialize_msg(msg)
- ctx = RpcContext.unmarshal(ctx)
- elif data[2] == b'impl_zmq_v2':
- packenv = data[4:]
-
- msg = unflatten_envelope(packenv)
- request = rpc_common.deserialize_msg(msg)
-
- # Unmarshal only after verifying the message.
- ctx = RpcContext.unmarshal(data[3])
- else:
- LOG.error(_("ZMQ Envelope version unsupported or unknown."))
- return
-
- self.pool.spawn_n(self.process, proxy, ctx, request)
-
-
-class Connection(rpc_common.Connection):
- """Manages connections and threads."""
-
- def __init__(self, conf, driver):
- self.topics = []
- self.reactor = ZmqReactor(conf, driver)
-
- def create_consumer(self, topic, proxy, fanout=False):
- # Register with matchmaker.
- _get_matchmaker().register(topic, CONF.rpc_zmq_host)
-
- # Subscription scenarios
- if fanout:
- sock_type = zmq.SUB
- subscribe = ('', fanout)[type(fanout) == str]
- topic = 'fanout~' + topic.split('.', 1)[0]
- else:
- sock_type = zmq.PULL
- subscribe = None
- topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
-
- if topic in self.topics:
- LOG.info(_("Skipping topic registration. Already registered."))
- return
-
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
-
- LOG.debug("Consumer is a zmq.%s",
- ['PULL', 'SUB'][sock_type == zmq.SUB])
-
- self.reactor.register(proxy, inaddr, sock_type,
- subscribe=subscribe, in_bind=False)
- self.topics.append(topic)
-
- def close(self):
- mm = _get_matchmaker()
- mm.stop_heartbeat()
- for topic in self.topics:
- try:
- mm.unregister(topic, CONF.rpc_zmq_host)
- except Exception as err:
- LOG.error(_LE('Unable to unregister topic %(topic)s'
- ' from matchmaker: %(err)s') %
- {'topic': topic, 'err': err})
-
- self.reactor.close()
- self.topics = []
-
- def wait(self):
- self.reactor.wait()
-
- def consume_in_thread(self):
- _get_matchmaker().start_heartbeat()
- self.reactor.consume_in_thread()
-
-
-def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False,
- _msg_id=None, allowed_remote_exmods=None, pooled=False):
- allowed_remote_exmods = allowed_remote_exmods or []
- timeout_cast = timeout or CONF.rpc_cast_timeout
- payload = [RpcContext.marshal(context), msg]
- if six.PY3:
- topic = topic.encode('utf-8')
-
- with Timeout(timeout_cast, exception=rpc_common.Timeout):
- with driver.get_connection(addr, pooled) as conn:
- try:
- # assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, envelope)
- except zmq.ZMQError:
- raise RPCException("Cast failed. ZMQ Socket Exception")
-
-
-def _call(driver, addr, context, topic, msg, timeout=None,
- envelope=False, allowed_remote_exmods=None, pooled=False):
- allowed_remote_exmods = allowed_remote_exmods or []
- # timeout_response is how long we wait for a response
- timeout = timeout or CONF.rpc_response_timeout
-
- # The msg_id is used to track replies.
- msg_id = uuid.uuid4().hex
-
- # Replies always come into the reply service.
- reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
-
- LOG.debug("Creating payload")
- # Curry the original request into a reply method.
- mcontext = RpcContext.marshal(context)
- payload = {
- 'method': '-reply',
- 'args': {
- 'msg_id': msg_id,
- 'topic': reply_topic,
- # TODO(ewindisch): safe to remove mcontext in I.
- 'msg': [mcontext, msg]
- }
- }
-
- LOG.debug("Creating queue socket for reply waiter")
-
- # Messages arriving async.
- # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
- with Timeout(timeout, exception=rpc_common.Timeout):
- try:
- msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies.%s" %
- (CONF.rpc_zmq_ipc_dir,
- CONF.rpc_zmq_host),
- zmq.SUB, subscribe=msg_id, bind=False
- )
-
- LOG.debug("Sending cast: %s", topic)
- _cast(driver, addr, context, topic, payload, envelope=envelope,
- pooled=pooled)
-
- LOG.debug("Cast sent; Waiting reply")
- # Blocks until receives reply
- msg = msg_waiter.recv()
- if msg is None:
- raise rpc_common.Timeout()
- LOG.debug("Received message: %s", msg)
- LOG.debug("Unpacking response")
-
- if msg[2] == b'cast': # Legacy version
- raw_msg = _deserialize(msg[-1])[-1]
- elif msg[2] == b'impl_zmq_v2':
- rpc_envelope = unflatten_envelope(msg[4:])
- raw_msg = rpc_common.deserialize_msg(rpc_envelope)
- else:
- raise rpc_common.UnsupportedRpcEnvelopeVersion(
- _("Unsupported or unknown ZMQ envelope returned."))
-
- responses = raw_msg['args']['response']
- # ZMQError trumps the Timeout error.
- except zmq.ZMQError:
- raise RPCException("ZMQ Socket Error")
- except (IndexError, KeyError):
- raise RPCException(_("RPC Message Invalid."))
- finally:
- if 'msg_waiter' in vars():
- msg_waiter.close()
-
- # It seems we don't need to do all of the following,
- # but perhaps it would be useful for multicall?
- # One effect of this is that we're checking all
- # responses for Exceptions.
- for resp in responses:
- if isinstance(resp, dict) and 'exc' in resp:
- raise rpc_common.deserialize_remote_exception(
- resp['exc'], allowed_remote_exmods)
-
- return responses[-1]
-
-
-def _multi_send(driver, method, context, topic, msg, timeout=None,
- envelope=False, _msg_id=None, allowed_remote_exmods=None,
- pooled=False):
- """Wraps the sending of messages.
-
- Dispatches to the matchmaker and sends message to all relevant hosts.
- """
- allowed_remote_exmods = allowed_remote_exmods or []
- conf = CONF
- LOG.debug(' '.join(map(pformat, (topic, msg))))
-
- queues = _get_matchmaker().queues(topic)
- LOG.debug("Sending message(s) to: %s", queues)
-
- # Don't stack if we have no matchmaker results
- if not queues:
- warn_log = _LW("No matchmaker results. Not sending.")
-
- if method.__name__ == '_cast':
- LOG.warn(warn_log)
- return
-
- # While not strictly a timeout, callers know how to handle
- # this exception and a timeout isn't too big a lie.
- raise rpc_common.Timeout(warn_log)
-
- # This supports brokerless fanout (addresses > 1)
- return_val = None
- for queue in queues:
- _topic, ip_addr = queue
- _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
-
- if method.__name__ == '_cast':
- eventlet.spawn_n(method, driver, _addr, context,
- _topic, msg, timeout, envelope, _msg_id,
- None, pooled)
- else:
- return_val = method(driver, _addr, context, _topic, msg, timeout,
- envelope, allowed_remote_exmods, pooled)
-
- return return_val
-
-
-def _get_matchmaker(*args, **kwargs):
- global matchmaker
- mm_name = CONF.rpc_zmq_matchmaker
-
- # Back compatibility for old class names
- mm_mapping = {
- 'oslo_messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis',
- 'oslo_messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring',
- 'oslo_messaging._drivers.matchmaker.MatchMakerLocalhost': 'local',
- 'oslo.messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis',
- 'oslo.messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring',
- 'oslo.messaging._drivers.matchmaker.MatchMakerLocalhost': 'local'}
- if mm_name in mm_mapping:
- LOG.warn(_LW('rpc_zmq_matchmaker = %(old_val)s is deprecated. '
- 'It is suggested to change the value to %(new_val)s.'),
- {'old_val': mm_name, 'new_val': mm_mapping[mm_name]})
- mm_name = mm_mapping[mm_name]
-
- if not matchmaker:
- mgr = driver.DriverManager('oslo.messaging.zmq.matchmaker',
- mm_name)
- matchmaker = mgr.driver(*args, **kwargs)
- return matchmaker
-
-
-class ZmqIncomingMessage(base.IncomingMessage):
-
- ReceivedReply = collections.namedtuple(
- 'ReceivedReply', ['reply', 'failure', 'log_failure'])
-
- def __init__(self, listener, ctxt, message):
- super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
- self.condition = threading.Condition()
- self.received = None
-
- def reply(self, reply=None, failure=None, log_failure=True):
- self.received = self.ReceivedReply(reply, failure, log_failure)
- with self.condition:
- self.condition.notify()
-
- def requeue(self):
- LOG.debug("WARNING: requeue not supported")
-
-
-class ZmqListener(base.Listener):
-
- def __init__(self, driver):
- super(ZmqListener, self).__init__(driver)
- self.incoming_queue = moves.queue.Queue()
-
- def dispatch(self, ctxt, message):
- incoming = ZmqIncomingMessage(self,
- ctxt.to_dict(),
- message)
-
- self.incoming_queue.put(incoming)
-
- with incoming.condition:
- incoming.condition.wait()
-
- assert incoming.received
-
- if incoming.received.failure:
- raise incoming.received.failure
- else:
- return incoming.received.reply
-
- def poll(self, timeout=None):
- try:
- return self.incoming_queue.get(timeout=timeout)
- except six.moves.queue.Empty:
- # timeout
- return None
-
-
-class ZmqClientPool(pool.Pool):
- """Class that implements a pool of Zmq Clients for a single endpoint"""
- def __init__(self, conf, address, connection_cls, ctxt):
- self.connection_cls = connection_cls
- self.ctxt = ctxt
- self.address = address
- super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size)
-
- def create(self):
- LOG.debug('Pool creating new ZMQ connection for %s' % self.address)
- return self.connection_cls(self.address, self.ctxt)
-
- def empty(self):
- for item in self.iter_free():
- item.close()
-
-
-class ZmqClientPoolManager(object):
- """Class that manages pools of clients for Zmq endpoints"""
-
- def __init__(self, conf, ctxt=None):
- self._pools = {}
- self._lock = threading.Lock()
- self.conf = conf
- self.ctxt = ctxt
-
- def get(self, address):
- if address not in self._pools:
- with self._lock:
- if address not in self._pools:
- self._pools[address] = ZmqClientPool(self.conf,
- address,
- ZmqClient,
- self.ctxt)
- return self._pools[address].get()
-
- def put(self, item):
- self._pools[item.address].put(item)
-
- def empty(self):
- for p in self._pools:
- self._pools[p].empty()
-
class ZmqDriver(base.BaseDriver):
"""ZeroMQ Driver
@@ -994,142 +81,38 @@ class ZmqDriver(base.BaseDriver):
"""
- # FIXME(markmc): allow this driver to be used without eventlet
-
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
- if not zmq:
- raise ImportError("Failed to import eventlet.green.zmq")
conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts)
- conf.register_opts(base.base_opts)
-
+ self.conf = conf
+ self.server = None
+ self.client = None
+ self.matchmaker = None
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
- # FIXME(markmc): handle default_exchange
-
- # FIXME(markmc): handle transport URL
- if self._url.hosts:
- raise NotImplementedError('The ZeroMQ driver does not yet support '
- 'transport URLs')
-
- # FIXME(markmc): use self.conf everywhere
- if self.conf is not CONF:
- raise NotImplementedError('The ZeroMQ driver currently only works '
- 'with oslo.config.cfg.CONF')
-
- self.listeners = []
-
- # NOTE(jamespage): Create pool manager on first use to deal with
- # os.fork calls in openstack daemons.
- self._pool = None
- self._pid = None
- self._lock = threading.Lock()
-
- def _configure_pool_manager(func):
- """Causes a new pool manager to be created when the messaging service
- is first used by the current process. This is important as all
- connections in the pools manager by the pool manager will share the
- same ZMQ context, which must not be shared across OS processes.
- """
- def wrap(self, *args, **kws):
- with self._lock:
- old_pid = self._pid
- self._pid = os.getpid()
-
- if old_pid != self._pid:
- # Create fresh pool manager for the current process
- # along with a new ZMQ context.
- self._pool = ZmqClientPoolManager(
- self.conf,
- zmq.Context(self.conf.rpc_zmq_contexts)
- )
- return func(self, *args, **kws)
- return wrap
-
- def _send(self, target, ctxt, message,
- wait_for_reply=None, timeout=None, envelope=False):
-
- if wait_for_reply:
- method = _call
- else:
- method = _cast
-
- topic = target.topic
- if target.fanout:
- # NOTE(ewindisch): fanout~ is used because it avoid splitting on
- # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
- topic = 'fanout~' + topic
- elif target.server:
- topic = '%s.%s' % (topic, target.server)
-
- reply = _multi_send(self, method, ctxt, topic, message,
- envelope=envelope,
- allowed_remote_exmods=self._allowed_remote_exmods,
- pooled=True)
-
- if wait_for_reply:
- return reply[-1]
-
- @_configure_pool_manager
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
- # NOTE(sileht): retry is not implemented because this driver never
- # retry anything
- return self._send(target, ctxt, message, wait_for_reply, timeout)
+ if self.client is None:
+ self.client = zmq_client.ZmqClient(self.conf, self.matchmaker)
+ if wait_for_reply:
+ return self.client.call(target, ctxt, message, timeout, retry)
+ else:
+ self.client.cast(target, ctxt, message, timeout, retry)
+ return None
- @_configure_pool_manager
def send_notification(self, target, ctxt, message, version, retry=None):
- # NOTE(ewindisch): dot-priority in rpc notifier does not
- # work with our assumptions.
- # NOTE(sileht): retry is not implemented because this driver never
- # retry anything
- target = target(topic=target.topic.replace('.', '-'))
- return self._send(target, ctxt, message, envelope=(version == 2.0))
+ return None
- @_configure_pool_manager
def listen(self, target):
- conn = Connection(self.conf, self)
+ if self.server is None:
+ self.server = zmq_server.ZmqServer(self.conf, self.matchmaker)
+ self.server.listen(target)
+ return self.server
- listener = ZmqListener(self)
-
- conn.create_consumer(target.topic, listener)
- conn.create_consumer('%s.%s' % (target.topic, target.server),
- listener)
- conn.create_consumer(target.topic, listener, fanout=True)
-
- conn.consume_in_thread()
- self.listeners.append(conn)
-
- return listener
-
- @_configure_pool_manager
def listen_for_notifications(self, targets_and_priorities, pool):
- # NOTE(sileht): this listener implementation is limited
- # because zeromq doesn't support:
- # * requeing message
- # * pool
- conn = Connection(self.conf, self)
-
- listener = ZmqListener(self)
- for target, priority in targets_and_priorities:
- # NOTE(ewindisch): dot-priority in rpc notifier does not
- # work with our assumptions.
- # NOTE(sileht): create_consumer doesn't support target.exchange
- conn.create_consumer('%s-%s' % (target.topic, priority),
- listener)
- conn.consume_in_thread()
- self.listeners.append(conn)
-
- return listener
+ return None
def cleanup(self):
- for c in self.listeners:
- c.close()
- self.listeners = []
- if self._pool:
- self._pool.empty()
-
- def get_connection(self, address, pooled=False):
- return ZmqClientContext(address, self._pool, pooled)
+ pass
diff --git a/oslo_messaging/_drivers/zmq_driver/__init__.py b/oslo_messaging/_drivers/zmq_driver/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
new file mode 100644
index 0000000..8af3e63
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ozamiatin'
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
new file mode 100644
index 0000000..9e11a08
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseProxy(object):
+
+ def __init__(self, conf, context):
+ super(BaseProxy, self).__init__()
+ self.conf = conf
+ self.context = context
+ self.executor = zmq_async.get_executor(self.run)
+
+ @abc.abstractmethod
+ def run(self):
+ "Main execution point of the proxy"
+
+ def start(self):
+ self.executor.execute()
+
+ def stop(self):
+ self.executor.stop()
+
+ def wait(self):
+ self.executor.wait()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseTcpFrontend(object):
+
+ def __init__(self, conf, poller, context):
+ self.conf = conf
+ self.poller = poller
+ self.context = context
+
+ def receive_incoming(self):
+ message, socket = self.poller.poll(1)
+ return message
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseBackendMatcher(object):
+
+ def __init__(self, conf, poller, context):
+ self.conf = conf
+ self.context = context
+ self.backends = {}
+ self.poller = poller
+
+ def redirect_to_backend(self, message):
+ backend, topic = self._match_backend(message)
+ self._send_message(backend, message, topic)
+
+ def _match_backend(self, message):
+ topic = self._get_topic(message)
+ ipc_address = self._get_ipc_address(topic)
+ if ipc_address not in self.backends:
+ self._create_backend(ipc_address)
+ return self.backend, topic
+
+ @abc.abstractmethod
+ def _get_topic(self, message):
+ "Extract topic from message"
+
+ @abc.abstractmethod
+ def _get_ipc_address(self, topic):
+ "Get ipc backend address from topic"
+
+ @abc.abstractmethod
+ def _send_message(self, backend, message, topic):
+ "Backend specific sending logic"
+
+ @abc.abstractmethod
+ def _create_backend(self, ipc_address):
+ "Backend specific socket opening logic"
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
new file mode 100644
index 0000000..a0d3f4f
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
@@ -0,0 +1,71 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+
+from oslo_utils import excutils
+
+from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._i18n import _LE, _LI
+
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class ZmqBroker(object):
+ """Local messaging IPC broker (nodes are still peers).
+
+ The main purpose is to have one TCP connection
+ (one TCP port assigned for ZMQ messaging) per node.
+ There could be a number of services running on a node.
+ Without such broker a number of opened TCP ports used for
+ messaging become unpredictable for the engine.
+
+ All messages are coming to TCP ROUTER socket and then
+ distributed between their targets by topic via IPC.
+ """
+
+ def __init__(self, conf):
+ super(ZmqBroker, self).__init__()
+ self.conf = conf
+ self.context = zmq.Context()
+ self.proxies = [CallProxy(conf, self.context)]
+ self._create_ipc_dirs()
+
+ def _create_ipc_dirs(self):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+ try:
+ os.makedirs("%s/fanout" % ipc_dir)
+ except os.error:
+ if not os.path.isdir(ipc_dir):
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Required IPC directory does not exist at"
+ " %s"), ipc_dir)
+
+ def start(self):
+ for proxy in self.proxies:
+ proxy.start()
+
+ def wait(self):
+ for proxy in self.proxies:
+ proxy.wait()
+
+ def close(self):
+ LOG.info(_LI("Broker shutting down ..."))
+ for proxy in self.proxies:
+ proxy.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py
new file mode 100644
index 0000000..f4471b5
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py
@@ -0,0 +1,110 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from oslo_messaging._drivers.common import RPCException
+import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LE, _LI
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class CallProxy(base_proxy.BaseProxy):
+
+ def __init__(self, conf, context):
+ super(CallProxy, self).__init__(conf, context)
+ self.tcp_frontend = FrontendTcpRouter(self.conf, context)
+ self.backend_matcher = CallBackendMatcher(self.conf, context)
+ LOG.info(_LI("Starting call proxy thread"))
+
+ def run(self):
+ message = self.tcp_frontend.receive_incoming()
+ if message is not None:
+ self.backend_matcher.redirect_to_backend(message)
+
+ reply, socket = self.backend_matcher.receive_outgoing_reply()
+ if reply is not None:
+ self.tcp_frontend.redirect_outgoing_reply(reply)
+
+
+class CallBackendMatcher(base_proxy.BaseBackendMatcher):
+
+ def __init__(self, conf, context):
+ super(CallBackendMatcher, self).__init__(conf,
+ zmq_async.get_poller(),
+ context)
+ self.backend = self.context.socket(zmq.DEALER)
+ self.poller.register(self.backend)
+
+ def receive_outgoing_reply(self):
+ reply_message = self.poller.poll(1)
+ return reply_message
+
+ def _get_topic(self, message):
+ topic, server = zmq_serializer.get_topic_from_call_message(message)
+ return zmq_topic.Topic(self.conf, topic, server)
+
+ def _get_ipc_address(self, topic):
+ return zmq_topic.get_ipc_address_call(self.conf, topic)
+
+ def _send_message(self, backend, message, topic):
+ # Empty needed for awaiting REP socket to work properly
+ # (DEALER-REP usage specific)
+ backend.send(b'', zmq.SNDMORE)
+ backend.send_multipart(message)
+
+ def _create_backend(self, ipc_address):
+ self.backend.connect(ipc_address)
+ self.backends[str(ipc_address)] = True
+
+
+class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
+
+ def __init__(self, conf, context):
+ super(FrontendTcpRouter, self).__init__(conf,
+ zmq_async.get_poller(),
+ context)
+
+ try:
+ self.frontend = self.context.socket(zmq.ROUTER)
+ bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port)
+ LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address)
+ self.frontend.bind(bind_address)
+ self.poller.register(self.frontend)
+ except zmq.ZMQError:
+ errmsg = _LE("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use.")
+ LOG.error(errmsg)
+ raise RPCException(errmsg)
+
+ @staticmethod
+ def _reduce_empty(reply):
+ reply.pop(0)
+ return reply
+
+ def redirect_outgoing_reply(self, reply):
+ self._reduce_empty(reply)
+ try:
+ self.frontend.send_multipart(reply)
+ LOG.info(_LI("Redirecting reply to client %s") % reply)
+ except zmq.ZMQError:
+ errmsg = _LE("Failed redirecting reply to client %s") % reply
+ LOG.error(errmsg)
+ raise RPCException(errmsg)
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py
new file mode 100644
index 0000000..8eef8be
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py
@@ -0,0 +1,79 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LI
+
+zmq = zmq_async.import_zmq()
+
+LOG = logging.getLogger(__name__)
+
+
+class CastProxy(base_proxy.BaseProxy):
+
+ def __init__(self, conf, context):
+ super(CastProxy, self).__init__(conf, context)
+ self.tcp_frontend = FrontendTcpPull(self.conf, context)
+ self.backend_matcher = CastPushBackendMatcher(self.conf, context)
+ LOG.info(_LI("Starting cast proxy thread"))
+
+ def run(self):
+ message = self.tcp_frontend.receive_incoming()
+ if message is not None:
+ self.backend_matcher.redirect_to_backend(message)
+
+
+class FrontendTcpPull(base_proxy.BaseTcpFrontend):
+
+ def __init__(self, conf, context):
+ super(FrontendTcpPull, self).__init__(conf, zmq_async.get_poller(),
+ context)
+ self.frontend = self.context.socket(zmq.PULL)
+ address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_fanout_port)
+ LOG.info(_LI("Binding to TCP PULL %s") % address)
+ self.frontend.bind(address)
+ self.poller.register(self.frontend)
+
+ def _receive_message(self):
+ message = self.poller.poll()
+ return message
+
+
+class CastPushBackendMatcher(base_proxy.BaseBackendMatcher):
+
+ def __init__(self, conf, context):
+ super(CastPushBackendMatcher, self).__init__(conf,
+ zmq_async.get_poller(),
+ context)
+ self.backend = self.context.socket(zmq.PUSH)
+
+ def _get_topic(self, message):
+ topic, server = zmq_serializer.get_topic_from_cast_message(message)
+ return zmq_topic.Topic(self.conf, topic, server)
+
+ def _get_ipc_address(self, topic):
+ return zmq_topic.get_ipc_address_cast(self.conf, topic)
+
+ def _send_message(self, backend, message, topic):
+ backend.send_multipart(message)
+
+ def _create_backend(self, ipc_address):
+ LOG.debug("[Cast Proxy] Creating PUSH backend %s", ipc_address)
+ self.backend.connect(ipc_address)
+ self.backends[str(ipc_address)] = True
diff --git a/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py b/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py
new file mode 100644
index 0000000..8af3e63
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ozamiatin'
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/__init__.py b/oslo_messaging/_drivers/zmq_driver/poller/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/poller/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
new file mode 100644
index 0000000..b2c26c8
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
@@ -0,0 +1,111 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import threading
+
+import eventlet
+import six
+
+from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_poller
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class GreenPoller(zmq_poller.ZmqPoller):
+
+ def __init__(self):
+ self.incoming_queue = six.moves.queue.Queue()
+ self.green_pool = eventlet.GreenPool()
+ self.sockets = []
+
+ def register(self, socket, recv_method=None):
+ self.sockets.append(socket)
+ return self.green_pool.spawn(self._socket_receive, socket,
+ recv_method)
+
+ def _socket_receive(self, socket, recv_method=None):
+ while True:
+ if recv_method:
+ incoming = recv_method(socket)
+ else:
+ incoming = socket.recv_multipart()
+ self.incoming_queue.put((incoming, socket))
+ eventlet.sleep()
+
+ def poll(self, timeout=None):
+ incoming = None
+ try:
+ with eventlet.Timeout(timeout, exception=rpc_common.Timeout):
+ while incoming is None:
+ try:
+ incoming = self.incoming_queue.get_nowait()
+ except six.moves.queue.Empty:
+ eventlet.sleep()
+ except rpc_common.Timeout:
+ return None, None
+ return incoming[0], incoming[1]
+
+
+class HoldReplyPoller(GreenPoller):
+
+ def __init__(self):
+ super(HoldReplyPoller, self).__init__()
+ self.event_by_socket = {}
+
+ def register(self, socket, recv_method=None):
+ super(HoldReplyPoller, self).register(socket, recv_method)
+ self.event_by_socket[socket] = threading.Event()
+
+ def resume_polling(self, socket):
+ pause = self.event_by_socket[socket]
+ pause.set()
+
+ def _socket_receive(self, socket, recv_method=None):
+ pause = self.event_by_socket[socket]
+ while True:
+ pause.clear()
+ if recv_method:
+ incoming = recv_method(socket)
+ else:
+ incoming = socket.recv_multipart()
+ self.incoming_queue.put((incoming, socket))
+ pause.wait()
+
+
+class GreenExecutor(zmq_poller.Executor):
+
+ def __init__(self, method):
+ self._method = method
+ super(GreenExecutor, self).__init__(None)
+
+ def _loop(self):
+ while True:
+ self._method()
+ eventlet.sleep()
+
+ def execute(self):
+ self.thread = eventlet.spawn(self._loop)
+
+ def wait(self):
+ if self.thread is not None:
+ self.thread.wait()
+
+ def stop(self):
+ if self.thread is not None:
+ self.thread.kill()
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
new file mode 100644
index 0000000..e4317c4
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
@@ -0,0 +1,50 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import threading
+
+import zmq
+
+from oslo_messaging._drivers.zmq_driver import zmq_poller
+
+LOG = logging.getLogger(__name__)
+
+
+class ThreadingPoller(zmq_poller.ZmqPoller):
+
+ def __init__(self):
+ self.poller = zmq.Poller()
+
+ def register(self, socket):
+ self.poller.register(socket, zmq.POLLOUT)
+
+ def poll(self, timeout=None):
+ socks = dict(self.poller.poll(timeout))
+ for socket in socks:
+ incoming = socket.recv()
+ return incoming
+
+
+class ThreadingExecutor(zmq_poller.Executor):
+
+ def __init__(self, method):
+ thread = threading.Thread(target=method)
+ super(ThreadingExecutor, self).__init__(thread)
+
+ def execute(self):
+ self.thread.start()
+
+ def wait(self):
+ self.thread.join()
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
new file mode 100644
index 0000000..fb20efd
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
@@ -0,0 +1,49 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LE, _LI
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class CallRequest(Request):
+
+ def __init__(self, conf, target, context, message, timeout=None,
+ retry=None):
+ try:
+ self.zmq_context = zmq.Context()
+ socket = self.zmq_context.socket(zmq.REQ)
+
+ super(CallRequest, self).__init__(conf, target, context,
+ message, socket, timeout, retry)
+
+ self.connect_address = zmq_topic.get_tcp_address_call(conf,
+ self.topic)
+ LOG.info(_LI("Connecting REQ to %s") % self.connect_address)
+ self.socket.connect(self.connect_address)
+ except zmq.ZMQError as e:
+ LOG.error(_LE("Error connecting to socket: %s") % str(e))
+
+ def receive_reply(self):
+ # NOTE(ozamiatin): Check for retry here (no retries now)
+ self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
+ reply = self.socket.recv_json()
+ return reply[u'reply']
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
new file mode 100644
index 0000000..40fddd9
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
@@ -0,0 +1,72 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
+from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LE, _LI
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class CastRequest(Request):
+
+ def __init__(self, conf, target, context,
+ message, socket, address, timeout=None, retry=None):
+ self.connect_address = address
+ super(CastRequest, self).__init__(conf, target, context, message,
+ socket, timeout, retry)
+
+ def __call__(self, *args, **kwargs):
+ self.send_request()
+
+ def send_request(self):
+ self.socket.send(b'', zmq.SNDMORE)
+ super(CastRequest, self).send_request()
+
+ def receive_reply(self):
+ # Ignore reply for CAST
+ pass
+
+
+class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
+
+ def __init__(self, conf, matchmaker):
+ super(DealerCastPublisher, self).__init__(conf)
+ self.matchmaker = matchmaker
+
+ def cast(self, target, context,
+ message, timeout=None, retry=None):
+ topic = zmq_topic.Topic.from_target(self.conf, target)
+ connect_address = zmq_topic.get_tcp_address_call(self.conf, topic)
+ dealer_socket = self._create_socket(connect_address)
+ request = CastRequest(self.conf, target, context, message,
+ dealer_socket, connect_address, timeout, retry)
+ request.send_request()
+
+ def _create_socket(self, address):
+ if address in self.outbound_sockets:
+ return self.outbound_sockets[address]
+ try:
+ dealer_socket = self.zmq_context.socket(zmq.DEALER)
+ LOG.info(_LI("Connecting DEALER to %s") % address)
+ dealer_socket.connect(address)
+ except zmq.ZMQError:
+ LOG.error(_LE("Failed connecting DEALER to %s") % address)
+ return dealer_socket
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py
new file mode 100644
index 0000000..0984545
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py
@@ -0,0 +1,40 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class CastPublisherBase(object):
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.zmq_context = zmq.Context()
+ self.outbound_sockets = {}
+ super(CastPublisherBase, self).__init__()
+
+ @abc.abstractmethod
+ def cast(self, target, context,
+ message, timeout=None, retry=None):
+ "Send CAST to target"
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py
new file mode 100644
index 0000000..a4eed49
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py
@@ -0,0 +1,33 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request
+from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer
+
+
+class ZmqClient(object):
+
+ def __init__(self, conf, matchmaker=None):
+ self.conf = conf
+ self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf,
+ matchmaker)
+
+ def call(self, target, context, message, timeout=None, retry=None):
+ request = zmq_call_request.CallRequest(self.conf, target, context,
+ message, timeout, retry)
+ return request()
+
+ def cast(self, target, context, message, timeout=None, retry=None):
+ self.cast_publisher.cast(target, context, message, timeout, retry)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
new file mode 100644
index 0000000..2bfe755
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
@@ -0,0 +1,76 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+from abc import abstractmethod
+import logging
+import uuid
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LE
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Request(object):
+
+ def __init__(self, conf, target, context, message,
+ socket, timeout=None, retry=None):
+
+ if message['method'] is None:
+ errmsg = _LE("No method specified for RPC call")
+ LOG.error(errmsg)
+ raise KeyError(errmsg)
+
+ self.msg_id = uuid.uuid4().hex
+ self.target = target
+ self.context = context
+ self.message = message
+ self.timeout = self._to_milliseconds(conf, timeout)
+ self.retry = retry
+ self.reply = None
+ self.socket = socket
+ self.topic = zmq_topic.Topic.from_target(conf, target)
+
+ @staticmethod
+ def _to_milliseconds(conf, timeout):
+ return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000
+
+ @property
+ def is_replied(self):
+ return self.reply is not None
+
+ @property
+ def is_timed_out(self):
+ return False
+
+ def send_request(self):
+ self.socket.send_string(str(self.topic), zmq.SNDMORE)
+ self.socket.send_string(self.msg_id, zmq.SNDMORE)
+ self.socket.send_json(self.context, zmq.SNDMORE)
+ self.socket.send_json(self.message)
+
+ def __call__(self):
+ self.send_request()
+ return self.receive_reply()
+
+ @abstractmethod
+ def receive_reply(self):
+ "Receive reply from server side"
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py
new file mode 100644
index 0000000..2eeb55f
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py
@@ -0,0 +1,35 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class ConsumerBase(object):
+
+ def __init__(self, listener, conf, zmq_poller, context):
+ self.listener = listener
+ self.conf = conf
+ self.poller = zmq_poller
+ self.context = context
+ self.sockets_per_topic = {}
+
+ def poll(self, timeout=None):
+ pass
+
+ def stop(self):
+ pass
+
+ def cleanup(self):
+ pass
+
+ def listen(self, target):
+ pass
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py
new file mode 100644
index 0000000..959ffd7
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py
@@ -0,0 +1,96 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+
+from oslo_messaging._drivers import base
+from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils
+from oslo_messaging._i18n import _LE
+
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class ZmqIncomingRequest(base.IncomingMessage):
+
+ def __init__(self, listener, context, message, socket, rep_id, poller):
+ super(ZmqIncomingRequest, self).__init__(listener, context, message)
+ self.reply_socket = socket
+ self.reply_id = rep_id
+ self.received = None
+ self.poller = poller
+
+ def reply(self, reply=None, failure=None, log_failure=True):
+ message_reply = {u'reply': reply,
+ u'failure': failure,
+ u'log_failure': log_failure}
+ LOG.debug("Replying %s REP", (str(message_reply)))
+ self.received = True
+ self.reply_socket.send(self.reply_id, zmq.SNDMORE)
+ self.reply_socket.send(b'', zmq.SNDMORE)
+ self.reply_socket.send_json(message_reply)
+ self.poller.resume_polling(self.reply_socket)
+
+ def acknowledge(self):
+ pass
+
+ def requeue(self):
+ pass
+
+
+class CallResponder(zmq_base_consumer.ConsumerBase):
+
+ def __init__(self, listener, conf, poller, context):
+ super(CallResponder, self).__init__(listener, conf, poller, context)
+
+ def poll(self, timeout=None):
+ try:
+ incoming, socket = self.poller.poll(timeout)
+ reply_id, context, message = incoming
+ LOG.debug("[Server] REP Received message %s" % str(message))
+ incoming = ZmqIncomingRequest(self.listener,
+ context,
+ message, socket,
+ reply_id,
+ self.poller)
+ return incoming
+
+ except zmq.ZMQError as e:
+ LOG.error(_LE("Receiving message failed ... {}"), e)
+
+ def listen(self, target):
+
+ def _receive_message(socket):
+ reply_id = socket.recv()
+ empty = socket.recv()
+ assert empty == b'', 'Bad format: empty separator expected'
+ topic = socket.recv_string()
+ assert topic is not None, 'Bad format: topic string expected'
+ msg_id = socket.recv_string()
+ assert msg_id is not None, 'Bad format: message ID expected'
+ context = socket.recv_json()
+ message = socket.recv_json()
+ return (reply_id, context, message)
+
+ topic = topic_utils.Topic.from_target(self.conf, target)
+ ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic)
+ rep_socket = self.context.socket(zmq.REP)
+ rep_socket.bind(ipc_rep_address)
+ self.sockets_per_topic[str(topic)] = rep_socket
+ self.poller.register(rep_socket, _receive_message)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
new file mode 100644
index 0000000..e6f67ab
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
@@ -0,0 +1,49 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from oslo_messaging._drivers import base
+from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class ZmqServer(base.Listener):
+
+ def __init__(self, conf, matchmaker=None):
+ LOG.info("[Server] __init__")
+ self.conf = conf
+ self.context = zmq.Context()
+ poller = zmq_async.get_reply_poller()
+ self.call_responder = zmq_call_responder.CallResponder(self, conf,
+ poller,
+ self.context)
+
+ def poll(self, timeout=None):
+ incoming = self.call_responder.poll(timeout)
+ return incoming
+
+ def stop(self):
+ LOG.info("[Server] Stop")
+
+ def cleanup(self):
+ pass
+
+ def listen(self, target):
+ LOG.info("[Server] Listen to Target %s" % target)
+ self.call_responder.listen(target)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
new file mode 100644
index 0000000..3694d0f
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
@@ -0,0 +1,59 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from oslo_utils import importutils
+
+from oslo_messaging._i18n import _LE
+
+LOG = logging.getLogger(__name__)
+
+green_zmq = importutils.try_import('eventlet.green.zmq')
+
+
+def import_zmq():
+ imported_zmq = green_zmq or importutils.try_import('zmq')
+ if imported_zmq is None:
+ errmsg = _LE("ZeroMQ not found!")
+ LOG.error(errmsg)
+ raise ImportError(errmsg)
+ return imported_zmq
+
+
+def get_poller():
+ if green_zmq:
+ from oslo_messaging._drivers.zmq_driver.poller import green_poller
+ return green_poller.GreenPoller()
+ else:
+ from oslo_messaging._drivers.zmq_driver.poller import threading_poller
+ return threading_poller.ThreadingPoller()
+
+
+def get_reply_poller():
+ if green_zmq:
+ from oslo_messaging._drivers.zmq_driver.poller import green_poller
+ return green_poller.HoldReplyPoller()
+ else:
+ from oslo_messaging._drivers.zmq_driver.poller import threading_poller
+ return threading_poller.ThreadingPoller()
+
+
+def get_executor(method):
+ if green_zmq is not None:
+ from oslo_messaging._drivers.zmq_driver.poller import green_poller
+ return green_poller.GreenExecutor(method)
+ else:
+ from oslo_messaging._drivers.zmq_driver.poller import threading_poller
+ return threading_poller.ThreadingExecutor()
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_context.py b/oslo_messaging/_drivers/zmq_driver/zmq_context.py
new file mode 100644
index 0000000..f986e41
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_context.py
@@ -0,0 +1,33 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo_messaging._drivers import common as rpc_common
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call."""
+ def __init__(self, **kwargs):
+ self.replies = []
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['replies'] = self.replies
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ return
+ self.replies.append(reply)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
new file mode 100644
index 0000000..dcd51ad
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
@@ -0,0 +1,48 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ZmqPoller(object):
+
+ @abc.abstractmethod
+ def register(self, socket, recv_method=None):
+ 'Register socket to poll'
+
+ @abc.abstractmethod
+ def poll(self, timeout=None):
+ 'Poll for messages'
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Executor(object):
+
+ def __init__(self, thread):
+ self.thread = thread
+
+ @abc.abstractmethod
+ def execute(self):
+ 'Run execution'
+
+ @abc.abstractmethod
+ def stop(self):
+ 'Stop execution'
+
+ @abc.abstractmethod
+ def wait(self):
+ 'Wait until pass'
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py
new file mode 100644
index 0000000..0f0733a
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py
@@ -0,0 +1,54 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import re
+
+import six
+
+from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._i18n import _LE, _LW
+
+LOG = logging.getLogger(__name__)
+
+MESSAGE_CALL_TOPIC_POSITION = 2
+
+
+def _get_topic_from_msg(message, position):
+ pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
+ badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
+ topic = message[position]
+ topic_items = None
+
+ if six.PY3:
+ topic = topic.decode('utf-8')
+
+ try:
+ # The topic is received over the network,
+ # don't trust this input.
+ if badchars.search(topic) is not None:
+ emsg = _LW("Topic contained dangerous characters")
+ LOG.warn(emsg)
+ raise rpc_common.RPCException(emsg)
+ topic_items = topic.split('.', 1)
+ except Exception as e:
+ errmsg = _LE("Failed topic string parsing, %s") % str(e)
+ LOG.error(errmsg)
+ rpc_common.RPCException(errmsg)
+ return topic_items[0], topic_items[1]
+
+
+def get_topic_from_call_message(message):
+ return _get_topic_from_msg(message, MESSAGE_CALL_TOPIC_POSITION)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py
new file mode 100644
index 0000000..c338b69
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py
@@ -0,0 +1,61 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+def get_ipc_address_call(conf, topic):
+ return "ipc://%s/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
+
+
+def get_tcp_bind_address(port):
+ return "tcp://*:%s" % port
+
+
+def get_tcp_address_call(conf, topic):
+ return "tcp://%s:%s" % (topic.server, conf.rpc_zmq_port)
+
+
+def get_ipc_address_cast(conf, topic):
+ return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
+
+
+class Topic(object):
+
+ def __init__(self, conf, topic, server=None, fanout=False):
+
+ if server is None:
+ self.server = conf.rpc_zmq_host
+ else:
+ self.server = server
+
+ self._topic = topic
+ self.fanout = fanout
+
+ @staticmethod
+ def _extract_cinder_server(server):
+ return server.split('@', 1)[0]
+
+ @staticmethod
+ def from_target(conf, target):
+ if target.server is not None:
+ return Topic(conf, target.topic, target.server,
+ fanout=target.fanout)
+ else:
+ return Topic(conf, target.topic, fanout=target.fanout)
+
+ @property
+ def topic(self):
+ return self._topic if self._topic else ""
+
+ def __str__(self, *args, **kwargs):
+ return "%s.%s" % (self.topic, self.server)
diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py
index 85e5dd3..a6eef2f 100644
--- a/oslo_messaging/tests/drivers/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/test_impl_zmq.py
@@ -1,5 +1,4 @@
-# Copyright 2014 Canonical, Ltd.
-# All Rights Reserved.
+# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -15,27 +14,51 @@
import logging
import socket
+import threading
import fixtures
-from oslo_utils import importutils
import testtools
-try:
- import zmq
-except ImportError:
- zmq = None
-
import oslo_messaging
-from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-# eventlet is not yet py3 compatible, so skip if not installed
-eventlet = importutils.try_import('eventlet')
+LOG = logging.getLogger(__name__)
-impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
+zmq = zmq_async.import_zmq()
-LOG = logging.getLogger(__name__)
+
+class TestRPCServerListener(object):
+
+ def __init__(self, driver):
+ self.driver = driver
+ self.target = None
+ self.listener = None
+ self.executor = zmq_async.get_executor(self._run)
+ self._stop = threading.Event()
+ self._received = threading.Event()
+ self.message = None
+
+ def listen(self, target):
+ self.target = target
+ self.listener = self.driver.listen(self.target)
+ self.executor.execute()
+
+ def _run(self):
+ try:
+ message = self.listener.poll()
+ if message is not None:
+ self._received.set()
+ self.message = message
+ message.reply(reply=True)
+ except Exception:
+ LOG.exception(_("Unexpected exception occurred."))
+
+ def stop(self):
+ self.executor.stop()
def get_unused_port():
@@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
# Start RPC
LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
+ self.broker = ZmqBroker(self.conf)
+ self.broker.start()
+
+ self.listener = TestRPCServerListener(self.driver)
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@@ -94,380 +118,127 @@ class stopRpc(object):
self.attrs = attrs
def __call__(self):
- if self.attrs['reactor']:
- self.attrs['reactor'].close()
+ if self.attrs['broker']:
+ self.attrs['broker'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
+ if self.attrs['listener']:
+ self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase):
- def test_start_stop_listener(self):
- target = oslo_messaging.Target(topic='testtopic')
- listener = self.driver.listen(target)
- result = listener.poll(0.01)
- self.assertEqual(result, None)
-
def test_send_receive_raises(self):
"""Call() without method."""
target = oslo_messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ self.listener.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
- def test_send_receive_topic(self, mock_msg):
- """Call() with method."""
- mock_msg.return_value = msg = mock.MagicMock()
- msg.received = received = mock.MagicMock()
- received.failure = False
- received.reply = True
- msg.condition = condition = mock.MagicMock()
- condition.wait.return_value = True
+ def test_send_receive_topic(self):
+ """Call() with topic."""
target = oslo_messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
- self.assertEqual(result, True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_fanout(self, mock_call):
- target = oslo_messaging.Target(topic='testtopic', fanout=True)
- self.driver.listen(target)
+ self.assertIsNotNone(result)
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ def test_send_noreply(self):
+ """Cast() with topic."""
+ target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
- {}, 'fanout~testtopic.127.0.0.1',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_direct(self, mock_call):
- # Also verifies fix for bug http://pad.lv/1301723
- target = oslo_messaging.Target(topic='testtopic', server='localhost')
- self.driver.listen(target)
+ self.listener._received.wait()
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ method = self.listener.message.message[u'method']
+ self.assertEqual(u'hello-world', method)
+
+ @testtools.skip("Not implemented feature")
+ def test_send_fanout(self):
+ target = oslo_messaging.Target(topic='testtopic', fanout=True)
+ self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
- {}, 'testtopic.localhost',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
+ self.assertEqual(msg_pattern, self.listener.message)
+ def test_send_receive_direct(self):
+ """Call() without topic."""
-class TestZmqSocket(test_utils.BaseTestCase):
+ target = oslo_messaging.Target(server='127.0.0.1')
+ self.listener.listen(target)
+ message = {'method': 'hello-world', 'tx_id': 1}
+ context = {}
+ result = self.driver.send(target, context, message,
+ wait_for_reply=True)
+ self.assertTrue(result)
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqSocket, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertTrue(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
-
-class TestZmqIncomingMessage(test_utils.BaseTestCase):
+class TestPoller(test_utils.BaseTestCase):
- @testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
- super(TestZmqIncomingMessage, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = oslo_messaging.get_transport(self.conf)
- self.driver = transport._driver
+ super(TestPoller, self).setUp()
+ self.poller = zmq_async.get_poller()
+ self.ctx = zmq.Context()
+ self.ADDR_REQ = "ipc://request1"
+
+ def test_poll_blocking(self):
+
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
+
+ def listener():
+ incoming, socket = reply_poller.poll()
+ self.assertEqual(b'Hello', incoming[0])
+ socket.send_string('Reply')
+ reply_poller.resume_polling(socket)
+
+ executor = zmq_async.get_executor(listener)
+ executor.execute()
+
+ req1 = self.ctx.socket(zmq.REQ)
+ req1.connect(self.ADDR_REQ)
+
+ req2 = self.ctx.socket(zmq.REQ)
+ req2.connect(self.ADDR_REQ)
+
+ req1.send_string('Hello')
+ req2.send_string('Hello')
+
+ reply = req1.recv_string()
+ self.assertEqual('Reply', reply)
+
+ reply = req2.recv_string()
+ self.assertEqual('Reply', reply)
+
+ def test_poll_timeout(self):
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
- def test_zmqincomingmessage(self):
- msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
- msg.reply("abc")
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertEqual(msg.received.reply, "abc")
- msg.requeue()
-
-
-class TestZmqConnection(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer(self, mock_reactor):
-
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
- # No Fanout
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.PULL,
- subscribe=None, in_bind=False)
-
- # Reset for next bunch of checks
- conn.reactor.register.reset_mock()
-
- # Fanout
- inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
- (self.internal_ipc_dir))
- conn.create_consumer(topic, context, fanout='subscriber.foo')
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.SUB,
- subscribe='subscriber.foo',
- in_bind=False)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
-
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(
- context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
- conn.reactor.register.reset_mock()
- # Call again with same topic
- conn.create_consumer(topic, context)
- self.assertFalse(conn.reactor.register.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.close = mock.Mock()
- mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
- conn.close()
- self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
- self.assertTrue(conn.reactor.close.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_wait(self, mock_reactor):
- conn = impl_zmq.Connection(self.driver, self.driver)
- conn.reactor.wait = mock.Mock()
- conn.wait()
- self.assertTrue(conn.reactor.wait.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_consume_in_thread(self, mock_reactor,
- mock_getmatchmaker):
- mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
- conn = impl_zmq.Connection(self.driver, self.driver)
- conn.reactor.consume_in_thread = mock.Mock()
- conn.consume_in_thread()
- self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
- self.assertTrue(conn.reactor.consume_in_thread.called)
-
-
-class TestZmqListener(ZmqBaseTestCase):
-
- def test_zmqlistener_no_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- # Timeout = 0 should return straight away since the queue is empty
- listener.poll(timeout=0)
-
- def test_zmqlistener_w_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- kwargs = {'a': 1, 'b': 2}
- m = mock.Mock()
- ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- message = {'namespace': 'name.space', 'method': m.fake_method,
- 'args': kwargs}
- eventlet.spawn_n(listener.dispatch, ctxt, message)
- resp = listener.poll(timeout=10)
- msg = {'method': m.fake_method, 'namespace': 'name.space',
- 'args': kwargs}
- self.assertEqual(resp.message, msg)
-
-
-class TestZmqDriver(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
- autospec=True)
- def test_zmqdriver_multi_send_cast_with_no_queues(self,
- mock_queues,
- mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic'
- msg = 'jeronimo'
-
- with mock.patch.object(impl_zmq.LOG, 'warn') as flog:
- mock_queues.return_value = None
- impl_zmq._multi_send(self.driver, mock_cast,
- context, topic, msg)
- self.assertEqual(1, flog.call_count)
- args, kwargs = flog.call_args
- self.assertIn('No matchmaker results', args[0])
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- @mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
- autospec=True)
- def test_zmqdriver_multi_send_call_with_no_queues(self,
- mock_queues,
- mock_call):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic'
- msg = 'jeronimo'
-
- mock_queues.return_value = None
- self.assertRaises(rpc_common.Timeout,
- impl_zmq._multi_send, self.driver,
- mock_call, context, topic, msg)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic'
- msg = 'jeronimo'
- self.driver.send(oslo_messaging.Target(topic=topic), context, msg,
- False, 0, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic.foo'
- topic_reformat = 'testtopic-foo'
- msg = 'jeronimo'
- self.driver.send_notification(oslo_messaging.Target(topic=topic),
- context, msg, False, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic_reformat, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen(self, mock_connection, mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- self.driver.listen(oslo_messaging.Target(topic=topic))
- conn.create_consumer.assert_called_with(topic, listener, fanout=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen_for_notification(self, mock_connection,
- mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- data = [(oslo_messaging.Target(topic=topic), 0)]
- # NOTE(jamespage): Pooling not supported, just pass None for now.
- self.driver.listen_for_notifications(data, None)
- conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)
+ incoming, socket = reply_poller.poll(1)
+ self.assertIsNone(incoming)
+ self.assertIsNone(socket)
diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py
index ddc6753..a6eef2f 100644
--- a/tests/drivers/test_impl_zmq.py
+++ b/tests/drivers/test_impl_zmq.py
@@ -1,5 +1,4 @@
-# Copyright 2014 Canonical, Ltd.
-# All Rights Reserved.
+# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -15,27 +14,51 @@
import logging
import socket
+import threading
import fixtures
import testtools
-from six.moves import mock
+import oslo_messaging
+from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._i18n import _
+from oslo_messaging.tests import utils as test_utils
-try:
- import zmq
-except ImportError:
- zmq = None
+LOG = logging.getLogger(__name__)
-from oslo import messaging
-from oslo.utils import importutils
-from oslo_messaging.tests import utils as test_utils
+zmq = zmq_async.import_zmq()
-# eventlet is not yet py3 compatible, so skip if not installed
-eventlet = importutils.try_import('eventlet')
-impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
+class TestRPCServerListener(object):
-LOG = logging.getLogger(__name__)
+ def __init__(self, driver):
+ self.driver = driver
+ self.target = None
+ self.listener = None
+ self.executor = zmq_async.get_executor(self._run)
+ self._stop = threading.Event()
+ self._received = threading.Event()
+ self.message = None
+
+ def listen(self, target):
+ self.target = target
+ self.listener = self.driver.listen(self.target)
+ self.executor.execute()
+
+ def _run(self):
+ try:
+ message = self.listener.poll()
+ if message is not None:
+ self._received.set()
+ self.message = message
+ message.reply(reply=True)
+ except Exception:
+ LOG.exception(_("Unexpected exception occurred."))
+
+ def stop(self):
+ self.executor.stop()
def get_unused_port():
@@ -56,7 +79,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
- transport = messaging.get_transport(self.conf)
+ transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
@@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
# Start RPC
LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
+ self.broker = ZmqBroker(self.conf)
+ self.broker.start()
+
+ self.listener = TestRPCServerListener(self.driver)
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@@ -85,7 +109,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
- transport = messaging.get_transport(self.conf)
+ transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
@@ -94,347 +118,127 @@ class stopRpc(object):
self.attrs = attrs
def __call__(self):
- if self.attrs['reactor']:
- self.attrs['reactor'].close()
+ if self.attrs['broker']:
+ self.attrs['broker'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
+ if self.attrs['listener']:
+ self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase):
- def test_start_stop_listener(self):
- target = messaging.Target(topic='testtopic')
- listener = self.driver.listen(target)
- result = listener.poll(0.01)
- self.assertEqual(result, None)
-
def test_send_receive_raises(self):
"""Call() without method."""
- target = messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ target = oslo_messaging.Target(topic='testtopic')
+ self.listener.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
- def test_send_receive_topic(self, mock_msg):
- """Call() with method."""
- mock_msg.return_value = msg = mock.MagicMock()
- msg.received = received = mock.MagicMock()
- received.failure = False
- received.reply = True
- msg.condition = condition = mock.MagicMock()
- condition.wait.return_value = True
-
- target = messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ def test_send_receive_topic(self):
+ """Call() with topic."""
+
+ target = oslo_messaging.Target(topic='testtopic')
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
- self.assertEqual(result, True)
+ self.assertIsNotNone(result)
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_fanout(self, mock_call):
- target = messaging.Target(topic='testtopic', fanout=True)
- self.driver.listen(target)
-
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ def test_send_noreply(self):
+ """Cast() with topic."""
+ target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
- {}, 'fanout~testtopic.127.0.0.1',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_direct(self, mock_call):
- # Also verifies fix for bug http://pad.lv/1301723
- target = messaging.Target(topic='testtopic', server='localhost')
- self.driver.listen(target)
+ self.listener._received.wait()
+
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ method = self.listener.message.message[u'method']
+ self.assertEqual(u'hello-world', method)
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ @testtools.skip("Not implemented feature")
+ def test_send_fanout(self):
+ target = oslo_messaging.Target(topic='testtopic', fanout=True)
+ self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
- {}, 'testtopic.localhost',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
+ self.assertEqual(msg_pattern, self.listener.message)
+ def test_send_receive_direct(self):
+ """Call() without topic."""
-class TestZmqSocket(test_utils.BaseTestCase):
+ target = oslo_messaging.Target(server='127.0.0.1')
+ self.listener.listen(target)
+ message = {'method': 'hello-world', 'tx_id': 1}
+ context = {}
+ result = self.driver.send(target, context, message,
+ wait_for_reply=True)
+ self.assertTrue(result)
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqSocket, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertTrue(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
-
-class TestZmqIncomingMessage(test_utils.BaseTestCase):
+class TestPoller(test_utils.BaseTestCase):
- @testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
- super(TestZmqIncomingMessage, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
+ super(TestPoller, self).setUp()
+ self.poller = zmq_async.get_poller()
+ self.ctx = zmq.Context()
+ self.ADDR_REQ = "ipc://request1"
+
+ def test_poll_blocking(self):
+
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
+
+ def listener():
+ incoming, socket = reply_poller.poll()
+ self.assertEqual(b'Hello', incoming[0])
+ socket.send_string('Reply')
+ reply_poller.resume_polling(socket)
+
+ executor = zmq_async.get_executor(listener)
+ executor.execute()
+
+ req1 = self.ctx.socket(zmq.REQ)
+ req1.connect(self.ADDR_REQ)
+
+ req2 = self.ctx.socket(zmq.REQ)
+ req2.connect(self.ADDR_REQ)
+
+ req1.send_string('Hello')
+ req2.send_string('Hello')
+
+ reply = req1.recv_string()
+ self.assertEqual('Reply', reply)
+
+ reply = req2.recv_string()
+ self.assertEqual('Reply', reply)
+
+ def test_poll_timeout(self):
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
- def test_zmqincomingmessage(self):
- msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
- msg.reply("abc")
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertEqual(msg.received.reply, "abc")
- msg.requeue()
-
-
-class TestZmqConnection(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer(self, mock_reactor):
-
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
- # No Fanout
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.PULL,
- subscribe=None, in_bind=False)
-
- # Reset for next bunch of checks
- conn.reactor.register.reset_mock()
-
- # Fanout
- inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
- (self.internal_ipc_dir))
- conn.create_consumer(topic, context, fanout='subscriber.foo')
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.SUB,
- subscribe='subscriber.foo',
- in_bind=False)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
-
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(
- context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
- conn.reactor.register.reset_mock()
- # Call again with same topic
- conn.create_consumer(topic, context)
- self.assertFalse(conn.reactor.register.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.close = mock.Mock()
- mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
- conn.close()
- self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
- self.assertTrue(conn.reactor.close.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_wait(self, mock_reactor):
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.wait = mock.Mock()
- conn.wait()
- self.assertTrue(conn.reactor.wait.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_consume_in_thread(self, mock_reactor,
- mock_getmatchmaker):
- mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.consume_in_thread = mock.Mock()
- conn.consume_in_thread()
- self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
- self.assertTrue(conn.reactor.consume_in_thread.called)
-
-
-class TestZmqListener(ZmqBaseTestCase):
-
- def test_zmqlistener_no_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- # Timeout = 0 should return straight away since the queue is empty
- listener.poll(timeout=0)
-
- def test_zmqlistener_w_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- kwargs = {'a': 1, 'b': 2}
- m = mock.Mock()
- ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- message = {'namespace': 'name.space', 'method': m.fake_method,
- 'args': kwargs}
- eventlet.spawn_n(listener.dispatch, ctxt, message)
- resp = listener.poll(timeout=10)
- msg = {'method': m.fake_method, 'namespace': 'name.space',
- 'args': kwargs}
- self.assertEqual(resp.message, msg)
-
-
-class TestZmqDriver(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic'
- msg = 'jeronimo'
- self.driver.send(messaging.Target(topic=topic), context, msg,
- False, 0, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic.foo'
- topic_reformat = 'testtopic-foo'
- msg = 'jeronimo'
- self.driver.send_notification(messaging.Target(topic=topic), context,
- msg, False, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic_reformat, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen(self, mock_connection, mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- self.driver.listen(messaging.Target(topic=topic))
- conn.create_consumer.assert_called_with(topic, listener, fanout=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen_for_notification(self, mock_connection,
- mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- data = [(messaging.Target(topic=topic), 0)]
- # NOTE(jamespage): Pooling not supported, just pass None for now.
- self.driver.listen_for_notifications(data, None)
- conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)
+ incoming, socket = reply_poller.poll(1)
+ self.assertIsNone(incoming)
+ self.assertIsNone(socket)