summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-06-10 17:24:28 +0000
committerGerrit Code Review <review@openstack.org>2016-06-10 17:24:28 +0000
commit01338c940acb591efca918dc49a7a6f449a3a764 (patch)
tree30090dfc14841098754a95faa499376d97f065df
parent5690156d9b73e39547dbc3d27b12b41321bb6500 (diff)
parent956bbb92e7c478bf5294e997aae757848dd7d5b7 (diff)
downloadoslo-messaging-5.4.0.tar.gz
Merge "[zmq] Periodic updates of endpoints connections"5.4.0
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py40
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py13
-rw-r--r--oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py31
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py21
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py18
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_server.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_updater.py55
9 files changed, 152 insertions, 40 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 21118d7..3829fa5 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -72,10 +72,14 @@ zmq_opts = [
help='The default number of seconds that poll should wait. '
'Poll raises timeout exception when timeout expired.'),
- cfg.IntOpt('zmq_target_expire', default=120,
+ cfg.IntOpt('zmq_target_expire', default=300,
help='Expiration timeout in seconds of a name service record '
'about existing target ( < 0 means no timeout).'),
+ cfg.IntOpt('zmq_target_update', default=180,
+ help='Update period in seconds of a name service record '
+ 'about existing target.'),
+
cfg.BoolOpt('use_pub_sub', default=True,
help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'),
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
index 9ed0737..c75ff4e 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
@@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
+from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq()
@@ -55,14 +56,9 @@ class UniversalQueueProxy(object):
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
- self.matchmaker.register_publisher(
- (self.pub_publisher.host, self.fe_router_address))
- LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
- {"pub": self.pub_publisher.host,
- "router": self.fe_router_address})
- self.matchmaker.register_router(self.be_router_address)
- LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"),
- {"router": self.be_router_address})
+ self._router_updater = RouterUpdater(
+ conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
+ self.be_router_address)
def run(self):
message, socket = self.poller.poll()
@@ -106,7 +102,7 @@ class UniversalQueueProxy(object):
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
- LOG.debug("Redirecting message %s" % message_id)
+ LOG.debug("Dispatching message %s" % message_id)
socket.send_multipart(multipart_message)
def cleanup(self):
@@ -116,3 +112,29 @@ class UniversalQueueProxy(object):
self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.fe_router_address))
self.matchmaker.unregister_router(self.be_router_address)
+
+
+class RouterUpdater(zmq_updater.UpdaterBase):
+ """This entity performs periodic async updates
+ from router proxy to the matchmaker.
+ """
+
+ def __init__(self, conf, matchmaker, publisher_address, fe_router_address,
+ be_router_address):
+ self.publisher_address = publisher_address
+ self.fe_router_address = fe_router_address
+ self.be_router_address = be_router_address
+ super(RouterUpdater, self).__init__(conf, matchmaker,
+ self._update_records)
+
+ def _update_records(self):
+ self.matchmaker.register_publisher(
+ (self.publisher_address, self.fe_router_address),
+ expire=self.conf.zmq_target_expire)
+ LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
+ {"pub": self.publisher_address,
+ "router": self.fe_router_address})
+ self.matchmaker.register_router(self.be_router_address,
+ expire=self.conf.zmq_target_expire)
+ LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
+ {"router": self.be_router_address})
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 5cba782..e446cde 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_updater
zmq = zmq_async.import_zmq()
@@ -40,6 +41,8 @@ class DealerPublisherProxy(object):
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = socket_to_proxy
self.routing_table = RoutingTable(conf, matchmaker)
+ self.connection_updater = PublisherConnectionUpdater(
+ conf, matchmaker, self.socket)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
@@ -92,6 +95,8 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
self.socket = self.outbound_sockets.get_socket_to_publishers()
self.reply_waiter.poll_socket(self.socket)
self.routing_table = RoutingTable(conf, matchmaker)
+ self.connection_updater = PublisherConnectionUpdater(
+ conf, matchmaker, self.socket)
def _connect_socket(self, target):
return self.socket
@@ -170,3 +175,11 @@ class RoutingTable(object):
def _renew_routable_hosts(self, target):
hosts, _ = self.routing_table[str(target)]
self.routable_hosts[str(target)] = list(hosts)
+
+
+class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
+
+ def _update_connection(self):
+ publishers = self.matchmaker.get_publishers()
+ for pub_address, router_address in publishers:
+ self.socket.connect_to_host(router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
index 150cf8a..daa1b17 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
@@ -138,9 +138,14 @@ class RedisMatchMaker(base.MatchMakerBase):
"port": self.conf.matchmaker_redis.port,
"password": self.conf.matchmaker_redis.password}
- def register_publisher(self, hostname):
+ def _add_key_with_expire(self, key, value, expire):
+ self._redis.sadd(key, value)
+ if expire > 0:
+ self._redis.expire(key, expire)
+
+ def register_publisher(self, hostname, expire=-1):
host_str = ",".join(hostname)
- self._redis.sadd(_PUBLISHERS_KEY, host_str)
+ self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
def unregister_publisher(self, hostname):
host_str = ",".join(hostname)
@@ -153,8 +158,8 @@ class RedisMatchMaker(base.MatchMakerBase):
self._get_hosts_by_key(_PUBLISHERS_KEY)])
return hosts
- def register_router(self, hostname):
- self._redis.sadd(_ROUTERS_KEY, hostname)
+ def register_router(self, hostname, expire=-1):
+ self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
def unregister_router(self, hostname):
self._redis.srem(_ROUTERS_KEY, hostname)
@@ -167,22 +172,22 @@ class RedisMatchMaker(base.MatchMakerBase):
def register(self, target, hostname, listener_type, expire=-1):
- def register_key(key):
- self._redis.sadd(key, hostname)
- if expire > 0:
- self._redis.expire(key, expire)
-
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
- register_key(key)
+ self._add_key_with_expire(key, hostname, expire)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
- register_key(key)
+ self._add_key_with_expire(key, hostname, expire)
def unregister(self, target, hostname, listener_type):
- key = zmq_address.target_to_key(target, listener_type)
- self._redis.srem(key, hostname)
+ if target.topic and target.server:
+ key = zmq_address.target_to_key(target, listener_type)
+ self._redis.srem(key, hostname)
+
+ if target.topic:
+ key = zmq_address.prefix_str(target.topic, listener_type)
+ self._redis.srem(key, hostname)
def get_hosts(self, target, listener_type):
LOG.debug("[Redis] get_hosts for target %s", target)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index 86dddee..69c6958 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -14,7 +14,6 @@
import abc
import logging
-import time
import six
@@ -23,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
+from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@@ -60,8 +60,8 @@ class SingleSocketConsumer(ConsumerBase):
self.socket_type = socket_type
self.host = None
self.socket = self.subscribe_socket(socket_type)
- self.target_updater = TargetUpdater(conf, self.matchmaker, self.target,
- self.host, socket_type)
+ self.target_updater = TargetUpdater(
+ conf, self.matchmaker, self.target, self.host, socket_type)
def subscribe_socket(self, socket_type):
try:
@@ -96,25 +96,20 @@ class SingleSocketConsumer(ConsumerBase):
super(SingleSocketConsumer, self).cleanup()
-class TargetUpdater(object):
+class TargetUpdater(zmq_updater.UpdaterBase):
"""This entity performs periodic async updates
to the matchmaker.
"""
def __init__(self, conf, matchmaker, target, host, socket_type):
- self.conf = conf
- self.matchmaker = matchmaker
self.target = target
self.host = host
self.socket_type = socket_type
- self.executor = zmq_async.get_executor(method=self._update_target)
- self.executor.execute()
+ super(TargetUpdater, self).__init__(conf, matchmaker,
+ self._update_target)
def _update_target(self):
self.matchmaker.register(
self.target, self.host,
- zmq_names.socket_type_str(self.socket_type))
- time.sleep(self.conf.zmq_target_expire / 2)
-
- def cleanup(self):
- self.executor.stop()
+ zmq_names.socket_type_str(self.socket_type),
+ expire=self.conf.zmq_target_expire)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index f1f5d60..f0fd111 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
@@ -90,6 +91,8 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
self.target_updater = zmq_consumer_base.TargetUpdater(
conf, self.matchmaker, self.target, self.host,
zmq.DEALER)
+ self.connection_updater = ConsumerConnectionUpdater(
+ conf, self.matchmaker, self.socket)
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
def receive_message(self, socket):
@@ -111,6 +114,19 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
-
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
+
+ def cleanup(self):
+ LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
+ super(DealerConsumer, self).cleanup()
+ self.matchmaker.unregister(self.target, self.host,
+ zmq_names.socket_type_str(zmq.DEALER))
+
+
+class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
+
+ def _update_connection(self):
+ routers = self.matchmaker.get_routers()
+ for router_address in routers:
+ self.socket.connect_to_host(router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index 254c6e5..c963c45 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -48,11 +48,11 @@ class ZmqServer(base.PollStyleListener):
conf, self.poller, self) if conf.use_pub_sub else None
self.consumers = []
- if self.router_consumer:
+ if self.router_consumer is not None:
self.consumers.append(self.router_consumer)
- if self.dealer_consumer:
+ if self.dealer_consumer is not None:
self.consumers.append(self.dealer_consumer)
- if self.sub_consumer:
+ if self.sub_consumer is not None:
self.consumers.append(self.sub_consumer)
@base.batch_poll_helper
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 2ca816b..a97343e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -96,6 +96,8 @@ class ZmqSocket(object):
self.handle.close(*args, **kwargs)
def connect_to_address(self, address):
+ if address in self.connections:
+ return
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
new file mode 100644
index 0000000..0b4594a
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -0,0 +1,55 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+import time
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class UpdaterBase(object):
+
+ def __init__(self, conf, matchmaker, update_method):
+ self.conf = conf
+ self.matchmaker = matchmaker
+ self.update_method = update_method
+ self.executor = zmq_async.get_executor(method=self._update_loop)
+ self.executor.execute()
+
+ def _update_loop(self):
+ self.update_method()
+ time.sleep(self.conf.zmq_target_update)
+
+ def cleanup(self):
+ self.executor.stop()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ConnectionUpdater(UpdaterBase):
+
+ def __init__(self, conf, matchmaker, socket):
+ self.socket = socket
+ super(ConnectionUpdater, self).__init__(
+ conf, matchmaker, self._update_connection)
+
+ @abc.abstractmethod
+ def _update_connection(self):
+ """Update connection info"""