diff options
author | Oleksii Zamiatin <ozamiatin@mirantis.com> | 2015-07-02 18:11:42 +0300 |
---|---|---|
committer | Oleksii Zamiatin <ozamiatin@mirantis.com> | 2015-07-09 22:50:44 +0300 |
commit | 7df65f2937597a259ddebaded9743d9957c77740 (patch) | |
tree | 5bceec7320901c04e65e631cd317fd72806bdda0 | |
parent | 76f44879e166143bc557a74b7912a24ea197ad85 (diff) | |
download | oslo-messaging-7df65f2937597a259ddebaded9743d9957c77740.tar.gz |
Local Fanout implementation
Fanout unit-test passes now
No matchmaker used yet (multi-host fanout wouldn't work)
Change-Id: I9362adab4f7c7eba8120b51efe1b8c2056df3bbe
15 files changed, 318 insertions, 64 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py index 9e11a08..d591d94 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -13,15 +13,30 @@ # under the License. import abc +import logging import six +from oslo_messaging._drivers.common import RPCException from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() @six.add_metaclass(abc.ABCMeta) class BaseProxy(object): + """Base TCP-proxy. + + TCP-proxy redirects messages received by TCP from clients to servers + over IPC. Consists of TCP-frontend and IPC-backend objects. Runs + in async executor. + """ + def __init__(self, conf, context): super(BaseProxy, self).__init__() self.conf = conf @@ -30,7 +45,7 @@ class BaseProxy(object): @abc.abstractmethod def run(self): - "Main execution point of the proxy" + """Main execution point of the proxy""" def start(self): self.executor.execute() @@ -45,10 +60,47 @@ class BaseProxy(object): @six.add_metaclass(abc.ABCMeta) class BaseTcpFrontend(object): - def __init__(self, conf, poller, context): + """Base frontend clause. + + TCP-frontend is a part of TCP-proxy which receives incoming + messages from clients. + """ + + def __init__(self, conf, poller, context, + socket_type=None, + port_number=None, + receive_meth=None): + + """Construct a TCP-frontend. + + Its attributes are: + + :param conf: Driver configuration object. + :type conf: ConfigOpts + :param poller: Messages poller-object green or threading. + :type poller: ZmqPoller + :param context: ZeroMQ context object. + :type context: zmq.Context + :param socket_type: ZeroMQ socket type. + :type socket_type: int + :param port_number: Current messaging pipeline port. + :type port_number: int + """ + self.conf = conf self.poller = poller self.context = context + try: + self.frontend = self.context.socket(socket_type) + bind_address = zmq_topic.get_tcp_bind_address(port_number) + LOG.info(_LI("Binding to TCP %s") % bind_address) + self.frontend.bind(bind_address) + self.poller.register(self.frontend, receive_meth) + except zmq.ZMQError as e: + errmsg = _LE("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use: %s") % str(e) + LOG.error(errmsg) + raise RPCException(errmsg) def receive_incoming(self): message, socket = self.poller.poll(1) @@ -64,6 +116,14 @@ class BaseBackendMatcher(object): self.backends = {} self.poller = poller + @abc.abstractmethod + def redirect_to_backend(self, message): + """Redirect message""" + + +@six.add_metaclass(abc.ABCMeta) +class DirectBackendMatcher(BaseBackendMatcher): + def redirect_to_backend(self, message): backend, topic = self._match_backend(message) self._send_message(backend, message, topic) @@ -77,16 +137,16 @@ class BaseBackendMatcher(object): @abc.abstractmethod def _get_topic(self, message): - "Extract topic from message" + """Extract topic from message""" @abc.abstractmethod def _get_ipc_address(self, topic): - "Get ipc backend address from topic" + """Get ipc backend address from topic""" @abc.abstractmethod def _send_message(self, backend, message, topic): - "Backend specific sending logic" + """Backend specific sending logic""" @abc.abstractmethod def _create_backend(self, ipc_address): - "Backend specific socket opening logic" + """Backend specific socket opening logic""" diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py index a0d3f4f..08c5d7f 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py @@ -17,7 +17,7 @@ import os from oslo_utils import excutils -from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy +from oslo_messaging._drivers.zmq_driver.broker import zmq_universal_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LE, _LI @@ -44,7 +44,8 @@ class ZmqBroker(object): super(ZmqBroker, self).__init__() self.conf = conf self.context = zmq.Context() - self.proxies = [CallProxy(conf, self.context)] + proxy = zmq_universal_proxy.UniversalProxy(conf, self.context) + self.proxies = [proxy] self._create_ipc_dirs() def _create_ipc_dirs(self): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py index f4471b5..57c7d80 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py @@ -31,7 +31,7 @@ class CallProxy(base_proxy.BaseProxy): def __init__(self, conf, context): super(CallProxy, self).__init__(conf, context) self.tcp_frontend = FrontendTcpRouter(self.conf, context) - self.backend_matcher = CallBackendMatcher(self.conf, context) + self.backend_matcher = DealerBackend(self.conf, context) LOG.info(_LI("Starting call proxy thread")) def run(self): @@ -44,12 +44,12 @@ class CallProxy(base_proxy.BaseProxy): self.tcp_frontend.redirect_outgoing_reply(reply) -class CallBackendMatcher(base_proxy.BaseBackendMatcher): +class DealerBackend(base_proxy.DirectBackendMatcher): def __init__(self, conf, context): - super(CallBackendMatcher, self).__init__(conf, - zmq_async.get_poller(), - context) + super(DealerBackend, self).__init__(conf, + zmq_async.get_poller(), + context) self.backend = self.context.socket(zmq.DEALER) self.poller.register(self.backend) @@ -80,19 +80,9 @@ class FrontendTcpRouter(base_proxy.BaseTcpFrontend): def __init__(self, conf, context): super(FrontendTcpRouter, self).__init__(conf, zmq_async.get_poller(), - context) - - try: - self.frontend = self.context.socket(zmq.ROUTER) - bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port) - LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address) - self.frontend.bind(bind_address) - self.poller.register(self.frontend) - except zmq.ZMQError: - errmsg = _LE("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.") - LOG.error(errmsg) - raise RPCException(errmsg) + context, + socket_type=zmq.ROUTER, + port_number=conf.rpc_zmq_port) @staticmethod def _reduce_empty(reply): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py new file mode 100644 index 0000000..1311016 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py @@ -0,0 +1,35 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._drivers.zmq_driver import zmq_topic + +zmq = zmq_async.import_zmq() + + +class PublisherBackend(base_proxy.BaseBackendMatcher): + + def __init__(self, conf, context): + super(PublisherBackend, self).__init__(conf, + zmq_async.get_poller(), + context) + self.backend = self.context.socket(zmq.PUB) + self.backend.bind(zmq_topic.get_ipc_address_fanout(conf)) + + def redirect_to_backend(self, message): + topic_pos = zmq_serializer.MESSAGE_CALL_TOPIC_POSITION + self.backend.send_multipart(message[topic_pos:]) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py new file mode 100644 index 0000000..1d8982d --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py @@ -0,0 +1,58 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy +from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy +from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy +from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._i18n import _LI + +LOG = logging.getLogger(__name__) + + +class UniversalProxy(base_proxy.BaseProxy): + + def __init__(self, conf, context): + super(UniversalProxy, self).__init__(conf, context) + self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(conf, context) + self.backend_matcher = BackendMatcher(conf, context) + call = zmq_serializer.CALL_TYPE + self.call_backend = self.backend_matcher.backends[call] + LOG.info(_LI("Starting universal-proxy thread")) + + def run(self): + message = self.tcp_frontend.receive_incoming() + if message is not None: + self.backend_matcher.redirect_to_backend(message) + + reply, socket = self.call_backend.receive_outgoing_reply() + if reply is not None: + self.tcp_frontend.redirect_outgoing_reply(reply) + + +class BackendMatcher(base_proxy.BaseBackendMatcher): + + def __init__(self, conf, context): + super(BackendMatcher, self).__init__(conf, None, context) + direct_backend = zmq_call_proxy.DealerBackend(conf, context) + self.backends[zmq_serializer.CALL_TYPE] = direct_backend + self.backends[zmq_serializer.CAST_TYPE] = direct_backend + fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context) + self.backends[zmq_serializer.FANOUT_TYPE] = fanout_backend + + def redirect_to_backend(self, message): + message_type = zmq_serializer.get_msg_type(message) + self.backends[message_type].redirect_to_backend(message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index fb20efd..682b46f 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -16,6 +16,7 @@ import logging from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer from oslo_messaging._drivers.zmq_driver import zmq_topic from oslo_messaging._i18n import _LE, _LI @@ -33,7 +34,9 @@ class CallRequest(Request): socket = self.zmq_context.socket(zmq.REQ) super(CallRequest, self).__init__(conf, target, context, - message, socket, timeout, retry) + message, socket, + zmq_serializer.CALL_TYPE, + timeout, retry) self.connect_address = zmq_topic.get_tcp_address_call(conf, self.topic) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index 40fddd9..30a117c 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -17,6 +17,7 @@ import logging from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer from oslo_messaging._drivers.zmq_driver import zmq_topic from oslo_messaging._i18n import _LE, _LI @@ -30,8 +31,11 @@ class CastRequest(Request): def __init__(self, conf, target, context, message, socket, address, timeout=None, retry=None): self.connect_address = address + fanout_type = zmq_serializer.FANOUT_TYPE + cast_type = zmq_serializer.CAST_TYPE + msg_type = fanout_type if target.fanout else cast_type super(CastRequest, self).__init__(conf, target, context, message, - socket, timeout, retry) + socket, msg_type, timeout, retry) def __call__(self, *args, **kwargs): self.send_request() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py index 2bfe755..9575bdc 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py @@ -20,6 +20,7 @@ import uuid import six from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer from oslo_messaging._drivers.zmq_driver import zmq_topic from oslo_messaging._i18n import _LE @@ -32,7 +33,9 @@ zmq = zmq_async.import_zmq() class Request(object): def __init__(self, conf, target, context, message, - socket, timeout=None, retry=None): + socket, msg_type, timeout=None, retry=None): + + assert msg_type in zmq_serializer.MESSAGE_TYPES, "Unknown msg type!" if message['method'] is None: errmsg = _LE("No method specified for RPC call") @@ -40,6 +43,7 @@ class Request(object): raise KeyError(errmsg) self.msg_id = uuid.uuid4().hex + self.msg_type = msg_type self.target = target self.context = context self.message = message @@ -62,6 +66,7 @@ class Request(object): return False def send_request(self): + self.socket.send_string(self.msg_type, zmq.SNDMORE) self.socket.send_string(str(self.topic), zmq.SNDMORE) self.socket.send_string(self.msg_id, zmq.SNDMORE) self.socket.send_json(self.context, zmq.SNDMORE) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py index 959ffd7..9431b8f 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py @@ -56,13 +56,19 @@ class ZmqIncomingRequest(base.IncomingMessage): class CallResponder(zmq_base_consumer.ConsumerBase): - def __init__(self, listener, conf, poller, context): - super(CallResponder, self).__init__(listener, conf, poller, context) - - def poll(self, timeout=None): + def _receive_message(self, socket): try: - incoming, socket = self.poller.poll(timeout) - reply_id, context, message = incoming + reply_id = socket.recv() + empty = socket.recv() + assert empty == b'', 'Bad format: empty separator expected' + msg_type = socket.recv_string() + assert msg_type is not None, 'Bad format: msg type expected' + topic = socket.recv_string() + assert topic is not None, 'Bad format: topic string expected' + msg_id = socket.recv_string() + assert msg_id is not None, 'Bad format: message ID expected' + context = socket.recv_json() + message = socket.recv_json() LOG.debug("[Server] REP Received message %s" % str(message)) incoming = ZmqIncomingRequest(self.listener, context, @@ -70,27 +76,13 @@ class CallResponder(zmq_base_consumer.ConsumerBase): reply_id, self.poller) return incoming - except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed ... {}"), e) def listen(self, target): - - def _receive_message(socket): - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', 'Bad format: empty separator expected' - topic = socket.recv_string() - assert topic is not None, 'Bad format: topic string expected' - msg_id = socket.recv_string() - assert msg_id is not None, 'Bad format: message ID expected' - context = socket.recv_json() - message = socket.recv_json() - return (reply_id, context, message) - topic = topic_utils.Topic.from_target(self.conf, target) ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic) rep_socket = self.context.socket(zmq.REP) rep_socket.bind(ipc_rep_address) self.sockets_per_topic[str(topic)] = rep_socket - self.poller.register(rep_socket, _receive_message) + self.poller.register(rep_socket, self._receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py new file mode 100644 index 0000000..3ca78cf --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py @@ -0,0 +1,74 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import logging + +import six + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils +from oslo_messaging._i18n import _LE + + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqFanoutMessage(base.IncomingMessage): + + def __init__(self, listener, context, message, socket, poller): + super(ZmqFanoutMessage, self).__init__(listener, context, message) + poller.resume_polling(socket) + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for fanout(cast) messages""" + + def acknowledge(self): + pass + + def requeue(self): + pass + + +class FanoutConsumer(zmq_base_consumer.ConsumerBase): + + def _receive_message(self, socket): + try: + topic = socket.recv_string() + assert topic is not None, 'Bad format: Topic is expected' + msg_id = socket.recv_string() + assert msg_id is not None, 'Bad format: message ID expected' + context = socket.recv_json() + message = socket.recv_json() + LOG.debug("[Server] REP Received message %s" % str(message)) + incoming = ZmqFanoutMessage(self.listener, context, message, + socket, self.poller) + return incoming + except zmq.ZMQError as e: + LOG.error(_LE("Receiving message failed ... {}"), e) + + def listen(self, target): + topic = topic_utils.Topic.from_target(self.conf, target) + ipc_address = topic_utils.get_ipc_address_fanout(self.conf) + sub_socket = self.context.socket(zmq.SUB) + sub_socket.connect(ipc_address) + if six.PY3: + sub_socket.setsockopt_string(zmq.SUBSCRIBE, str(topic)) + else: + sub_socket.setsockopt(zmq.SUBSCRIBE, str(topic)) + self.poller.register(sub_socket, self._receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index e6f67ab..a5540ec 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -16,6 +16,7 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_fanout_consumer from oslo_messaging._drivers.zmq_driver import zmq_async LOG = logging.getLogger(__name__) @@ -29,14 +30,17 @@ class ZmqServer(base.Listener): LOG.info("[Server] __init__") self.conf = conf self.context = zmq.Context() - poller = zmq_async.get_reply_poller() - self.call_responder = zmq_call_responder.CallResponder(self, conf, - poller, - self.context) + self.poller = zmq_async.get_reply_poller() + self.call_resp = zmq_call_responder.CallResponder(self, conf, + self.poller, + self.context) + self.fanout_resp = zmq_fanout_consumer.FanoutConsumer(self, conf, + self.poller, + self.context) def poll(self, timeout=None): - incoming = self.call_responder.poll(timeout) - return incoming + incoming = self.poller.poll(timeout) + return incoming[0] def stop(self): LOG.info("[Server] Stop") @@ -46,4 +50,7 @@ class ZmqServer(base.Listener): def listen(self, target): LOG.info("[Server] Listen to Target %s" % target) - self.call_responder.listen(target) + if target.fanout: + self.fanout_resp.listen(target) + else: + self.call_resp.listen(target) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py index 0f0733a..81259be 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py @@ -23,7 +23,26 @@ from oslo_messaging._i18n import _LE, _LW LOG = logging.getLogger(__name__) -MESSAGE_CALL_TOPIC_POSITION = 2 +MESSAGE_CALL_TYPE_POSITION = 2 +MESSAGE_CALL_TOPIC_POSITION = 3 + +CALL_TYPE = 'call' +CAST_TYPE = 'cast' +FANOUT_TYPE = 'fanout' +NOTIFY_TYPE = 'notify' + +MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE) + + +def get_msg_type(message): + type = message[MESSAGE_CALL_TYPE_POSITION] + if six.PY3: + type = type.decode('utf-8') + if type not in MESSAGE_TYPES: + errmsg = _LE("Unknown message type: %s") % str(type) + LOG.error(errmsg) + rpc_common.RPCException(errmsg) + return type def _get_topic_from_msg(message, position): @@ -46,7 +65,7 @@ def _get_topic_from_msg(message, position): except Exception as e: errmsg = _LE("Failed topic string parsing, %s") % str(e) LOG.error(errmsg) - rpc_common.RPCException(errmsg) + raise rpc_common.RPCException(errmsg) return topic_items[0], topic_items[1] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py index c338b69..332c819 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py @@ -29,6 +29,10 @@ def get_ipc_address_cast(conf, topic): return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic)) +def get_ipc_address_fanout(conf): + return "ipc://%s/fanout_general" % conf.rpc_zmq_ipc_dir + + class Topic(object): def __init__(self, conf, topic, server=None, fanout=False): @@ -58,4 +62,4 @@ class Topic(object): return self._topic if self._topic else "" def __str__(self, *args, **kwargs): - return "%s.%s" % (self.topic, self.server) + return u"%s.%s" % (self.topic, self.server) diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index a6eef2f..79b797c 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -165,20 +165,21 @@ class TestZmqBasics(ZmqBaseTestCase): method = self.listener.message.message[u'method'] self.assertEqual(u'hello-world', method) - @testtools.skip("Not implemented feature") def test_send_fanout(self): target = oslo_messaging.Target(topic='testtopic', fanout=True) - self.driver.listen(target) + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=False) + self.listener._received.wait() + self.assertIsNone(result) self.assertEqual(True, self.listener._received.isSet()) - msg_pattern = "{'method': 'hello-world', 'tx_id': 1}" - self.assertEqual(msg_pattern, self.listener.message) + method = self.listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) def test_send_receive_direct(self): """Call() without topic.""" diff --git a/tests/test_exception_serialization.py b/tests/test_exception_serialization.py index 17e8ff1..baa2b79 100644 --- a/tests/test_exception_serialization.py +++ b/tests/test_exception_serialization.py @@ -19,6 +19,7 @@ import six import testscenarios from oslo import messaging + from oslo_messaging._drivers import common as exceptions from oslo_messaging.tests import utils as test_utils from oslo_serialization import jsonutils |