summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2018-11-26 14:52:39 +0100
committerAsif Saif Uddin <auvipy@gmail.com>2018-11-26 19:52:39 +0600
commitf16df2a17630c9804a6da614443c5e862271823f (patch)
tree3cf9a5e5d7b7f88815283dc811fe21bf96d6d2c9
parent0e793de205e447d57214b32ea121ff2078c2819d (diff)
downloadpy-amqp-f16df2a17630c9804a6da614443c5e862271823f.tar.gz
Fix basic_consume() with no consumer_tag provided (#221)
* Use consumer tag sent by broker instead of using directly parameter value * Added more unittests of basic_consume() method * Make pep8 happy * Split integration tests to connection tests and channel tests * Added connection closed integration test * Added integration tests for basic_consume() * Fix typos
-rw-r--r--amqp/channel.py22
-rw-r--r--t/integration/test_integration.py136
-rw-r--r--t/unit/test_channel.py41
3 files changed, 171 insertions, 28 deletions
diff --git a/amqp/channel.py b/amqp/channel.py
index 785ad7b..c829a2c 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -1559,14 +1559,26 @@ class Channel(AbstractChannel):
"""
p = self.send_method(
spec.Basic.Consume, argsig,
- (0, queue, consumer_tag, no_local, no_ack, exclusive,
- nowait, arguments),
+ (
+ 0, queue, consumer_tag, no_local, no_ack, exclusive,
+ nowait, arguments
+ ),
wait=None if nowait else spec.Basic.ConsumeOk,
+ returns_tuple=True
)
- # XXX Fix this hack
- if not nowait and not consumer_tag:
- consumer_tag = p
+ if not nowait:
+ # send_method() returns (spec.Basic.ConsumeOk, consumer_tag) tuple.
+ # consumer_tag is returned by broker using following rules:
+ # * consumer_tag is not specified by client, random one
+ # is generated by Broker
+ # * consumer_tag is provided by client, the same one
+ # is returned by broker
+ consumer_tag = p[1]
+ elif nowait and not consumer_tag:
+ raise ValueError(
+ 'Consumer tag must be specified when nowait is True'
+ )
self.callbacks[consumer_tag] = callback
diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py
index 6bcd9e2..2969d2b 100644
--- a/t/integration/test_integration.py
+++ b/t/integration/test_integration.py
@@ -4,6 +4,7 @@ import pytest
from case import patch, call, Mock
from amqp import spec, Connection, Channel, sasl, Message
from amqp.platform import pack
+from amqp.exceptions import ConnectionError
from amqp.serialization import dumps, loads
@@ -96,7 +97,19 @@ def handshake(conn, transport_mock):
transport_mock().read_frame.side_effect = None
-class test_integration:
+def create_channel(channel_id, conn, transport_mock):
+ transport_mock().read_frame.return_value = ret_factory(
+ spec.Channel.OpenOk,
+ channel=channel_id,
+ args=(1, False),
+ arg_format='Lb'
+ )
+ ch = conn.channel(channel_id=channel_id)
+ transport_mock().read_frame.side_effect = None
+ return ch
+
+
+class test_connection:
# Integration tests. Tests verify the correctness of communication between
# library and broker.
# * tests mocks broker responses mocking return values of
@@ -173,6 +186,38 @@ class test_integration:
)
t.close.assert_called_once_with()
+ def test_connection_closed_by_broker(self):
+ # Test that library response correctly CloseOk when
+ # close method is received and _on_close_ok() method is called.
+ frame_writer_cls_mock = Mock()
+ frame_writer_mock = frame_writer_cls_mock()
+ with patch.object(Connection, '_on_close_ok') as callback_mock:
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ frame_writer_mock.reset_mock()
+ # Inject Close response from broker
+ transport_mock().read_frame.return_value = ret_factory(
+ spec.Connection.Close,
+ args=(1, False),
+ arg_format='Lb'
+ )
+ with pytest.raises(ConnectionError):
+ conn.drain_events(0)
+ frame_writer_mock.assert_called_once_with(
+ 1, 0, spec.Connection.CloseOk, '', None
+ )
+ callback_mock.assert_called_once_with()
+
+
+class test_channel:
+ # Integration tests. Tests verify the correctness of communication between
+ # library and broker.
+ # * tests mocks broker responses mocking return values of
+ # amqp.transport.Transport.read_frame() method
+ # * tests asserts expected library responses to broker via calls of
+ # amqp.method_framing.frame_writer() function
+
@pytest.mark.parametrize("method, callback", connection_testdata)
def test_connection_methods(self, method, callback):
# Test verifying that proper Connection callback is called when
@@ -244,22 +289,12 @@ class test_integration:
conn = Connection()
with patch.object(conn, 'Transport') as transport_mock:
handshake(conn, transport_mock)
-
- channel_id = 1
- # Inject Open Handshake
- transport_mock().read_frame.return_value = ret_factory(
- spec.Channel.OpenOk,
- channel=channel_id,
- args=(1, False),
- arg_format='Lb'
- )
-
- conn.channel(channel_id=channel_id)
+ create_channel(1, conn, transport_mock)
# Inject desired method
transport_mock().read_frame.return_value = ret_factory(
method,
- channel=channel_id,
+ channel=1,
args=(1, False),
arg_format='Lb'
)
@@ -272,17 +307,8 @@ class test_integration:
conn = Connection(frame_writer=frame_writer_cls_mock)
with patch.object(conn, 'Transport') as transport_mock:
handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
- channel_id = 1
- # Inject Open Handshake
- transport_mock().read_frame.return_value = ret_factory(
- spec.Channel.OpenOk,
- channel=channel_id,
- args=(1, False),
- arg_format='Lb'
- )
-
- ch = conn.channel(channel_id=channel_id)
frame_writer_mock = frame_writer_cls_mock()
frame_writer_mock.reset_mock()
msg = Message('test')
@@ -291,3 +317,67 @@ class test_integration:
1, 1, spec.Basic.Publish,
dumps('Bssbb', (0, '', '', False, False)), msg
)
+
+ def test_consume_no_consumer_tag(self):
+ # Test verifing starting consuming without specified consumer_tag
+ callback_mock = Mock()
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+
+ # Inject ConsumeOk response from Broker
+ transport_mock().read_frame.return_value = ret_factory(
+ spec.Basic.ConsumeOk,
+ channel=1,
+ args=(consumer_tag,),
+ arg_format='s'
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ch.basic_consume('my_queue', callback=callback_mock)
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Basic.Consume,
+ dumps(
+ 'BssbbbbF',
+ (0, 'my_queue', '', False, False, False, False, None)
+ ),
+ None
+ )
+ assert ch.callbacks[consumer_tag] == callback_mock
+
+ def test_consume_with_consumer_tag(self):
+ # Test verifing starting consuming with specified consumer_tag
+ callback_mock = Mock()
+ frame_writer_cls_mock = Mock()
+ conn = Connection(frame_writer=frame_writer_cls_mock)
+ with patch.object(conn, 'Transport') as transport_mock:
+ handshake(conn, transport_mock)
+ ch = create_channel(1, conn, transport_mock)
+
+ # Inject ConcumeOk response from Broker
+ transport_mock().read_frame.return_value = ret_factory(
+ spec.Basic.ConsumeOk,
+ channel=1,
+ args=('my_tag',),
+ arg_format='s'
+ )
+ frame_writer_mock = frame_writer_cls_mock()
+ frame_writer_mock.reset_mock()
+ ch.basic_consume(
+ 'my_queue', callback=callback_mock, consumer_tag='my_tag'
+ )
+ frame_writer_mock.assert_called_once_with(
+ 1, 1, spec.Basic.Consume,
+ dumps(
+ 'BssbbbbF',
+ (
+ 0, 'my_queue', 'my_tag',
+ False, False, False, False, None
+ )
+ ),
+ None
+ )
+ assert ch.callbacks['my_tag'] == callback_mock
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
index 3e7e5ff..173e0d7 100644
--- a/t/unit/test_channel.py
+++ b/t/unit/test_channel.py
@@ -266,6 +266,7 @@ class test_Channel:
def test_basic_consume(self):
callback = Mock()
on_cancel = Mock()
+ self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123)
self.c.basic_consume(
'q', 123, arguments={'x': 1},
callback=callback,
@@ -275,16 +276,56 @@ class test_Channel:
spec.Basic.Consume, 'BssbbbbF',
(0, 'q', 123, False, False, False, False, {'x': 1}),
wait=spec.Basic.ConsumeOk,
+ returns_tuple=True
)
assert self.c.callbacks[123] is callback
assert self.c.cancel_callbacks[123] is on_cancel
def test_basic_consume__no_ack(self):
+ self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123)
self.c.basic_consume(
'q', 123, arguments={'x': 1}, no_ack=True,
)
assert 123 in self.c.no_ack_consumers
+ def test_basic_consume_no_consumer_tag(self):
+ callback = Mock()
+ self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123)
+ self.c.basic_consume(
+ 'q', arguments={'x': 1},
+ callback=callback,
+ )
+ self.c.send_method.assert_called_with(
+ spec.Basic.Consume, 'BssbbbbF',
+ (0, 'q', '', False, False, False, False, {'x': 1}),
+ wait=spec.Basic.ConsumeOk,
+ returns_tuple=True
+ )
+ assert self.c.callbacks[123] is callback
+
+ def test_basic_consume_no_wait(self):
+ callback = Mock()
+ self.c.basic_consume(
+ 'q', 123, arguments={'x': 1},
+ callback=callback, nowait=True
+ )
+ self.c.send_method.assert_called_with(
+ spec.Basic.Consume, 'BssbbbbF',
+ (0, 'q', 123, False, False, False, True, {'x': 1}),
+ wait=None,
+ returns_tuple=True
+ )
+ assert self.c.callbacks[123] is callback
+
+ def test_basic_consume_no_wait_no_consumer_tag(self):
+ callback = Mock()
+ with pytest.raises(ValueError):
+ self.c.basic_consume(
+ 'q', arguments={'x': 1},
+ callback=callback, nowait=True
+ )
+ assert 123 not in self.c.callbacks
+
def test_on_basic_deliver(self):
msg = Mock()
self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)