summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqpdriver.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py40
1 files changed, 12 insertions, 28 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index e95edfc..d03fa65 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -48,8 +48,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
self.requeue_callback = message.requeue
self._obsolete_reply_queues = obsolete_reply_queues
- def _send_reply(self, conn, reply=None, failure=None,
- ending=False, log_failure=True):
+ def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
@@ -58,11 +57,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
if failure:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
-
- msg = {'result': reply, 'failure': failure}
- if ending:
- msg['ending'] = True
-
+ # NOTE(sileht): ending can be removed in N*, see Listener.wait()
+ # for more detail.
+ msg = {'result': reply, 'failure': failure, 'ending': True}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
@@ -71,12 +68,11 @@ class AMQPIncomingMessage(base.IncomingMessage):
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
- if ending:
- LOG.debug("sending reply msg_id: %(msg_id)s "
- "reply queue: %(reply_q)s" % {
- 'msg_id': self.msg_id,
- 'unique_id': unique_id,
- 'reply_q': self.reply_q})
+ LOG.debug("sending reply msg_id: %(msg_id)s "
+ "reply queue: %(reply_q)s" % {
+ 'msg_id': self.msg_id,
+ 'unique_id': unique_id,
+ 'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
else:
# TODO(sileht): look at which version of oslo-incubator rpc
@@ -104,21 +100,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
- first_reply_sent = False
while True:
try:
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
- if self.listener.driver.send_single_reply:
- self._send_reply(conn, reply, failure,
- log_failure=log_failure,
- ending=True)
- else:
- if not first_reply_sent:
- self._send_reply(conn, reply, failure,
- log_failure=log_failure)
- first_reply_sent = True
- self._send_reply(conn, ending=True)
+ self._send_reply(conn, reply, failure,
+ log_failure=log_failure)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
@@ -378,8 +365,7 @@ class AMQPDriverBase(base.BaseDriver):
missing_destination_retry_timeout = 0
def __init__(self, conf, url, connection_pool,
- default_exchange=None, allowed_remote_exmods=None,
- send_single_reply=False):
+ default_exchange=None, allowed_remote_exmods=None):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@@ -392,8 +378,6 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
- self.send_single_reply = send_single_reply
-
def _get_exchange(self, target):
return target.exchange or self._default_exchange