summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-12-02 10:13:18 +0100
committerMehdi Abaakouk <sileht@redhat.com>2015-12-04 15:25:03 +0100
commitc1d0412e2d5b437b06d8729bbe2cdaea594427be (patch)
tree8466dc4a6750dc3db77f1464bc1b01e18e6c9f91
parent6ad70713a3316dd2003ff1f73db573b674a6f20f (diff)
downloadoslo-messaging-c1d0412e2d5b437b06d8729bbe2cdaea594427be.tar.gz
kombu: remove compat of folsom reply format
This change removes codepath where _reply_q is not present in the message dict. This kind of messages have been deprecated in grizzly and cannot be emitted since havana. https://github.com/openstack/oslo-incubator/commit/70891c271e011f59792933eaf65c3214493ef14a Change-Id: I20558d9fae8f56970c967aa0def77cfb2a1ca3ec
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py46
1 files changed, 15 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index d03fa65..f8deac8 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -49,9 +49,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._obsolete_reply_queues = obsolete_reply_queues
def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
- if (self.reply_q and
- not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
- self.msg_id)):
+ if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+ self.msg_id):
return
if failure:
@@ -59,27 +58,17 @@ class AMQPIncomingMessage(base.IncomingMessage):
log_failure)
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
# for more detail.
- msg = {'result': reply, 'failure': failure, 'ending': True}
+ msg = {'result': reply, 'failure': failure, 'ending': True,
+ '_msg_id': self.msg_id}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
- # If a reply_q exists, add the msg_id to the reply and pass the
- # reply_q to direct_send() to use it as the response queue.
- # Otherwise use the msg_id for backward compatibility.
- if self.reply_q:
- msg['_msg_id'] = self.msg_id
- LOG.debug("sending reply msg_id: %(msg_id)s "
- "reply queue: %(reply_q)s" % {
- 'msg_id': self.msg_id,
- 'unique_id': unique_id,
- 'reply_q': self.reply_q})
- conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
- else:
- # TODO(sileht): look at which version of oslo-incubator rpc
- # send need this, but I guess this is older than icehouse
- # if this is icehouse, we can drop this at Mitaka
- # if this is havana, we can drop this now.
- conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
+ LOG.debug("sending reply msg_id: %(msg_id)s "
+ "reply queue: %(reply_q)s" % {
+ 'msg_id': self.msg_id,
+ 'unique_id': unique_id,
+ 'reply_q': self.reply_q})
+ conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
def reply(self, reply=None, failure=None, log_failure=True):
if not self.msg_id:
@@ -87,10 +76,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
# because reply should not be expected by caller side
return
- # NOTE(sileht): return without using a connection if possible
- if (self.reply_q and
- not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
- self.msg_id)):
+ # NOTE(sileht): return without hold the a connection if possible
+ if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+ self.msg_id):
return
# NOTE(sileht): we read the configuration value from the driver
@@ -204,12 +192,8 @@ class AMQPListener(base.Listener):
unique_id = self.msg_id_cache.check_duplicate_message(message)
- if ctxt.reply_q:
- LOG.debug(
- "received message msg_id: %(msg_id)s reply to %(queue)s" % {
- 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
- else:
- LOG.debug("received message unique_id: %s " % unique_id)
+ LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % {
+ 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),