summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Zamiatin <ozamiatin@mirantis.com>2016-07-15 15:10:30 +0300
committerozamiatin <ozamiatin@mirantis.com>2016-07-22 19:00:48 +0300
commit18c8bc933dc6c55af4eaaa755e5198d89ad54468 (patch)
treeec6f91a94fa622fa96bb96239548f5cee6fa8f25
parent0ecc25509fa8188109f48f85702c2271c23baf7f (diff)
downloadoslo-messaging-18c8bc933dc6c55af4eaaa755e5198d89ad54468.tar.gz
[zmq] Let proxy serve on a static port numbers
Currently proxy binds to a random port from a port range specified in zmq config and therefore needs to register in redis to become visible to clients and servers. That could be done much simpler by using a static port(s) for proxy. Moreover zmq handles reconnect to a socket if restarted service uses the same port number as it had before restart. Change-Id: I088792fd08a4161d08e9160830fc3ec4d560cca4
-rw-r--r--oslo_messaging/_cmd/zmq_proxy.py34
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/__init__.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/__init__.py)0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py)18
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py)17
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py)48
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py37
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_pub_sub.py23
-rwxr-xr-xsetup-test-env-zmq-proxy.sh5
9 files changed, 143 insertions, 41 deletions
diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index 03ccea1..a76b0b5 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -18,14 +18,15 @@ import logging
from oslo_config import cfg
from oslo_messaging._drivers import impl_zmq
-from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
-from oslo_messaging import server
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
-CONF.register_opts(server._pool_opts)
-CONF.rpc_zmq_native = True
+
+opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+ title='ZeroMQ proxy options')
+CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
@@ -42,9 +43,20 @@ def main():
parser.add_argument('--config-file', dest='config_file', type=str,
help='Path to configuration file')
+
+ parser.add_argument('--host', dest='host', type=str,
+ help='Host FQDN for current proxy')
+ parser.add_argument('--frontend-port', dest='frontend_port', type=int,
+ help='Front-end ROUTER port number')
+ parser.add_argument('--backend-port', dest='backend_port', type=int,
+ help='Back-end ROUTER port number')
+ parser.add_argument('--publisher-port', dest='publisher_port', type=int,
+ help='Back-end PUBLISHER port number')
+
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
help="Turn on DEBUG logging level instead of INFO")
+
args = parser.parse_args()
if args.config_file:
@@ -57,6 +69,18 @@ def main():
format='%(asctime)s %(name)s '
'%(levelname)-8s %(message)s')
+ if args.host:
+ CONF.zmq_proxy_opts.host = args.host
+ if args.frontend_port:
+ CONF.set_override('frontend_port', args.frontend_port,
+ group='zmq_proxy_opts')
+ if args.backend_port:
+ CONF.set_override('backend_port', args.backend_port,
+ group='zmq_proxy_opts')
+ if args.publisher_port:
+ CONF.set_override('publisher_port', args.publisher_port,
+ group='zmq_proxy_opts')
+
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
try:
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/__init__.py
index e69de29..e69de29 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index 77a47d2..b35a7f9 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -13,9 +13,11 @@
# under the License.
import logging
+import socket
from stevedore import driver
+from oslo_config import cfg
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
@@ -23,6 +25,22 @@ zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
+zmq_proxy_opts = [
+ cfg.StrOpt('host', default=socket.gethostname(),
+ help='Hostname (FQDN) of current proxy'
+ ' an ethernet interface, or IP address.'),
+
+ cfg.IntOpt('frontend_port', default=0,
+ help='Front-end ROUTER port number. Zero means random.'),
+
+ cfg.IntOpt('backend_port', default=0,
+ help='Back-end ROUTER port number. Zero means random.'),
+
+ cfg.IntOpt('publisher_port', default=0,
+ help='Publisher port number. Zero means random.'),
+]
+
+
class ZmqProxy(object):
"""Wrapper class for Publishers and Routers proxies.
The main reason to have a proxy is high complexity of TCP sockets number
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
index 68b9de2..727b419 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
@@ -14,7 +14,6 @@
import logging
-from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -24,7 +23,7 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class PubPublisherProxy(object):
+class PublisherProxy(object):
"""PUB/SUB based request publisher
The publisher intended to be used for Fanout and Notify
@@ -39,16 +38,20 @@ class PubPublisherProxy(object):
"""
def __init__(self, conf, matchmaker):
- super(PubPublisherProxy, self).__init__()
+ super(PublisherProxy, self).__init__()
self.conf = conf
self.zmq_context = zmq.Context()
self.matchmaker = matchmaker
- self.socket = zmq_socket.ZmqRandomPortSocket(
- self.conf, self.zmq_context, zmq.PUB)
+ port = conf.zmq_proxy_opts.publisher_port
- self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
- self.socket.port)
+ self.socket = zmq_socket.ZmqFixedPortSocket(
+ self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
+ port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(
+ self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
+
+ self.host = self.socket.connect_address
def send_request(self, multipart_message):
message_type = multipart_message.pop(0)
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
index bd3e613..2e053f5 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
@@ -16,9 +16,7 @@ import logging
import six
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_pub_publisher
-from oslo_messaging._drivers.zmq_driver import zmq_address
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -38,27 +36,31 @@ class UniversalQueueProxy(object):
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
- self.fe_router_socket = zmq_socket.ZmqRandomPortSocket(
- conf, context, zmq.ROUTER)
- self.be_router_socket = zmq_socket.ZmqRandomPortSocket(
- conf, context, zmq.ROUTER)
+ port = conf.zmq_proxy_opts.frontend_port
+ host = conf.zmq_proxy_opts.host
+ self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
+ conf, context, zmq.ROUTER, host,
+ conf.zmq_proxy_opts.frontend_port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+
+ port = conf.zmq_proxy_opts.backend_port
+ self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
+ conf, context, zmq.ROUTER, host,
+ conf.zmq_proxy_opts.backend_port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
self.poller.register(self.fe_router_socket.handle,
self._receive_in_request)
self.poller.register(self.be_router_socket.handle,
self._receive_in_request)
- self.fe_router_address = zmq_address.combine_address(
- self.conf.rpc_zmq_host, self.fe_router_socket.port)
- self.be_router_address = zmq_address.combine_address(
- self.conf.rpc_zmq_host, self.be_router_socket.port)
-
- self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
+ self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
conf, matchmaker)
self._router_updater = RouterUpdater(
- conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
- self.be_router_address)
+ conf, matchmaker, self.pub_publisher.host,
+ self.fe_router_socket.connect_address,
+ self.be_router_socket.connect_address)
def run(self):
message, socket = self.poller.poll()
@@ -102,16 +104,17 @@ class UniversalQueueProxy(object):
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
- LOG.debug("Dispatching message %s" % message_id)
+ LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - to %(rkey)s" %
+ {"msg_type": zmq_names.message_type_str(message_type),
+ "msg_id": message_id,
+ "rkey": routing_key})
socket.send_multipart(multipart_message)
def cleanup(self):
self.fe_router_socket.close()
self.be_router_socket.close()
self.pub_publisher.cleanup()
- self.matchmaker.unregister_publisher(
- (self.pub_publisher.host, self.fe_router_address))
- self.matchmaker.unregister_router(self.be_router_address)
+ self._router_updater.cleanup()
class RouterUpdater(zmq_updater.UpdaterBase):
@@ -138,3 +141,10 @@ class RouterUpdater(zmq_updater.UpdaterBase):
expire=self.conf.zmq_target_expire)
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
{"router": self.be_router_address})
+
+ def cleanup(self):
+ super(RouterUpdater, self).cleanup()
+ self.matchmaker.unregister_publisher(
+ (self.publisher_address, self.fe_router_address))
+ self.matchmaker.unregister_router(
+ self.be_router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 1179e24..c50ffe4 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -150,23 +150,50 @@ class ZmqSocket(object):
self.connect_to_address(address)
-class ZmqPortRangeExceededException(exceptions.MessagingException):
- """Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError"""
+class ZmqPortBusy(exceptions.MessagingException):
+ """Raised when binding to a port failure"""
+
+ def __init__(self, port_number):
+ super(ZmqPortBusy, self).__init__()
+ self.port_number = port_number
class ZmqRandomPortSocket(ZmqSocket):
- def __init__(self, conf, context, socket_type, high_watermark=0):
+ def __init__(self, conf, context, socket_type, host=None,
+ high_watermark=0):
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
-
+ if host is None:
+ host = conf.rpc_zmq_host
try:
self.port = self.handle.bind_to_random_port(
self.bind_address,
min_port=conf.rpc_zmq_min_port,
max_port=conf.rpc_zmq_max_port,
max_tries=conf.rpc_zmq_bind_port_retries)
+ self.connect_address = zmq_address.combine_address(host, self.port)
except zmq.ZMQBindError:
LOG.error(_LE("Random ports range exceeded!"))
- raise ZmqPortRangeExceededException()
+ raise ZmqPortBusy(port_number=0)
+
+
+class ZmqFixedPortSocket(ZmqSocket):
+
+ def __init__(self, conf, context, socket_type, host, port,
+ high_watermark=0):
+ super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type,
+ high_watermark)
+ self.connect_address = zmq_address.combine_address(host, port)
+ self.bind_address = zmq_address.get_tcp_direct_address(
+ zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
+ self.host = host
+ self.port = port
+
+ try:
+ self.handle.bind(self.bind_address)
+ except zmq.ZMQError as e:
+ LOG.exception(e)
+ LOG.error(_LE("Chosen port %d is being busy.") % self.port)
+ raise ZmqPortBusy(port_number=port)
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
index f9f6f52..76b61cf 100644
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
@@ -45,7 +45,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
target = oslo_messaging.Target(topic='testtopic_' + str(i))
new_listener = self.driver.listen(target, None, None)
listeners.append(new_listener)
- except zmq_socket.ZmqPortRangeExceededException:
+ except zmq_socket.ZmqPortBusy:
pass
self.assertLessEqual(len(listeners), 5)
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 50e9d1b..02519de 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -13,15 +13,17 @@
# under the License.
import json
-import msgpack
import time
+import msgpack
import six
import testscenarios
+from oslo_config import cfg
+
import oslo_messaging
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_pub_publisher
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -31,6 +33,10 @@ load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
+opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+ title='ZeroMQ proxy options')
+cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
+
class TestPubSub(zmq_common.ZmqBaseTestCase):
@@ -50,7 +56,10 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
'rpc_zmq_serialization': self.serialization}
self.config(**kwargs)
- self.publisher = zmq_pub_publisher.PubPublisherProxy(
+ self.config(host="127.0.0.1", group="zmq_proxy_opts")
+ self.config(publisher_port="0", group="zmq_proxy_opts")
+
+ self.publisher = zmq_publisher_proxy.PublisherProxy(
self.conf, self.driver.matchmaker)
self.driver.matchmaker.register_publisher(
(self.publisher.host, ""))
@@ -59,6 +68,12 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
for i in range(self.LISTENERS_COUNT):
self.listeners.append(zmq_common.TestServerListener(self.driver))
+ def tearDown(self):
+ super(TestPubSub, self).tearDown()
+ self.publisher.cleanup()
+ for listener in self.listeners:
+ listener.stop()
+
def _send_request(self, target):
# Needed only in test env to give listener a chance to connect
# before request fires
diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh
index e40dbb3..ebce12c 100755
--- a/setup-test-env-zmq-proxy.sh
+++ b/setup-test-env-zmq-proxy.sh
@@ -13,6 +13,8 @@ export ZMQ_IPC_DIR=${DATADIR}
export ZMQ_USE_PUB_SUB=false
export ZMQ_USE_ROUTER_PROXY=true
+export ZMQ_PROXY_HOST=127.0.0.1
+
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
@@ -22,6 +24,9 @@ use_pub_sub=${ZMQ_USE_PUB_SUB}
use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
[matchmaker_redis]
port=${ZMQ_REDIS_PORT}
+
+[zmq_proxy_opts]
+host=${ZMQ_PROXY_HOST}
EOF
redis-server --port $ZMQ_REDIS_PORT &