diff options
author | Matus Valo <matusvalo@users.noreply.github.com> | 2019-01-04 06:18:40 +0100 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2019-01-04 11:18:40 +0600 |
commit | 6c8b4b45d622a2ca1a154596e9ea2f3621feb43a (patch) | |
tree | fb3f67499afcd38a8ad46386d0beba472593a5a2 | |
parent | ccddd02a71cb53f7aa6f4bad7397dc9060a08dfb (diff) | |
download | py-amqp-6c8b4b45d622a2ca1a154596e9ea2f3621feb43a.tar.gz |
queue, exchange and basic.get integration tests (#234)
* Improve return values in doc strings
* Added queue and exchange integration tests
-rw-r--r-- | amqp/channel.py | 11 | ||||
-rw-r--r-- | t/integration/test_integration.py | 405 |
2 files changed, 376 insertions, 40 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index 4942a82..42fee9f 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1140,8 +1140,8 @@ class Channel(AbstractChannel): True. Returns a tuple containing 3 items: - the name of the queue (essential for automatically-named queues) - message count + the name of the queue (essential for automatically-named queues), + message count and consumer count """ self.send_method( @@ -1220,6 +1220,8 @@ class Channel(AbstractChannel): client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception. + + If nowait is False, returns the number of deleted messages. """ return self.send_method( spec.Queue.Delete, argsig, @@ -1280,7 +1282,7 @@ class Channel(AbstractChannel): server could not complete the method it will raise a channel or connection exception. - if nowait is False, returns a message_count + If nowait is False, returns a number of purged messages. """ return self.send_method( spec.Queue.Purge, argsig, (0, queue, nowait), @@ -1647,7 +1649,8 @@ class Channel(AbstractChannel): reliability. Messages can get lost if a client dies before it can deliver them to the application. - Non-blocking, returns a message object, or None. + Non-blocking, returns a amqp.basic_message.Message object, + or None if queue is empty. """ ret = self.send_method( spec.Basic.Get, argsig, (0, queue, no_ack), diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py index 3a20540..e682199 100644 --- a/t/integration/test_integration.py +++ b/t/integration/test_integration.py @@ -5,20 +5,11 @@ import pytest from case import patch, call, Mock from amqp import spec, Connection, Channel, sasl, Message from amqp.platform import pack -from amqp.exceptions import ConnectionError +from amqp.exceptions import ConnectionError, \ + InvalidCommand, AccessRefused, PreconditionFailed, NotFound, ResourceLocked from amqp.serialization import dumps, loads from amqp.protocol import queue_declare_ok_t - -def ret_factory(method, channel=0, args=b'', arg_format=None): - if len(args) > 0: - args = dumps(arg_format, args) - else: - args = b'' - frame = (b''.join([pack('>HH', *method), args])) - return 1, channel, frame - - connection_testdata = ( (spec.Connection.Blocked, '_on_blocked'), (spec.Connection.Unblocked, '_on_unblocked'), @@ -26,13 +17,49 @@ connection_testdata = ( (spec.Connection.CloseOk, '_on_close_ok'), ) - channel_testdata = ( (spec.Basic.Ack, '_on_basic_ack'), (spec.Basic.Nack, '_on_basic_nack'), (spec.Basic.CancelOk, '_on_basic_cancel_ok'), ) +exchange_declare_error_testdata = ( + ( + 503, "COMMAND_INVALID - " + "unknown exchange type 'exchange-type'", + InvalidCommand + ), + ( + 403, "ACCESS_REFUSED - " + "exchange name 'amq.foo' contains reserved prefix 'amq.*'", + AccessRefused + ), + ( + 406, "PRECONDITION_FAILED - " + "inequivalent arg 'type' for exchange 'foo' in vhost '/':" + "received 'direct' but current is 'fanout'", + PreconditionFailed + ), +) + +queue_declare_error_testdata = ( + ( + 403, "ACCESS_REFUSED - " + "queue name 'amq.foo' contains reserved prefix 'amq.*", + AccessRefused + ), + ( + 404, "NOT_FOUND - " + "no queue 'foo' in vhost '/'", + NotFound + ), + ( + 405, "RESOURCE_LOCKED - " + "cannot obtain exclusive access to locked queue 'foo' in vhost '/'", + ResourceLocked + ), +) + CLIENT_CAPABILITIES = { 'product': 'py-amqp', 'product_version': '2.3.2', @@ -64,6 +91,26 @@ SERVER_CAPABILITIES = { } +def build_frame_type_1(method, channel=0, args=b'', arg_format=None): + if len(args) > 0: + args = dumps(arg_format, args) + else: + args = b'' + frame = (b''.join([pack('>HH', *method), args])) + return 1, channel, frame + + +def build_frame_type_2(body_len, channel, properties): + frame = (b''.join( + [pack('>HxxQ', spec.Basic.CLASS_ID, body_len), properties]) + ) + return 2, channel, frame + + +def build_frame_type_3(channel, body): + return 3, channel, body + + class DataComparator(object): # Comparator used for asserting serialized data. It can be used # in cases when direct comparision of bytestream cannot be used @@ -80,18 +127,18 @@ class DataComparator(object): def handshake(conn, transport_mock): # Helper function simulating connection handshake with server transport_mock().read_frame.side_effect = [ - ret_factory( + build_frame_type_1( spec.Connection.Start, channel=0, args=( 0, 9, SERVER_CAPABILITIES, 'AMQPLAIN PLAIN', 'en_US' ), arg_format='ooFSS' ), - ret_factory( + build_frame_type_1( spec.Connection.Tune, channel=0, args=(2047, 131072, 60), arg_format='BlB' ), - ret_factory( + build_frame_type_1( spec.Connection.OpenOk, channel=0 ) ] @@ -101,7 +148,7 @@ def handshake(conn, transport_mock): def create_channel(channel_id, conn, transport_mock): transport_mock().read_frame.side_effect = [ - ret_factory( + build_frame_type_1( spec.Channel.OpenOk, channel=channel_id, args=(1, False), @@ -178,10 +225,8 @@ class test_connection: handshake(conn, transport_mock) frame_writer_mock.reset_mock() # Inject CloseOk response from broker - transport_mock().read_frame.return_value = ret_factory( - spec.Connection.CloseOk, - args=(1, False), - arg_format='Lb' + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Connection.CloseOk ) t = conn.transport conn.close() @@ -201,7 +246,7 @@ class test_connection: handshake(conn, transport_mock) frame_writer_mock.reset_mock() # Inject Close response from broker - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( spec.Connection.Close, args=(1, False), arg_format='Lb' @@ -231,7 +276,7 @@ class test_channel: with patch.object(conn, 'Transport') as transport_mock: handshake(conn, transport_mock) # Inject desired method - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( method, channel=0, args=(1, False), arg_format='Lb' ) conn.drain_events(0) @@ -247,18 +292,16 @@ class test_channel: channel_id = 1 transport_mock().read_frame.side_effect = [ # Inject Open Handshake - ret_factory( + build_frame_type_1( spec.Channel.OpenOk, channel=channel_id, args=(1, False), arg_format='Lb' ), # Inject close method - ret_factory( + build_frame_type_1( spec.Channel.CloseOk, - channel=channel_id, - args=(1, False), - arg_format='Lb' + channel=channel_id ) ] @@ -300,16 +343,14 @@ class test_channel: # Replies sent by broker transport_mock().read_frame.side_effect = [ # Inject close methods - ret_factory( + build_frame_type_1( spec.Channel.Close, channel=channel_id, args=(1, False), arg_format='Lb' ), - ret_factory( - spec.Connection.CloseOk, - args=(1, False), - arg_format='Lb' + build_frame_type_1( + spec.Connection.CloseOk ) ] conn.close() @@ -325,7 +366,7 @@ class test_channel: create_channel(1, conn, transport_mock) # Inject desired method - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( method, channel=1, args=(1, False), @@ -365,7 +406,7 @@ class test_channel: ch = create_channel(1, conn, transport_mock) # Inject ConsumeOk response from Broker - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( spec.Basic.ConsumeOk, channel=1, args=(consumer_tag,), @@ -394,7 +435,7 @@ class test_channel: ch = create_channel(1, conn, transport_mock) # Inject ConcumeOk response from Broker - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( spec.Basic.ConsumeOk, channel=1, args=('my_tag',), @@ -425,7 +466,7 @@ class test_channel: with patch.object(conn, 'Transport') as transport_mock: handshake(conn, transport_mock) ch = create_channel(1, conn, transport_mock) - transport_mock().read_frame.return_value = ret_factory( + transport_mock().read_frame.return_value = build_frame_type_1( spec.Queue.DeclareOk, channel=1, arg_format='sll', @@ -451,3 +492,295 @@ class test_channel: ), None ) + + @pytest.mark.parametrize( + "reply_code, reply_text, exception", queue_declare_error_testdata) + def test_queue_declare_error(self, reply_code, reply_text, exception): + # Test verifying wrong declaring exchange + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Connection.Close, + args=(reply_code, reply_text) + spec.Exchange.Declare, + arg_format='BsBB' + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + with pytest.raises(exception) as excinfo: + ch.queue_declare('foo') + assert excinfo.value.code == reply_code + assert excinfo.value.message == reply_text + assert excinfo.value.method == 'Exchange.declare' + assert excinfo.value.method_name == 'Exchange.declare' + assert excinfo.value.method_sig == spec.Exchange.Declare + # Client is sending to broker: + # 1. Exchange Declare + # 2. Connection.CloseOk as reply to received Connecton.Close + frame_writer_calls = [ + call( + 1, 1, spec.Queue.Declare, + dumps( + 'BsbbbbbF', + ( + 0, + # queue, passive, durable, exclusive, + 'foo', False, False, False, + # auto_delete, nowait, arguments + True, False, None + ) + ), + None + ), + call( + 1, 0, spec.Connection.CloseOk, + '', + None + ), + ] + frame_writer_mock.assert_has_calls(frame_writer_calls) + + def test_queue_delete(self): + # Test verifying deleting queue + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Queue.DeleteOk, + channel=1, + arg_format='l', + args=(5,) + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + msg_count = ch.queue_delete('foo') + assert msg_count == 5 + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Queue.Delete, + dumps( + 'Bsbbb', + # queue, if_unused, if_empty, nowait + (0, 'foo', False, False, False) + ), + None + ) + + def test_queue_purge(self): + # Test verifying purging queue + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Queue.PurgeOk, + channel=1, + arg_format='l', + args=(4,) + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + msg_count = ch.queue_purge('foo') + assert msg_count == 4 + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Queue.Purge, + dumps( + 'Bsb', + # queue, nowait + (0, 'foo', False) + ), + None + ) + + def test_queue_get(self): + # Test verifying getting message from queue + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.side_effect = [ + build_frame_type_1( + spec.Basic.GetOk, + channel=1, + arg_format='Lbssl', + args=( + # delivery_tag, redelivered, exchange_name + 1, False, 'foo_exchange', + # routing_key, message_count + 'routing_key', 1 + ) + ), + build_frame_type_2( + channel=1, + body_len=12, + properties=b'0\x00\x00\x00\x00\x00\x01' + ), + build_frame_type_3( + channel=1, + body=b'Hello World!' + ) + ] + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + msg = ch.basic_get('foo') + assert msg.body_size == 12 + assert msg.body == b'Hello World!' + assert msg.frame_method == spec.Basic.GetOk + assert msg.delivery_tag == 1 + assert msg.ready is True + assert msg.delivery_info == { + 'delivery_tag': 1, 'redelivered': False, + 'exchange': 'foo_exchange', + 'routing_key': 'routing_key', 'message_count': 1 + } + assert msg.properties == { + 'application_headers': {}, 'delivery_mode': 1 + } + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Basic.Get, + dumps( + 'Bsb', + # queue, nowait + (0, 'foo', False) + ), + None + ) + + def test_queue_get_empty(self): + # Test verifying getting message from empty queue + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Basic.GetEmpty, + channel=1, + arg_format='s', + args=('s') + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + ret = ch.basic_get('foo') + assert ret is None + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Basic.Get, + dumps( + 'Bsb', + # queue, nowait + (0, 'foo', False) + ), + None + ) + + def test_exchange_declare(self): + # Test verifying declaring exchange + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Exchange.DeclareOk, + channel=1 + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + ret = ch.exchange_declare('foo', 'fanout') + assert ret is None + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Exchange.Declare, + dumps( + 'BssbbbbbF', + ( + 0, + # exchange, type, passive, durable, + 'foo', 'fanout', False, False, + # auto_delete, internal, nowait, arguments + True, False, False, None + ) + ), + None + ) + + @pytest.mark.parametrize( + "reply_code, reply_text, exception", exchange_declare_error_testdata) + def test_exchange_declare_error(self, reply_code, reply_text, exception): + # Test verifying wrong declaring exchange + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Connection.Close, + args=(reply_code, reply_text) + spec.Exchange.Declare, + arg_format='BsBB' + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + with pytest.raises(exception) as excinfo: + ch.exchange_declare('exchange', 'exchange-type') + assert excinfo.value.code == reply_code + assert excinfo.value.message == reply_text + assert excinfo.value.method == 'Exchange.declare' + assert excinfo.value.method_name == 'Exchange.declare' + assert excinfo.value.method_sig == spec.Exchange.Declare + # Client is sending to broker: + # 1. Exchange Declare + # 2. Connection.CloseOk as reply to received Connecton.Close + frame_writer_calls = [ + call( + 1, 1, spec.Exchange.Declare, + dumps( + 'BssbbbbbF', + ( + 0, + # exchange, type, passive, durable, + 'exchange', 'exchange-type', False, False, + # auto_delete, internal, nowait, arguments + True, False, False, None + ) + ), + None + ), + call( + 1, 0, spec.Connection.CloseOk, + '', + None + ), + ] + frame_writer_mock.assert_has_calls(frame_writer_calls) + + def test_exchange_delete(self): + # Test verifying declaring exchange + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + transport_mock().read_frame.return_value = build_frame_type_1( + spec.Exchange.DeleteOk, + channel=1 + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + ret = ch.exchange_delete('foo') + assert ret == () + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Exchange.Delete, + dumps( + 'Bsbb', + ( + 0, + # exchange, if-unused, no-wait + 'foo', False, False + ) + ), + None + ) |