summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-12-09 17:22:00 +0000
committerGerrit Code Review <review@openstack.org>2015-12-09 17:22:00 +0000
commit7f08805bc9908a374681202b3e14d4e311eaefcf (patch)
tree2db2dad0790ea8a007050ac6d56db4e3dc0a2b2a
parentfdfc98c6a259ad83dee775f77028d0f0b2887243 (diff)
parentc1d0412e2d5b437b06d8729bbe2cdaea594427be (diff)
downloadoslo-messaging-7f08805bc9908a374681202b3e14d4e311eaefcf.tar.gz
Merge "kombu: remove compat of folsom reply format"
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py46
1 files changed, 15 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index d03fa65..f8deac8 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -49,9 +49,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
self._obsolete_reply_queues = obsolete_reply_queues
def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
- if (self.reply_q and
- not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
- self.msg_id)):
+ if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+ self.msg_id):
return
if failure:
@@ -59,27 +58,17 @@ class AMQPIncomingMessage(base.IncomingMessage):
log_failure)
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
# for more detail.
- msg = {'result': reply, 'failure': failure, 'ending': True}
+ msg = {'result': reply, 'failure': failure, 'ending': True,
+ '_msg_id': self.msg_id}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
- # If a reply_q exists, add the msg_id to the reply and pass the
- # reply_q to direct_send() to use it as the response queue.
- # Otherwise use the msg_id for backward compatibility.
- if self.reply_q:
- msg['_msg_id'] = self.msg_id
- LOG.debug("sending reply msg_id: %(msg_id)s "
- "reply queue: %(reply_q)s" % {
- 'msg_id': self.msg_id,
- 'unique_id': unique_id,
- 'reply_q': self.reply_q})
- conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
- else:
- # TODO(sileht): look at which version of oslo-incubator rpc
- # send need this, but I guess this is older than icehouse
- # if this is icehouse, we can drop this at Mitaka
- # if this is havana, we can drop this now.
- conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
+ LOG.debug("sending reply msg_id: %(msg_id)s "
+ "reply queue: %(reply_q)s" % {
+ 'msg_id': self.msg_id,
+ 'unique_id': unique_id,
+ 'reply_q': self.reply_q})
+ conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
def reply(self, reply=None, failure=None, log_failure=True):
if not self.msg_id:
@@ -87,10 +76,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
# because reply should not be expected by caller side
return
- # NOTE(sileht): return without using a connection if possible
- if (self.reply_q and
- not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
- self.msg_id)):
+ # NOTE(sileht): return without hold the a connection if possible
+ if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+ self.msg_id):
return
# NOTE(sileht): we read the configuration value from the driver
@@ -204,12 +192,8 @@ class AMQPListener(base.Listener):
unique_id = self.msg_id_cache.check_duplicate_message(message)
- if ctxt.reply_q:
- LOG.debug(
- "received message msg_id: %(msg_id)s reply to %(queue)s" % {
- 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
- else:
- LOG.debug("received message unique_id: %s " % unique_id)
+ LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % {
+ 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),