diff options
Diffstat (limited to 'oslo_messaging')
-rw-r--r-- | oslo_messaging/_drivers/amqp.py | 12 | ||||
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 40 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 3 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 11 |
4 files changed, 14 insertions, 52 deletions
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 86f41ad..06a59f8 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -48,18 +48,6 @@ amqp_opts = [ default=False, deprecated_group='DEFAULT', help='Auto-delete queues in AMQP.'), - cfg.BoolOpt('send_single_reply', - default=False, - help='Send a single AMQP reply to call message. The current ' - 'behaviour since oslo-incubator is to send two AMQP ' - 'replies - first one with the payload, a second one to ' - 'ensure the other have finish to send the payload. We ' - 'are going to remove it in the N release, but we must ' - 'keep backward compatible at the same time. This option ' - 'provides such compatibility - it defaults to False in ' - 'Liberty and can be turned on for early adopters with a ' - 'new installations or for testing. Please note, that ' - 'this option will be removed in the Mitaka release.') ] UNIQUE_ID = '_unique_id' diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index e95edfc..d03fa65 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -48,8 +48,7 @@ class AMQPIncomingMessage(base.IncomingMessage): self.requeue_callback = message.requeue self._obsolete_reply_queues = obsolete_reply_queues - def _send_reply(self, conn, reply=None, failure=None, - ending=False, log_failure=True): + def _send_reply(self, conn, reply=None, failure=None, log_failure=True): if (self.reply_q and not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id)): @@ -58,11 +57,9 @@ class AMQPIncomingMessage(base.IncomingMessage): if failure: failure = rpc_common.serialize_remote_exception(failure, log_failure) - - msg = {'result': reply, 'failure': failure} - if ending: - msg['ending'] = True - + # NOTE(sileht): ending can be removed in N*, see Listener.wait() + # for more detail. + msg = {'result': reply, 'failure': failure, 'ending': True} rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] @@ -71,12 +68,11 @@ class AMQPIncomingMessage(base.IncomingMessage): # Otherwise use the msg_id for backward compatibility. if self.reply_q: msg['_msg_id'] = self.msg_id - if ending: - LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { - 'msg_id': self.msg_id, - 'unique_id': unique_id, - 'reply_q': self.reply_q}) + LOG.debug("sending reply msg_id: %(msg_id)s " + "reply queue: %(reply_q)s" % { + 'msg_id': self.msg_id, + 'unique_id': unique_id, + 'reply_q': self.reply_q}) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) else: # TODO(sileht): look at which version of oslo-incubator rpc @@ -104,21 +100,12 @@ class AMQPIncomingMessage(base.IncomingMessage): timer = rpc_common.DecayingTimer(duration=duration) timer.start() - first_reply_sent = False while True: try: with self.listener.driver._get_connection( rpc_common.PURPOSE_SEND) as conn: - if self.listener.driver.send_single_reply: - self._send_reply(conn, reply, failure, - log_failure=log_failure, - ending=True) - else: - if not first_reply_sent: - self._send_reply(conn, reply, failure, - log_failure=log_failure) - first_reply_sent = True - self._send_reply(conn, ending=True) + self._send_reply(conn, reply, failure, + log_failure=log_failure) return except rpc_amqp.AMQPDestinationNotFound: if timer.check_return() > 0: @@ -378,8 +365,7 @@ class AMQPDriverBase(base.BaseDriver): missing_destination_retry_timeout = 0 def __init__(self, conf, url, connection_pool, - default_exchange=None, allowed_remote_exmods=None, - send_single_reply=False): + default_exchange=None, allowed_remote_exmods=None): super(AMQPDriverBase, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -392,8 +378,6 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q_conn = None self._waiter = None - self.send_single_reply = send_single_reply - def _get_exchange(self, target): return target.exchange or self._default_exchange diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b9ff363..f0110d9 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1139,8 +1139,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf, url, connection_pool, default_exchange, - allowed_remote_exmods, - conf.oslo_messaging_rabbit.send_single_reply, + allowed_remote_exmods ) def require_features(self, requeue=True): diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 52cbfe1..9fdd211 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -28,7 +28,6 @@ from oslotest import mockpatch import testscenarios import oslo_messaging -from oslo_messaging._drivers import amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver @@ -363,11 +362,6 @@ class TestSendReceive(test_utils.BaseTestCase): ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken? ] - _reply_ending = [ - ('old_behavior', dict(send_single_reply=False)), - ('new_behavior', dict(send_single_reply=True)), - ] - @classmethod def generate_scenarios(cls): cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders, @@ -375,16 +369,13 @@ class TestSendReceive(test_utils.BaseTestCase): cls._reply, cls._reply_fail, cls._failure, - cls._timeout, - cls._reply_ending) + cls._timeout) def test_send_receive(self): self.config(kombu_missing_consumer_retry_timeout=0.5, group="oslo_messaging_rabbit") self.config(heartbeat_timeout_threshold=0, group="oslo_messaging_rabbit") - self.config(send_single_reply=self.send_single_reply, - group="oslo_messaging_rabbit") transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) |