summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Zamiatin <ozamiatin@mirantis.com>2015-07-02 18:11:42 +0300
committerOleksii Zamiatin <ozamiatin@mirantis.com>2015-07-09 22:50:44 +0300
commit7df65f2937597a259ddebaded9743d9957c77740 (patch)
tree5bceec7320901c04e65e631cd317fd72806bdda0
parent76f44879e166143bc557a74b7912a24ea197ad85 (diff)
downloadoslo-messaging-7df65f2937597a259ddebaded9743d9957c77740.tar.gz
Local Fanout implementation
Fanout unit-test passes now No matchmaker used yet (multi-host fanout wouldn't work) Change-Id: I9362adab4f7c7eba8120b51efe1b8c2056df3bbe
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py72
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py5
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py26
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py35
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py58
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py5
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py7
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py34
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py74
-rw-r--r--oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py21
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_serializer.py23
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_topic.py6
-rw-r--r--oslo_messaging/tests/drivers/test_impl_zmq.py9
-rw-r--r--tests/test_exception_serialization.py1
15 files changed, 318 insertions, 64 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
index 9e11a08..d591d94 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
@@ -13,15 +13,30 @@
# under the License.
import abc
+import logging
import six
+from oslo_messaging._drivers.common import RPCException
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+from oslo_messaging._i18n import _LE, _LI
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
+ """Base TCP-proxy.
+
+ TCP-proxy redirects messages received by TCP from clients to servers
+ over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
+ in async executor.
+ """
+
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
@@ -30,7 +45,7 @@ class BaseProxy(object):
@abc.abstractmethod
def run(self):
- "Main execution point of the proxy"
+ """Main execution point of the proxy"""
def start(self):
self.executor.execute()
@@ -45,10 +60,47 @@ class BaseProxy(object):
@six.add_metaclass(abc.ABCMeta)
class BaseTcpFrontend(object):
- def __init__(self, conf, poller, context):
+ """Base frontend clause.
+
+ TCP-frontend is a part of TCP-proxy which receives incoming
+ messages from clients.
+ """
+
+ def __init__(self, conf, poller, context,
+ socket_type=None,
+ port_number=None,
+ receive_meth=None):
+
+ """Construct a TCP-frontend.
+
+ Its attributes are:
+
+ :param conf: Driver configuration object.
+ :type conf: ConfigOpts
+ :param poller: Messages poller-object green or threading.
+ :type poller: ZmqPoller
+ :param context: ZeroMQ context object.
+ :type context: zmq.Context
+ :param socket_type: ZeroMQ socket type.
+ :type socket_type: int
+ :param port_number: Current messaging pipeline port.
+ :type port_number: int
+ """
+
self.conf = conf
self.poller = poller
self.context = context
+ try:
+ self.frontend = self.context.socket(socket_type)
+ bind_address = zmq_topic.get_tcp_bind_address(port_number)
+ LOG.info(_LI("Binding to TCP %s") % bind_address)
+ self.frontend.bind(bind_address)
+ self.poller.register(self.frontend, receive_meth)
+ except zmq.ZMQError as e:
+ errmsg = _LE("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use: %s") % str(e)
+ LOG.error(errmsg)
+ raise RPCException(errmsg)
def receive_incoming(self):
message, socket = self.poller.poll(1)
@@ -64,6 +116,14 @@ class BaseBackendMatcher(object):
self.backends = {}
self.poller = poller
+ @abc.abstractmethod
+ def redirect_to_backend(self, message):
+ """Redirect message"""
+
+
+@six.add_metaclass(abc.ABCMeta)
+class DirectBackendMatcher(BaseBackendMatcher):
+
def redirect_to_backend(self, message):
backend, topic = self._match_backend(message)
self._send_message(backend, message, topic)
@@ -77,16 +137,16 @@ class BaseBackendMatcher(object):
@abc.abstractmethod
def _get_topic(self, message):
- "Extract topic from message"
+ """Extract topic from message"""
@abc.abstractmethod
def _get_ipc_address(self, topic):
- "Get ipc backend address from topic"
+ """Get ipc backend address from topic"""
@abc.abstractmethod
def _send_message(self, backend, message, topic):
- "Backend specific sending logic"
+ """Backend specific sending logic"""
@abc.abstractmethod
def _create_backend(self, ipc_address):
- "Backend specific socket opening logic"
+ """Backend specific socket opening logic"""
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
index a0d3f4f..08c5d7f 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
@@ -17,7 +17,7 @@ import os
from oslo_utils import excutils
-from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy
+from oslo_messaging._drivers.zmq_driver.broker import zmq_universal_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE, _LI
@@ -44,7 +44,8 @@ class ZmqBroker(object):
super(ZmqBroker, self).__init__()
self.conf = conf
self.context = zmq.Context()
- self.proxies = [CallProxy(conf, self.context)]
+ proxy = zmq_universal_proxy.UniversalProxy(conf, self.context)
+ self.proxies = [proxy]
self._create_ipc_dirs()
def _create_ipc_dirs(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py
index f4471b5..57c7d80 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py
@@ -31,7 +31,7 @@ class CallProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(CallProxy, self).__init__(conf, context)
self.tcp_frontend = FrontendTcpRouter(self.conf, context)
- self.backend_matcher = CallBackendMatcher(self.conf, context)
+ self.backend_matcher = DealerBackend(self.conf, context)
LOG.info(_LI("Starting call proxy thread"))
def run(self):
@@ -44,12 +44,12 @@ class CallProxy(base_proxy.BaseProxy):
self.tcp_frontend.redirect_outgoing_reply(reply)
-class CallBackendMatcher(base_proxy.BaseBackendMatcher):
+class DealerBackend(base_proxy.DirectBackendMatcher):
def __init__(self, conf, context):
- super(CallBackendMatcher, self).__init__(conf,
- zmq_async.get_poller(),
- context)
+ super(DealerBackend, self).__init__(conf,
+ zmq_async.get_poller(),
+ context)
self.backend = self.context.socket(zmq.DEALER)
self.poller.register(self.backend)
@@ -80,19 +80,9 @@ class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpRouter, self).__init__(conf,
zmq_async.get_poller(),
- context)
-
- try:
- self.frontend = self.context.socket(zmq.ROUTER)
- bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port)
- LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address)
- self.frontend.bind(bind_address)
- self.poller.register(self.frontend)
- except zmq.ZMQError:
- errmsg = _LE("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use.")
- LOG.error(errmsg)
- raise RPCException(errmsg)
+ context,
+ socket_type=zmq.ROUTER,
+ port_number=conf.rpc_zmq_port)
@staticmethod
def _reduce_empty(reply):
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py
new file mode 100644
index 0000000..1311016
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py
@@ -0,0 +1,35 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
+from oslo_messaging._drivers.zmq_driver import zmq_topic
+
+zmq = zmq_async.import_zmq()
+
+
+class PublisherBackend(base_proxy.BaseBackendMatcher):
+
+ def __init__(self, conf, context):
+ super(PublisherBackend, self).__init__(conf,
+ zmq_async.get_poller(),
+ context)
+ self.backend = self.context.socket(zmq.PUB)
+ self.backend.bind(zmq_topic.get_ipc_address_fanout(conf))
+
+ def redirect_to_backend(self, message):
+ topic_pos = zmq_serializer.MESSAGE_CALL_TOPIC_POSITION
+ self.backend.send_multipart(message[topic_pos:])
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py
new file mode 100644
index 0000000..1d8982d
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py
@@ -0,0 +1,58 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
+from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy
+from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
+from oslo_messaging._i18n import _LI
+
+LOG = logging.getLogger(__name__)
+
+
+class UniversalProxy(base_proxy.BaseProxy):
+
+ def __init__(self, conf, context):
+ super(UniversalProxy, self).__init__(conf, context)
+ self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(conf, context)
+ self.backend_matcher = BackendMatcher(conf, context)
+ call = zmq_serializer.CALL_TYPE
+ self.call_backend = self.backend_matcher.backends[call]
+ LOG.info(_LI("Starting universal-proxy thread"))
+
+ def run(self):
+ message = self.tcp_frontend.receive_incoming()
+ if message is not None:
+ self.backend_matcher.redirect_to_backend(message)
+
+ reply, socket = self.call_backend.receive_outgoing_reply()
+ if reply is not None:
+ self.tcp_frontend.redirect_outgoing_reply(reply)
+
+
+class BackendMatcher(base_proxy.BaseBackendMatcher):
+
+ def __init__(self, conf, context):
+ super(BackendMatcher, self).__init__(conf, None, context)
+ direct_backend = zmq_call_proxy.DealerBackend(conf, context)
+ self.backends[zmq_serializer.CALL_TYPE] = direct_backend
+ self.backends[zmq_serializer.CAST_TYPE] = direct_backend
+ fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context)
+ self.backends[zmq_serializer.FANOUT_TYPE] = fanout_backend
+
+ def redirect_to_backend(self, message):
+ message_type = zmq_serializer.get_msg_type(message)
+ self.backends[message_type].redirect_to_backend(message)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
index fb20efd..682b46f 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py
@@ -16,6 +16,7 @@ import logging
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
@@ -33,7 +34,9 @@ class CallRequest(Request):
socket = self.zmq_context.socket(zmq.REQ)
super(CallRequest, self).__init__(conf, target, context,
- message, socket, timeout, retry)
+ message, socket,
+ zmq_serializer.CALL_TYPE,
+ timeout, retry)
self.connect_address = zmq_topic.get_tcp_address_call(conf,
self.topic)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
index 40fddd9..30a117c 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py
@@ -17,6 +17,7 @@ import logging
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
@@ -30,8 +31,11 @@ class CastRequest(Request):
def __init__(self, conf, target, context,
message, socket, address, timeout=None, retry=None):
self.connect_address = address
+ fanout_type = zmq_serializer.FANOUT_TYPE
+ cast_type = zmq_serializer.CAST_TYPE
+ msg_type = fanout_type if target.fanout else cast_type
super(CastRequest, self).__init__(conf, target, context, message,
- socket, timeout, retry)
+ socket, msg_type, timeout, retry)
def __call__(self, *args, **kwargs):
self.send_request()
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
index 2bfe755..9575bdc 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py
@@ -20,6 +20,7 @@ import uuid
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE
@@ -32,7 +33,9 @@ zmq = zmq_async.import_zmq()
class Request(object):
def __init__(self, conf, target, context, message,
- socket, timeout=None, retry=None):
+ socket, msg_type, timeout=None, retry=None):
+
+ assert msg_type in zmq_serializer.MESSAGE_TYPES, "Unknown msg type!"
if message['method'] is None:
errmsg = _LE("No method specified for RPC call")
@@ -40,6 +43,7 @@ class Request(object):
raise KeyError(errmsg)
self.msg_id = uuid.uuid4().hex
+ self.msg_type = msg_type
self.target = target
self.context = context
self.message = message
@@ -62,6 +66,7 @@ class Request(object):
return False
def send_request(self):
+ self.socket.send_string(self.msg_type, zmq.SNDMORE)
self.socket.send_string(str(self.topic), zmq.SNDMORE)
self.socket.send_string(self.msg_id, zmq.SNDMORE)
self.socket.send_json(self.context, zmq.SNDMORE)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py
index 959ffd7..9431b8f 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py
@@ -56,13 +56,19 @@ class ZmqIncomingRequest(base.IncomingMessage):
class CallResponder(zmq_base_consumer.ConsumerBase):
- def __init__(self, listener, conf, poller, context):
- super(CallResponder, self).__init__(listener, conf, poller, context)
-
- def poll(self, timeout=None):
+ def _receive_message(self, socket):
try:
- incoming, socket = self.poller.poll(timeout)
- reply_id, context, message = incoming
+ reply_id = socket.recv()
+ empty = socket.recv()
+ assert empty == b'', 'Bad format: empty separator expected'
+ msg_type = socket.recv_string()
+ assert msg_type is not None, 'Bad format: msg type expected'
+ topic = socket.recv_string()
+ assert topic is not None, 'Bad format: topic string expected'
+ msg_id = socket.recv_string()
+ assert msg_id is not None, 'Bad format: message ID expected'
+ context = socket.recv_json()
+ message = socket.recv_json()
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqIncomingRequest(self.listener,
context,
@@ -70,27 +76,13 @@ class CallResponder(zmq_base_consumer.ConsumerBase):
reply_id,
self.poller)
return incoming
-
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
def listen(self, target):
-
- def _receive_message(socket):
- reply_id = socket.recv()
- empty = socket.recv()
- assert empty == b'', 'Bad format: empty separator expected'
- topic = socket.recv_string()
- assert topic is not None, 'Bad format: topic string expected'
- msg_id = socket.recv_string()
- assert msg_id is not None, 'Bad format: message ID expected'
- context = socket.recv_json()
- message = socket.recv_json()
- return (reply_id, context, message)
-
topic = topic_utils.Topic.from_target(self.conf, target)
ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic)
rep_socket = self.context.socket(zmq.REP)
rep_socket.bind(ipc_rep_address)
self.sockets_per_topic[str(topic)] = rep_socket
- self.poller.register(rep_socket, _receive_message)
+ self.poller.register(rep_socket, self._receive_message)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py
new file mode 100644
index 0000000..3ca78cf
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py
@@ -0,0 +1,74 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+
+import six
+
+from oslo_messaging._drivers import base
+from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils
+from oslo_messaging._i18n import _LE
+
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class ZmqFanoutMessage(base.IncomingMessage):
+
+ def __init__(self, listener, context, message, socket, poller):
+ super(ZmqFanoutMessage, self).__init__(listener, context, message)
+ poller.resume_polling(socket)
+
+ def reply(self, reply=None, failure=None, log_failure=True):
+ """Reply is not needed for fanout(cast) messages"""
+
+ def acknowledge(self):
+ pass
+
+ def requeue(self):
+ pass
+
+
+class FanoutConsumer(zmq_base_consumer.ConsumerBase):
+
+ def _receive_message(self, socket):
+ try:
+ topic = socket.recv_string()
+ assert topic is not None, 'Bad format: Topic is expected'
+ msg_id = socket.recv_string()
+ assert msg_id is not None, 'Bad format: message ID expected'
+ context = socket.recv_json()
+ message = socket.recv_json()
+ LOG.debug("[Server] REP Received message %s" % str(message))
+ incoming = ZmqFanoutMessage(self.listener, context, message,
+ socket, self.poller)
+ return incoming
+ except zmq.ZMQError as e:
+ LOG.error(_LE("Receiving message failed ... {}"), e)
+
+ def listen(self, target):
+ topic = topic_utils.Topic.from_target(self.conf, target)
+ ipc_address = topic_utils.get_ipc_address_fanout(self.conf)
+ sub_socket = self.context.socket(zmq.SUB)
+ sub_socket.connect(ipc_address)
+ if six.PY3:
+ sub_socket.setsockopt_string(zmq.SUBSCRIBE, str(topic))
+ else:
+ sub_socket.setsockopt(zmq.SUBSCRIBE, str(topic))
+ self.poller.register(sub_socket, self._receive_message)
diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
index e6f67ab..a5540ec 100644
--- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py
@@ -16,6 +16,7 @@ import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder
+from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_fanout_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
@@ -29,14 +30,17 @@ class ZmqServer(base.Listener):
LOG.info("[Server] __init__")
self.conf = conf
self.context = zmq.Context()
- poller = zmq_async.get_reply_poller()
- self.call_responder = zmq_call_responder.CallResponder(self, conf,
- poller,
- self.context)
+ self.poller = zmq_async.get_reply_poller()
+ self.call_resp = zmq_call_responder.CallResponder(self, conf,
+ self.poller,
+ self.context)
+ self.fanout_resp = zmq_fanout_consumer.FanoutConsumer(self, conf,
+ self.poller,
+ self.context)
def poll(self, timeout=None):
- incoming = self.call_responder.poll(timeout)
- return incoming
+ incoming = self.poller.poll(timeout)
+ return incoming[0]
def stop(self):
LOG.info("[Server] Stop")
@@ -46,4 +50,7 @@ class ZmqServer(base.Listener):
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
- self.call_responder.listen(target)
+ if target.fanout:
+ self.fanout_resp.listen(target)
+ else:
+ self.call_resp.listen(target)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py
index 0f0733a..81259be 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py
@@ -23,7 +23,26 @@ from oslo_messaging._i18n import _LE, _LW
LOG = logging.getLogger(__name__)
-MESSAGE_CALL_TOPIC_POSITION = 2
+MESSAGE_CALL_TYPE_POSITION = 2
+MESSAGE_CALL_TOPIC_POSITION = 3
+
+CALL_TYPE = 'call'
+CAST_TYPE = 'cast'
+FANOUT_TYPE = 'fanout'
+NOTIFY_TYPE = 'notify'
+
+MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE)
+
+
+def get_msg_type(message):
+ type = message[MESSAGE_CALL_TYPE_POSITION]
+ if six.PY3:
+ type = type.decode('utf-8')
+ if type not in MESSAGE_TYPES:
+ errmsg = _LE("Unknown message type: %s") % str(type)
+ LOG.error(errmsg)
+ rpc_common.RPCException(errmsg)
+ return type
def _get_topic_from_msg(message, position):
@@ -46,7 +65,7 @@ def _get_topic_from_msg(message, position):
except Exception as e:
errmsg = _LE("Failed topic string parsing, %s") % str(e)
LOG.error(errmsg)
- rpc_common.RPCException(errmsg)
+ raise rpc_common.RPCException(errmsg)
return topic_items[0], topic_items[1]
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py
index c338b69..332c819 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py
@@ -29,6 +29,10 @@ def get_ipc_address_cast(conf, topic):
return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
+def get_ipc_address_fanout(conf):
+ return "ipc://%s/fanout_general" % conf.rpc_zmq_ipc_dir
+
+
class Topic(object):
def __init__(self, conf, topic, server=None, fanout=False):
@@ -58,4 +62,4 @@ class Topic(object):
return self._topic if self._topic else ""
def __str__(self, *args, **kwargs):
- return "%s.%s" % (self.topic, self.server)
+ return u"%s.%s" % (self.topic, self.server)
diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py
index a6eef2f..79b797c 100644
--- a/oslo_messaging/tests/drivers/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/test_impl_zmq.py
@@ -165,20 +165,21 @@ class TestZmqBasics(ZmqBaseTestCase):
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
- @testtools.skip("Not implemented feature")
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
- self.driver.listen(target)
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
+ self.listener._received.wait()
+
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
- msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
- self.assertEqual(msg_pattern, self.listener.message)
+ method = self.listener.message.message[u'method']
+ self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""
diff --git a/tests/test_exception_serialization.py b/tests/test_exception_serialization.py
index 17e8ff1..baa2b79 100644
--- a/tests/test_exception_serialization.py
+++ b/tests/test_exception_serialization.py
@@ -19,6 +19,7 @@ import six
import testscenarios
from oslo import messaging
+
from oslo_messaging._drivers import common as exceptions
from oslo_messaging.tests import utils as test_utils
from oslo_serialization import jsonutils