diff options
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 40 |
1 files changed, 12 insertions, 28 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index e95edfc..d03fa65 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -48,8 +48,7 @@ class AMQPIncomingMessage(base.IncomingMessage): self.requeue_callback = message.requeue self._obsolete_reply_queues = obsolete_reply_queues - def _send_reply(self, conn, reply=None, failure=None, - ending=False, log_failure=True): + def _send_reply(self, conn, reply=None, failure=None, log_failure=True): if (self.reply_q and not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id)): @@ -58,11 +57,9 @@ class AMQPIncomingMessage(base.IncomingMessage): if failure: failure = rpc_common.serialize_remote_exception(failure, log_failure) - - msg = {'result': reply, 'failure': failure} - if ending: - msg['ending'] = True - + # NOTE(sileht): ending can be removed in N*, see Listener.wait() + # for more detail. + msg = {'result': reply, 'failure': failure, 'ending': True} rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] @@ -71,12 +68,11 @@ class AMQPIncomingMessage(base.IncomingMessage): # Otherwise use the msg_id for backward compatibility. if self.reply_q: msg['_msg_id'] = self.msg_id - if ending: - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) else: # TODO(sileht): look at which version of oslo-incubator rpc @@ -104,21 +100,12 @@ class AMQPIncomingMessage(base.IncomingMessage): timer = rpc_common.DecayingTimer(duration=duration) timer.start() - first_reply_sent = False while True: try: with self.listener.driver._get_connection( rpc_common.PURPOSE_SEND) as conn: - if self.listener.driver.send_single_reply: - self._send_reply(conn, reply, failure, - log_failure=log_failure, - ending=True) - else: - if not first_reply_sent: - self._send_reply(conn, reply, failure, - log_failure=log_failure) - first_reply_sent = True - self._send_reply(conn, ending=True) + self._send_reply(conn, reply, failure, + log_failure=log_failure) return except rpc_amqp.AMQPDestinationNotFound: if timer.check_return() > 0: @@ -378,8 +365,7 @@ class AMQPDriverBase(base.BaseDriver): missing_destination_retry_timeout = 0 def __init__(self, conf, url, connection_pool, - default_exchange=None, allowed_remote_exmods=None, - send_single_reply=False): + default_exchange=None, allowed_remote_exmods=None): super(AMQPDriverBase, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -392,8 +378,6 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q_conn = None self._waiter = None - self.send_single_reply = send_single_reply - def _get_exchange(self, target): return target.exchange or self._default_exchange |