diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/java_failing.txt | 17 | ||||
| -rw-r--r-- | python/qpid/codec.py | 17 | ||||
| -rw-r--r-- | python/qpid/connection.py | 3 | ||||
| -rw-r--r-- | python/qpid/peer.py | 2 | ||||
| -rw-r--r-- | python/qpid/testlib.py | 19 | ||||
| -rw-r--r-- | python/tests/broker.py | 45 | ||||
| -rw-r--r-- | python/tests/example.py | 14 | ||||
| -rw-r--r-- | python/tests/exchange.py | 16 | ||||
| -rw-r--r-- | python/tests/queue.py | 32 | ||||
| -rw-r--r-- | python/tests/testlib.py | 2 | ||||
| -rw-r--r-- | python/tests/tx.py | 104 |
11 files changed, 140 insertions, 131 deletions
diff --git a/python/java_failing.txt b/python/java_failing.txt index c7db632fac..f85e46beca 100644 --- a/python/java_failing.txt +++ b/python/java_failing.txt @@ -1,2 +1,19 @@ +tests.basic.BasicTests.test_ack +tests.basic.BasicTests.test_cancel +tests.basic.BasicTests.test_consume_exclusive +tests.basic.BasicTests.test_consume_no_local +tests.basic.BasicTests.test_consume_queue_errors +tests.basic.BasicTests.test_consume_unique_consumers +tests.basic.BasicTests.test_get +tests.basic.BasicTests.test_qos_prefetch_count +tests.basic.BasicTests.test_qos_prefetch_size +tests.basic.BasicTests.test_recover_requeue tests.exchange.RecommendedTypesRuleTests.testTopic tests.exchange.RequiredInstancesRuleTests.testAmqTopic +tests.queue.QueueTests.test_declare_exclusive +tests.queue.QueueTests.test_declare_passive +tests.queue.QueueTests.test_delete_ifempty +tests.queue.QueueTests.test_delete_ifunused +tests.queue.QueueTests.test_delete_simple +tests.queue.QueueTests.test_purge +tests.testlib.TestBaseTest.testMessageProperties diff --git a/python/qpid/codec.py b/python/qpid/codec.py index d8617c2937..205405894a 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -157,14 +157,15 @@ class Codec: def encode_table(self, tbl): enc = StringIO() codec = Codec(enc) - for key, value in tbl.items(): - codec.encode_shortstr(key) - if isinstance(value, basestring): - codec.write("S") - codec.encode_longstr(value) - else: - codec.write("I") - codec.encode_long(value) + if tbl: + for key, value in tbl.items(): + codec.encode_shortstr(key) + if isinstance(value, basestring): + codec.write("S") + codec.encode_longstr(value) + else: + codec.write("I") + codec.encode_long(value) s = enc.getvalue() self.encode_long(len(s)) self.write(s) diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 75fb134760..fb1e0927f0 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -204,6 +204,9 @@ class Request(Frame): method = Method.decode(spec, dec, size - 20) return Request(id, mark, method) + def __str__(self): + return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) + class Response(Frame): type = "frame_response" diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 66d325994b..8d5029004e 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -237,6 +237,8 @@ class Channel: frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: self.request(frame, self.queue_response, content) + if not frame.method.responses: + return None try: resp = self.responses.get() return Message(self, resp) diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index b4534ab362..ecae499451 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -80,7 +80,7 @@ Options: def __init__(self): # Defaults self.setBroker("localhost") - self.specfile = "../specs/amqp.0-8.xml" + self.spec = "../specs/amqp.0-8.xml" self.verbose = 1 self.ignore = [] @@ -203,8 +203,11 @@ class TestBase(unittest.TestCase): def consume(self, queueName): """Consume from named queue returns the Queue object.""" - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) + if not "uniqueTag" in dir(self): self.uniqueTag = 1 + else: self.uniqueTag += 1 + consumer_tag = "tag" + str(self.uniqueTag) + self.channel.message_consume(queue=queueName, destination=consumer_tag, no_ack=True) + return self.client.queue(consumer_tag) def assertEmpty(self, queue): """Assert that the queue is empty""" @@ -218,12 +221,12 @@ class TestBase(unittest.TestCase): Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() - self.channel.basic_publish(exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) + self.channel.message_transfer(destination=exchange, + body=body, application_headers=properties, + routing_key=routing_key) msg = queue.get(timeout=1) - self.assertEqual(body, msg.content.body) - if (properties): self.assertEqual(properties, msg.content.properties) + self.assertEqual(body, msg.body) + if (properties): self.assertEqual(properties, msg.application_headers) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ diff --git a/python/tests/broker.py b/python/tests/broker.py index d9ac69c5e3..ebb3a525f5 100644 --- a/python/tests/broker.py +++ b/python/tests/broker.py @@ -24,7 +24,7 @@ from qpid.testlib import testrunner, TestBase class BrokerTests(TestBase): """Tests for basic Broker functionality""" - def test_amqp_basic_13(self): + def test_ack_and_no_ack(self): """ First, this test tries to receive a message with a no-ack consumer. Second, this test tries to explicitely receive and @@ -34,41 +34,44 @@ class BrokerTests(TestBase): self.queue_declare(ch, queue = "myqueue") # No ack consumer - ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag + ctag = "tag1" + ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) body = "test no-ack" - ch.basic_publish(routing_key = "myqueue", content = Content(body)) + ch.message_transfer(routing_key = "myqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) - # Acknowleding consumer + # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") - ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag + ctag = "tag2" + ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) body = "test ack" - ch.basic_publish(routing_key = "otherqueue", content = Content(body)) + ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - ch.basic_ack(delivery_tag = msg.delivery_tag) - self.assert_(msg.content.body == body) + msg.ok() + self.assert_(msg.body == body) - def test_basic_delivery_immediate(self): + def test_simple_delivery_immediate(self): """ - Test basic message delivery where consume is issued before publish + Test simple message delivery where consume is issued before publish """ channel = self.channel self.exchange_declare(channel, exchange="test-exchange", type="direct") self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) body = "Immediate Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) + channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) # TODO: Ensure we fail if immediate=True and there's no consumer. - def test_basic_delivery_queued(self): + def test_simple_delivery_queued(self): """ Test basic message delivery where publish is issued before consume (i.e. requires queueing of the message) @@ -78,11 +81,13 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") body = "Queued Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_transfer(destination="test-exchange", routing_key="key", body=body) + + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) def test_invalid_channel(self): channel = self.client.channel(200) diff --git a/python/tests/example.py b/python/tests/example.py index a1949ccb9f..7ab4cc7d0a 100644 --- a/python/tests/example.py +++ b/python/tests/example.py @@ -68,18 +68,18 @@ class ExampleTest (TestBase): # has fields corresponding to the reply method fields, plus a content # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. - reply = channel.basic_consume(queue="test-queue") + channel.message_consume(queue="test-queue", destination="consumer_tag") # We can use the Client.queue(...) method to access the queue # corresponding to our consumer_tag. - queue = self.client.queue(reply.consumer_tag) + queue = self.client.queue("consumer_tag") # Now lets publish a message and see if our consumer gets it. To do # this we need to import the Content class. body = "Hello World!" - channel.basic_publish(exchange="test", - routing_key="key", - content=Content(body)) + channel.message_transfer(destination="test", + routing_key="key", + body = body) # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait @@ -87,8 +87,8 @@ class ExampleTest (TestBase): msg = queue.get(timeout=10) # And check that we got the right response with assertEqual - self.assertEqual(body, msg.content.body) + self.assertEqual(body, msg.body) # Now acknowledge the message. - channel.basic_ack(msg.delivery_tag, True) + msg.ok() diff --git a/python/tests/exchange.py b/python/tests/exchange.py index 56d6fa82e4..54c462de24 100644 --- a/python/tests/exchange.py +++ b/python/tests/exchange.py @@ -61,10 +61,10 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") - self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") - self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.channel.message_transfer(destination=ex, routing_key="a.b") + self.channel.message_transfer(destination=ex, routing_key="a.b.x.y") + self.channel.message_transfer(destination=ex, routing_key="x.a.b.x") + self.channel.message_transfer(destination=ex, routing_key="a.b") self.assert_(q.empty()) def verifyHeadersExchange(self, ex): @@ -73,8 +73,8 @@ class StandardExchangeVerifier: self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) q = self.consume("q") headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) - self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertPublishGet(q, exchange=ex, properties=headers) + self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver self.assertEmpty(q); @@ -272,10 +272,10 @@ class HeadersExchangeTests(TestBase): self.q = self.consume("q") def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + self.assertPublishGet(self.q, exchange="amq.match", properties=headers) def myBasicPublish(self, headers): - self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) def testMatchAll(self): self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) diff --git a/python/tests/queue.py b/python/tests/queue.py index 60ac4c3dfb..d85f04c4c2 100644 --- a/python/tests/queue.py +++ b/python/tests/queue.py @@ -33,9 +33,9 @@ class QueueTests(TestBase): channel.exchange_declare(exchange="test-exchange", type="direct") channel.queue_declare(queue="test-queue", exclusive=True) channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) + channel.message_transfer(destination="test-exchange", routing_key="key", body="one") + channel.message_transfer(destination="test-exchange", routing_key="key", body="two") + channel.message_transfer(destination="test-exchange", routing_key="key", body="three") #check that the queue now reports 3 messages: reply = channel.queue_declare(queue="test-queue") @@ -48,9 +48,9 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_transfer(destination="test-exchange", routing_key="key", content=Content("four")) + channel.message_consume(queue="test-queue", destination="tag", no_ack=True) + queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) @@ -153,15 +153,15 @@ class QueueTests(TestBase): def test_delete_simple(self): """ - Test basic queue deletion + Test core queue deletion behaviour """ channel = self.channel #straight-forward case: channel.queue_declare(queue="delete-me") - channel.basic_publish(routing_key="delete-me", content=Content("a")) - channel.basic_publish(routing_key="delete-me", content=Content("b")) - channel.basic_publish(routing_key="delete-me", content=Content("c")) + channel.message_transfer(routing_key="delete-me", body="a") + channel.message_transfer(routing_key="delete-me", body="b") + channel.message_transfer(routing_key="delete-me", body="c") reply = channel.queue_delete(queue="delete-me") self.assertEqual(3, reply.message_count) #check that it has gone be declaring passively @@ -191,7 +191,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): channel.queue_declare(queue="delete-me-2") channel.queue_declare(queue="delete-me-2", passive="True") - channel.basic_publish(routing_key="delete-me-2", content=Content("message")) + channel.message_transfer(routing_key="delete-me-2", body="message") #try to delete, but only if empty: try: @@ -205,11 +205,11 @@ class QueueTests(TestBase): channel.channel_open() #empty queue: - reply = channel.basic_consume(queue="delete-me-2", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) + queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) - channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.message_cancel(destination="consumer_tag") #retry deletion on empty queue: channel.queue_delete(queue="delete-me-2", if_empty="True") @@ -230,7 +230,7 @@ class QueueTests(TestBase): #create a queue and register a consumer: channel.queue_declare(queue="delete-me-3") channel.queue_declare(queue="delete-me-3", passive="True") - reply = channel.basic_consume(queue="delete-me-3", no_ack=True) + channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) #need new channel now: channel2 = self.client.channel(2) @@ -243,7 +243,7 @@ class QueueTests(TestBase): self.assertChannelException(406, e.args[0]) - channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.message_cancel(destination="consumer_tag") channel.queue_delete(queue="delete-me-3", if_unused="True") #check that it has gone by declaring passively: try: diff --git a/python/tests/testlib.py b/python/tests/testlib.py index cab07cc4ac..d9391fc529 100644 --- a/python/tests/testlib.py +++ b/python/tests/testlib.py @@ -49,7 +49,7 @@ class TestBaseTest(TestBase): def testAssertEmptyFail(self): self.queue_declare(queue="full") q = self.consume("full") - self.channel.basic_publish(routing_key="full") + self.channel.message_transfer(routing_key="full", body="") try: self.assertEmpty(q); self.fail("assertEmpty did not assert on non-empty queue") diff --git a/python/tests/tx.py b/python/tests/tx.py index 054fb8d8b7..55a5eaeade 100644 --- a/python/tests/tx.py +++ b/python/tests/tx.py @@ -37,22 +37,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) + self.assertEqual("TxMessage %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) + self.assertEqual("TxMessage 6", msg.body) + msg.ok() msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) + self.assertEqual("TxMessage 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_auto_rollback(self): @@ -65,7 +67,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -73,22 +75,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_rollback(self): @@ -101,7 +105,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -109,22 +113,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def perform_txn_work(self, channel, name_a, name_b, name_c): @@ -144,66 +150,38 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + channel.message_transfer(routing_key=name_a, body="Message %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") channel.tx_select() #consume and ack messages - sub_a = channel.basic_consume(queue=name_a, no_ack=False) - queue_a = self.client.queue(sub_a.consumer_tag) + channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) + queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() - sub_b = channel.basic_consume(queue=name_b, no_ack=False) - queue_b = self.client.queue(sub_b.consumer_tag) + channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) + queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 6", msg.body) + msg.ok() - sub_c = channel.basic_consume(queue=name_c, no_ack=False) - queue_c = self.client.queue(sub_c.consumer_tag) + sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) + queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 7", msg.body) + msg.ok() #publish messages for i in range(1, 5): - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) - channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, body="TxMessage 7") return queue_a, queue_b, queue_c - - def test_commit_overlapping_acks(self): - """ - Test that logically 'overlapping' acks do not cause errors on commit - """ - channel = self.channel - channel.queue_declare(queue="commit-overlapping", exclusive=True) - for i in range(1, 10): - channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) - - - channel.tx_select() - - sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) - queue = self.client.queue(sub.consumer_tag) - for i in range(1, 10): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - if i in [3, 6, 10]: - channel.basic_ack(delivery_tag=msg.delivery_tag) - - channel.tx_commit() - - #check all have been acked: - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None |
