summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bindep.txt10
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/controller.py68
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/eventloop.py27
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py3
-rw-r--r--oslo_messaging/tests/drivers/test_amqp_driver.py24
-rwxr-xr-xoslo_messaging/tests/functional/gate/post_test_hook.sh13
-rw-r--r--requirements.txt5
-rw-r--r--test-requirements.txt3
-rw-r--r--tox.ini11
9 files changed, 87 insertions, 77 deletions
diff --git a/bindep.txt b/bindep.txt
index 636043a..df4c23e 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -13,11 +13,11 @@ libffi-devel [platform:rpm]
# kombu/pika
rabbitmq-server [platform:dpkg rabbit pika]
-# zeromq
-redis-sentinel [platform:ubuntu !platform:ubuntu-trusty zeromq]
-redis-server [platform:dpkg zeromq]
-python-redis [platform:dpkg zeromq]
-zookeeperd [platform:dpkg zeromq]
+# zmq
+redis-sentinel [platform:ubuntu !platform:ubuntu-trusty zmq]
+redis-server [platform:dpkg zmq]
+python-redis [platform:dpkg zmq]
+zookeeperd [platform:dpkg zmq]
# AMQP1 dpkg
qpidd [platform:dpkg amqp1]
diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 4a9361c..49aba92 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -293,7 +293,7 @@ class Sender(pyngus.SenderEventHandler):
if self._link:
self._link.close()
- def reset(self):
+ def reset(self, reason="Link reset"):
"""Called by the controller on connection failover. Release all link
resources, abort any in-flight messages, and check the retry limit on
all pending send requests.
@@ -304,16 +304,16 @@ class Sender(pyngus.SenderEventHandler):
if self._link:
self._link.destroy()
self._link = None
- self._abort_unacked("Link reset")
- self._check_retry_limit()
+ self._abort_unacked(reason)
+ self._check_retry_limit(reason)
- def destroy(self):
+ def destroy(self, reason="Link destroyed"):
"""Destroy the sender and all pending messages. Called on driver
shutdown.
"""
LOG.debug("Sender %s destroyed", self._address)
- self.reset()
- self._abort_pending("Link destroyed")
+ self.reset(reason)
+ self._abort_pending(reason)
def send_message(self, send_task):
"""Send a message out the link.
@@ -354,21 +354,24 @@ class Sender(pyngus.SenderEventHandler):
# sender_closed() will be called once the link completes closing
def sender_closed(self, sender_link):
- self._abort_unacked("Sender closed")
- if self._connection:
- # still attached, so attempt to restart the link
- self._check_retry_limit()
- self._scheduler.defer(self._reopen_link, self._delay)
+ self._handle_sender_closed()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.warning(_LW("sender %(addr)s failed error=%(error)s"),
{'addr': self._address, 'error': error})
- self.sender_closed(sender_link)
+ self._handle_sender_closed(str(error))
# end Pyngus callbacks
- def _check_retry_limit(self):
+ def _handle_sender_closed(self, reason="Sender closed"):
+ self._abort_unacked(reason)
+ if self._connection:
+ # still attached, so attempt to restart the link
+ self._check_retry_limit(reason)
+ self._scheduler.defer(self._reopen_link, self._delay)
+
+ def _check_retry_limit(self, reason):
# Called on recoverable connection or link failure. Remove any pending
# sends that have exhausted their retry count:
expired = set()
@@ -377,7 +380,7 @@ class Sender(pyngus.SenderEventHandler):
send_task.retry -= 1
if send_task.retry <= 0:
expired.add(send_task)
- send_task._on_error("Send retries exhausted")
+ send_task._on_error("Message send failed: %s" % reason)
while expired:
self._pending_sends.remove(expired.pop())
@@ -813,7 +816,7 @@ class Controller(pyngus.ConnectionEventHandler):
self._closing = False
# only schedule one outstanding reconnect attempt at a time
self._reconnecting = False
- self._delay = 1 # seconds between retries
+ self._delay = self.conn_retry_interval # seconds between retries
# prevent queuing up multiple requests to run _process_tasks()
self._process_tasks_scheduled = False
self._process_tasks_lock = threading.Lock()
@@ -843,7 +846,7 @@ class Controller(pyngus.ConnectionEventHandler):
self.processor.wakeup(self._start_shutdown)
LOG.debug("Waiting for eventloop to exit")
self.processor.join(timeout)
- self._hard_reset()
+ self._hard_reset("Shutting down")
for sender in itervalues(self._all_senders):
sender.destroy()
self._all_senders.clear()
@@ -1013,7 +1016,7 @@ class Controller(pyngus.ConnectionEventHandler):
def socket_error(self, error):
"""Called by eventloop when a socket error occurs."""
LOG.error(_LE("Socket failure: %s"), error)
- self._handle_connection_loss()
+ self._handle_connection_loss(str(error))
# Pyngus connection event callbacks (and their helpers), all invoked from
# the eventloop thread:
@@ -1026,7 +1029,7 @@ class Controller(pyngus.ConnectionEventHandler):
# pyngus bug: ignore failure callback on destroyed connections
return
LOG.debug("AMQP Connection failure: %s", error)
- self._handle_connection_loss()
+ self._handle_connection_loss(str(error))
def connection_active(self, connection):
"""This is a Pyngus callback, invoked by Pyngus when the connection to
@@ -1048,7 +1051,7 @@ class Controller(pyngus.ConnectionEventHandler):
self._reply_link_ready,
self._reply_link_down,
self._reply_credit)
- self._delay = 1
+ self._delay = self.conn_retry_interval # reset
# schedule periodic maintenance of sender links
self._link_maint_timer = self.processor.defer(self._purge_sender_links,
self._link_maint_timeout)
@@ -1061,7 +1064,7 @@ class Controller(pyngus.ConnectionEventHandler):
"""
LOG.debug("AMQP connection closed.")
# if the driver isn't being shutdown, failover and reconnect
- self._handle_connection_loss()
+ self._handle_connection_loss("AMQP connection closed.")
def connection_remote_closed(self, connection, reason):
"""This is a Pyngus callback, invoked by Pyngus when the peer has
@@ -1089,13 +1092,14 @@ class Controller(pyngus.ConnectionEventHandler):
{'hostname': self.hosts.current.hostname,
'port': self.hosts.current.port,
'username': self.hosts.current.username})
- # connection failure will be handled later
+ # pyngus will invoke connection_failed() eventually
- def _handle_connection_loss(self):
+ def _handle_connection_loss(self, reason):
"""The connection to the messaging service has been lost. Try to
reestablish the connection/failover if not shutting down the driver.
"""
self.addresser = None
+ self._socket_connection.close()
if self._closing:
# we're in the middle of shutting down the driver anyways,
# just consider it done:
@@ -1107,31 +1111,33 @@ class Controller(pyngus.ConnectionEventHandler):
self._reconnecting = True
LOG.info(_LI("delaying reconnect attempt for %d seconds"),
self._delay)
- self.processor.defer(self._do_reconnect, self._delay)
- self._delay = min(self._delay * 2, 60)
+ self.processor.defer(lambda: self._do_reconnect(reason),
+ self._delay)
+ self._delay = min(self._delay * self.conn_retry_backoff,
+ self.conn_retry_interval_max)
if self._link_maint_timer:
self._link_maint_timer.cancel()
self._link_maint_timer = None
- def _do_reconnect(self):
+ def _do_reconnect(self, reason):
"""Invoked on connection/socket failure, failover and re-connect to the
messaging service.
"""
self._reconnecting = False
if not self._closing:
- self._hard_reset()
+ self._hard_reset(reason)
host = self.hosts.next()
LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"),
{'hostname': host.hostname, 'port': host.port})
self._socket_connection.connect(host)
- def _hard_reset(self):
+ def _hard_reset(self, reason):
"""Reset the controller to its pre-connection state"""
# note well: since this method destroys the connection, it cannot be
# invoked directly from a pyngus callback. Use processor.defer() to
# run this method on the main loop instead.
for sender in self._purged_senders:
- sender.destroy()
+ sender.destroy(reason)
del self._purged_senders[:]
self._active_senders.clear()
unused = []
@@ -1140,10 +1146,10 @@ class Controller(pyngus.ConnectionEventHandler):
if sender.pending_messages == 0:
unused.append(key)
else:
- sender.reset()
+ sender.reset(reason)
self._active_senders.add(key)
for key in unused:
- self._all_senders[key].destroy()
+ self._all_senders[key].destroy(reason)
del self._all_senders[key]
for servers in itervalues(self._servers):
for server in itervalues(servers):
@@ -1170,7 +1176,7 @@ class Controller(pyngus.ConnectionEventHandler):
if not self._closing:
# destroy links that have already been closed
for sender in self._purged_senders:
- sender.destroy()
+ sender.destroy("Idle link purged")
del self._purged_senders[:]
# determine next set to purge
diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
index 0f3b5da..c2e16fb 100644
--- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py
+++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
@@ -69,31 +69,27 @@ class _SocketConnection(object):
def read_socket(self):
"""Called to read from the socket."""
- while True:
+ if self.socket:
try:
- rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
+ pyngus.read_socket_input(self.pyngus_conn, self.socket)
self.pyngus_conn.process(now())
- return rc
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_input()
self.pyngus_conn.close_output()
self._handler.socket_error(str(e))
- return pyngus.Connection.EOS
def write_socket(self):
"""Called to write to the socket."""
- while True:
+ if self.socket:
try:
- rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
+ pyngus.write_socket_output(self.pyngus_conn, self.socket)
self.pyngus_conn.process(now())
- return rc
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_output()
self.pyngus_conn.close_input()
self._handler.socket_error(str(e))
- return pyngus.Connection.EOS
def connect(self, host):
"""Connect to host and start the AMQP protocol."""
@@ -358,7 +354,7 @@ class Thread(threading.Thread):
deadline = self._scheduler._next_deadline
pyngus_conn = self._connection and self._connection.pyngus_conn
- if pyngus_conn:
+ if pyngus_conn and self._connection.socket:
if pyngus_conn.needs_input:
readfds.append(self._connection)
if pyngus_conn.has_output:
@@ -388,13 +384,12 @@ class Thread(threading.Thread):
# Testing shows that polling improves latency over checking the
# lists returned by select()
self._requests.process_requests()
- if pyngus_conn:
- self._connection.read_socket()
- if pyngus_conn.deadline:
- _now = now()
- if pyngus_conn.deadline <= _now:
- pyngus_conn.process(_now)
- self._connection.write_socket()
+ self._connection.read_socket()
+ if pyngus_conn and pyngus_conn.deadline:
+ _now = now()
+ if pyngus_conn.deadline <= _now:
+ pyngus_conn.process(_now)
+ self._connection.write_socket()
self._scheduler._process() # run any deferred requests
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 11b1bae..eb6d11a 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -607,7 +607,8 @@ class Connection(object):
self._heartbeat_support_log_emitted = False
# NOTE(sileht): just ensure the connection is setuped at startup
- self.ensure_connection()
+ with self._connection_lock:
+ self.ensure_connection()
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# the consume code does the heartbeat stuff
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index dd1f25b..12db37a 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -674,12 +674,12 @@ class TestAuthentication(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic="test-topic")
_ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 1)
- self.assertRaises(oslo_messaging.MessagingTimeout,
+ self.assertRaises(oslo_messaging.MessageDeliveryFailure,
driver.send,
target, {"context": True},
{"method": "echo"},
wait_for_reply=True,
- timeout=2.0)
+ retry=2)
driver.cleanup()
@@ -771,7 +771,6 @@ mech_list: ${mechs}
"""Verify that a bad password given in TransportHost is
rejected by the broker.
"""
-
addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
self._broker.port)
url = oslo_messaging.TransportURL.parse(self.conf, addr)
@@ -779,12 +778,15 @@ mech_list: ${mechs}
target = oslo_messaging.Target(topic="test-topic")
_ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 1)
- self.assertRaises(oslo_messaging.MessagingTimeout,
- driver.send,
- target, {"context": True},
- {"method": "echo"},
- wait_for_reply=True,
- timeout=2.0)
+ try:
+ driver.send(target, {"context": True}, {"method": "echo"},
+ wait_for_reply=True, retry=2)
+ except oslo_messaging.MessageDeliveryFailure as e:
+ # verify the exception indicates the failure was an authentication
+ # error
+ self.assertTrue('amqp:unauthorized-access' in str(e))
+ else:
+ self.assertIsNone("Expected authentication failure")
driver.cleanup()
def test_authentication_bad_mechs(self):
@@ -800,12 +802,12 @@ mech_list: ${mechs}
target = oslo_messaging.Target(topic="test-topic")
_ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 1)
- self.assertRaises(oslo_messaging.MessagingTimeout,
+ self.assertRaises(oslo_messaging.MessageDeliveryFailure,
driver.send,
target, {"context": True},
{"method": "echo"},
wait_for_reply=True,
- timeout=2.0)
+ retry=0)
driver.cleanup()
def test_authentication_default_username(self):
diff --git a/oslo_messaging/tests/functional/gate/post_test_hook.sh b/oslo_messaging/tests/functional/gate/post_test_hook.sh
index fb8778b..107988b 100755
--- a/oslo_messaging/tests/functional/gate/post_test_hook.sh
+++ b/oslo_messaging/tests/functional/gate/post_test_hook.sh
@@ -14,7 +14,16 @@
# This script is executed inside post_test_hook function in devstack gate.
-RPC_BACKEND=$1
+
+# NOTE(sileht): we have to rename the zmq tox target to shorter name
+# this is to not break the current dsvm gating system until we switch to the tox one
+case $1 in
+ zeromq) RPC_BACKEND=zmq;;
+ zeromq-proxy) RPC_BACKEND=zmq-proxy;;
+ zeromq-pub-sub) RPC_BACKEND=zmq-pubsub;;
+ *) RPC_BACKEND=$1;;
+esac
+
PYTHON=${2:-py27}
function generate_testr_results {
@@ -43,7 +52,7 @@ fi
# Install required packages
case $RPC_BACKEND in
- zeromq|zeromq-proxy|zeromq-pub-sub)
+ zmq|zmq-proxy|zmq-pubsub)
sudo apt-get update -y
sudo apt-get install -y redis-server python-redis
;;
diff --git a/requirements.txt b/requirements.txt
index 7884b12..7ef1b53 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,11 +20,6 @@ monotonic>=0.6 # Apache-2.0
six>=1.9.0 # MIT
cachetools>=1.1.0 # MIT License
-# FIXME(markmc): remove this when the drivers no longer import eventlet
-
-eventlet!=0.18.3,>=0.18.2 # MIT
-greenlet>=0.3.2 # MIT
-
WebOb>=1.6.0 # MIT
# for the routing notifier
diff --git a/test-requirements.txt b/test-requirements.txt
index e96097f..d03cceb 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -44,3 +44,6 @@ pyngus>=2.0.2 # Apache-2.0
# Bandit security code scanner
bandit>=1.1.0 # Apache-2.0
+
+eventlet!=0.18.3,>=0.18.2 # MIT
+greenlet>=0.3.2 # MIT
diff --git a/tox.ini b/tox.ini
index 1cad49a..f99ff13 100644
--- a/tox.ini
+++ b/tox.ini
@@ -90,20 +90,19 @@ setenv =
WORKDIR={toxworkdir}
commands = {toxinidir}/setup-test-env-amqp1.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
-[testenv:py27-func-zeromq]
+[testenv:py27-func-zmq]
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py34-func-zeromq]
-basepython = python3.4
+[testenv:py34-func-zmq]
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py27-func-zeromq-direct-static]
+[testenv:py27-func-zmq-direct]
commands = {toxinidir}/setup-test-env-zmq-direct-static.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py27-func-zeromq-proxy]
+[testenv:py27-func-zmq-proxy]
commands = {toxinidir}/setup-test-env-zmq-proxy.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py27-func-zeromq-pub-sub]
+[testenv:py27-func-zmq-pubsub]
commands = {toxinidir}/setup-test-env-zmq-pub-sub.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:bandit]