diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-12-09 17:22:00 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-12-09 17:22:00 +0000 |
commit | 7f08805bc9908a374681202b3e14d4e311eaefcf (patch) | |
tree | 2db2dad0790ea8a007050ac6d56db4e3dc0a2b2a | |
parent | fdfc98c6a259ad83dee775f77028d0f0b2887243 (diff) | |
parent | c1d0412e2d5b437b06d8729bbe2cdaea594427be (diff) | |
download | oslo-messaging-7f08805bc9908a374681202b3e14d4e311eaefcf.tar.gz |
Merge "kombu: remove compat of folsom reply format"
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 46 |
1 files changed, 15 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index d03fa65..f8deac8 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -49,9 +49,8 @@ class AMQPIncomingMessage(base.IncomingMessage): self._obsolete_reply_queues = obsolete_reply_queues def _send_reply(self, conn, reply=None, failure=None, log_failure=True): - if (self.reply_q and - not self._obsolete_reply_queues.reply_q_valid(self.reply_q, - self.msg_id)): + if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, + self.msg_id): return if failure: @@ -59,27 +58,17 @@ class AMQPIncomingMessage(base.IncomingMessage): log_failure) # NOTE(sileht): ending can be removed in N*, see Listener.wait() # for more detail. - msg = {'result': reply, 'failure': failure, 'ending': True} + msg = {'result': reply, 'failure': failure, 'ending': True, + '_msg_id': self.msg_id} rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] - # If a reply_q exists, add the msg_id to the reply and pass the - # reply_q to direct_send() to use it as the response queue. - # Otherwise use the msg_id for backward compatibility. - if self.reply_q: - msg['_msg_id'] = self.msg_id - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) - conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - else: - # TODO(sileht): look at which version of oslo-incubator rpc - # send need this, but I guess this is older than icehouse - # if this is icehouse, we can drop this at Mitaka - # if this is havana, we can drop this now. - conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) + conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) def reply(self, reply=None, failure=None, log_failure=True): if not self.msg_id: @@ -87,10 +76,9 @@ class AMQPIncomingMessage(base.IncomingMessage): # because reply should not be expected by caller side return - # NOTE(sileht): return without using a connection if possible - if (self.reply_q and - not self._obsolete_reply_queues.reply_q_valid(self.reply_q, - self.msg_id)): + # NOTE(sileht): return without hold the a connection if possible + if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, + self.msg_id): return # NOTE(sileht): we read the configuration value from the driver @@ -204,12 +192,8 @@ class AMQPListener(base.Listener): unique_id = self.msg_id_cache.check_duplicate_message(message) - if ctxt.reply_q: - LOG.debug( - "received message msg_id: %(msg_id)s reply to %(queue)s" % { - 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) - else: - LOG.debug("received message unique_id: %s " % unique_id) + LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % { + 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) self.incoming.append(AMQPIncomingMessage(self, ctxt.to_dict(), |