diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2015-12-02 10:13:18 +0100 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2015-12-04 15:25:03 +0100 |
commit | c1d0412e2d5b437b06d8729bbe2cdaea594427be (patch) | |
tree | 8466dc4a6750dc3db77f1464bc1b01e18e6c9f91 | |
parent | 6ad70713a3316dd2003ff1f73db573b674a6f20f (diff) | |
download | oslo-messaging-c1d0412e2d5b437b06d8729bbe2cdaea594427be.tar.gz |
kombu: remove compat of folsom reply format
This change removes codepath where _reply_q is not
present in the message dict.
This kind of messages have been deprecated in grizzly and cannot
be emitted since havana.
https://github.com/openstack/oslo-incubator/commit/70891c271e011f59792933eaf65c3214493ef14a
Change-Id: I20558d9fae8f56970c967aa0def77cfb2a1ca3ec
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 46 |
1 files changed, 15 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index d03fa65..f8deac8 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -49,9 +49,8 @@ class AMQPIncomingMessage(base.IncomingMessage): self._obsolete_reply_queues = obsolete_reply_queues def _send_reply(self, conn, reply=None, failure=None, log_failure=True): - if (self.reply_q and - not self._obsolete_reply_queues.reply_q_valid(self.reply_q, - self.msg_id)): + if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, + self.msg_id): return if failure: @@ -59,27 +58,17 @@ class AMQPIncomingMessage(base.IncomingMessage): log_failure) # NOTE(sileht): ending can be removed in N*, see Listener.wait() # for more detail. - msg = {'result': reply, 'failure': failure, 'ending': True} + msg = {'result': reply, 'failure': failure, 'ending': True, + '_msg_id': self.msg_id} rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] - # If a reply_q exists, add the msg_id to the reply and pass the - # reply_q to direct_send() to use it as the response queue. - # Otherwise use the msg_id for backward compatibility. - if self.reply_q: - msg['_msg_id'] = self.msg_id - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) - conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - else: - # TODO(sileht): look at which version of oslo-incubator rpc - # send need this, but I guess this is older than icehouse - # if this is icehouse, we can drop this at Mitaka - # if this is havana, we can drop this now. - conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) + conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) def reply(self, reply=None, failure=None, log_failure=True): if not self.msg_id: @@ -87,10 +76,9 @@ class AMQPIncomingMessage(base.IncomingMessage): # because reply should not be expected by caller side return - # NOTE(sileht): return without using a connection if possible - if (self.reply_q and - not self._obsolete_reply_queues.reply_q_valid(self.reply_q, - self.msg_id)): + # NOTE(sileht): return without hold the a connection if possible + if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, + self.msg_id): return # NOTE(sileht): we read the configuration value from the driver @@ -204,12 +192,8 @@ class AMQPListener(base.Listener): unique_id = self.msg_id_cache.check_duplicate_message(message) - if ctxt.reply_q: - LOG.debug( - "received message msg_id: %(msg_id)s reply to %(queue)s" % { - 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) - else: - LOG.debug("received message unique_id: %s " % unique_id) + LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % { + 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) self.incoming.append(AMQPIncomingMessage(self, ctxt.to_dict(), |