summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-12-02 10:02:01 +0100
committerMehdi Abaakouk <sileht@redhat.com>2015-12-04 15:25:03 +0100
commit6ad70713a3316dd2003ff1f73db573b674a6f20f (patch)
treeaa65ef9849ad1434e53c3d8f86ea75600c3e0920
parentee240fbb8d9dc73200fbc58f6e8c52660e645dc0 (diff)
downloadoslo-messaging-6ad70713a3316dd2003ff1f73db573b674a6f20f.tar.gz
Follow the plan about the single reply message
This change removes the "send_single_reply" option as planned in the bp: http://specs.openstack.org/openstack/oslo-specs/specs/liberty/oslo.messaging-remove-double-reply.html Change-Id: Ib88de71cb2008a49a25f302d5e47ed587154d402
-rw-r--r--oslo_messaging/_drivers/amqp.py12
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py40
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py3
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py11
4 files changed, 14 insertions, 52 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 86f41ad..06a59f8 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -48,18 +48,6 @@ amqp_opts = [
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
- cfg.BoolOpt('send_single_reply',
- default=False,
- help='Send a single AMQP reply to call message. The current '
- 'behaviour since oslo-incubator is to send two AMQP '
- 'replies - first one with the payload, a second one to '
- 'ensure the other have finish to send the payload. We '
- 'are going to remove it in the N release, but we must '
- 'keep backward compatible at the same time. This option '
- 'provides such compatibility - it defaults to False in '
- 'Liberty and can be turned on for early adopters with a '
- 'new installations or for testing. Please note, that '
- 'this option will be removed in the Mitaka release.')
]
UNIQUE_ID = '_unique_id'
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index e95edfc..d03fa65 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -48,8 +48,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
self.requeue_callback = message.requeue
self._obsolete_reply_queues = obsolete_reply_queues
- def _send_reply(self, conn, reply=None, failure=None,
- ending=False, log_failure=True):
+ def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
@@ -58,11 +57,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
if failure:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
-
- msg = {'result': reply, 'failure': failure}
- if ending:
- msg['ending'] = True
-
+ # NOTE(sileht): ending can be removed in N*, see Listener.wait()
+ # for more detail.
+ msg = {'result': reply, 'failure': failure, 'ending': True}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
@@ -71,12 +68,11 @@ class AMQPIncomingMessage(base.IncomingMessage):
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
- if ending:
- LOG.debug("sending reply msg_id: %(msg_id)s "
- "reply queue: %(reply_q)s" % {
- 'msg_id': self.msg_id,
- 'unique_id': unique_id,
- 'reply_q': self.reply_q})
+ LOG.debug("sending reply msg_id: %(msg_id)s "
+ "reply queue: %(reply_q)s" % {
+ 'msg_id': self.msg_id,
+ 'unique_id': unique_id,
+ 'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
else:
# TODO(sileht): look at which version of oslo-incubator rpc
@@ -104,21 +100,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
- first_reply_sent = False
while True:
try:
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
- if self.listener.driver.send_single_reply:
- self._send_reply(conn, reply, failure,
- log_failure=log_failure,
- ending=True)
- else:
- if not first_reply_sent:
- self._send_reply(conn, reply, failure,
- log_failure=log_failure)
- first_reply_sent = True
- self._send_reply(conn, ending=True)
+ self._send_reply(conn, reply, failure,
+ log_failure=log_failure)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
@@ -378,8 +365,7 @@ class AMQPDriverBase(base.BaseDriver):
missing_destination_retry_timeout = 0
def __init__(self, conf, url, connection_pool,
- default_exchange=None, allowed_remote_exmods=None,
- send_single_reply=False):
+ default_exchange=None, allowed_remote_exmods=None):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@@ -392,8 +378,6 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
- self.send_single_reply = send_single_reply
-
def _get_exchange(self, target):
return target.exchange or self._default_exchange
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b9ff363..f0110d9 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -1139,8 +1139,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf, url,
connection_pool,
default_exchange,
- allowed_remote_exmods,
- conf.oslo_messaging_rabbit.send_single_reply,
+ allowed_remote_exmods
)
def require_features(self, requeue=True):
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 52cbfe1..9fdd211 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -28,7 +28,6 @@ from oslotest import mockpatch
import testscenarios
import oslo_messaging
-from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@@ -363,11 +362,6 @@ class TestSendReceive(test_utils.BaseTestCase):
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
]
- _reply_ending = [
- ('old_behavior', dict(send_single_reply=False)),
- ('new_behavior', dict(send_single_reply=True)),
- ]
-
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
@@ -375,16 +369,13 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._reply,
cls._reply_fail,
cls._failure,
- cls._timeout,
- cls._reply_ending)
+ cls._timeout)
def test_send_receive(self):
self.config(kombu_missing_consumer_retry_timeout=0.5,
group="oslo_messaging_rabbit")
self.config(heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
- self.config(send_single_reply=self.send_single_reply,
- group="oslo_messaging_rabbit")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)