diff options
Diffstat (limited to 'python/tests')
-rw-r--r-- | python/tests/broker.py | 45 | ||||
-rw-r--r-- | python/tests/example.py | 14 | ||||
-rw-r--r-- | python/tests/exchange.py | 16 | ||||
-rw-r--r-- | python/tests/queue.py | 32 | ||||
-rw-r--r-- | python/tests/testlib.py | 2 | ||||
-rw-r--r-- | python/tests/tx.py | 104 |
6 files changed, 98 insertions, 115 deletions
diff --git a/python/tests/broker.py b/python/tests/broker.py index d9ac69c5e3..ebb3a525f5 100644 --- a/python/tests/broker.py +++ b/python/tests/broker.py @@ -24,7 +24,7 @@ from qpid.testlib import testrunner, TestBase class BrokerTests(TestBase): """Tests for basic Broker functionality""" - def test_amqp_basic_13(self): + def test_ack_and_no_ack(self): """ First, this test tries to receive a message with a no-ack consumer. Second, this test tries to explicitely receive and @@ -34,41 +34,44 @@ class BrokerTests(TestBase): self.queue_declare(ch, queue = "myqueue") # No ack consumer - ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag + ctag = "tag1" + ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) body = "test no-ack" - ch.basic_publish(routing_key = "myqueue", content = Content(body)) + ch.message_transfer(routing_key = "myqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) - # Acknowleding consumer + # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") - ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag + ctag = "tag2" + ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) body = "test ack" - ch.basic_publish(routing_key = "otherqueue", content = Content(body)) + ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - ch.basic_ack(delivery_tag = msg.delivery_tag) - self.assert_(msg.content.body == body) + msg.ok() + self.assert_(msg.body == body) - def test_basic_delivery_immediate(self): + def test_simple_delivery_immediate(self): """ - Test basic message delivery where consume is issued before publish + Test simple message delivery where consume is issued before publish """ channel = self.channel self.exchange_declare(channel, exchange="test-exchange", type="direct") self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) body = "Immediate Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) + channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) # TODO: Ensure we fail if immediate=True and there's no consumer. - def test_basic_delivery_queued(self): + def test_simple_delivery_queued(self): """ Test basic message delivery where publish is issued before consume (i.e. requires queueing of the message) @@ -78,11 +81,13 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") body = "Queued Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_transfer(destination="test-exchange", routing_key="key", body=body) + + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) + self.assert_(msg.body == body) def test_invalid_channel(self): channel = self.client.channel(200) diff --git a/python/tests/example.py b/python/tests/example.py index a1949ccb9f..7ab4cc7d0a 100644 --- a/python/tests/example.py +++ b/python/tests/example.py @@ -68,18 +68,18 @@ class ExampleTest (TestBase): # has fields corresponding to the reply method fields, plus a content # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. - reply = channel.basic_consume(queue="test-queue") + channel.message_consume(queue="test-queue", destination="consumer_tag") # We can use the Client.queue(...) method to access the queue # corresponding to our consumer_tag. - queue = self.client.queue(reply.consumer_tag) + queue = self.client.queue("consumer_tag") # Now lets publish a message and see if our consumer gets it. To do # this we need to import the Content class. body = "Hello World!" - channel.basic_publish(exchange="test", - routing_key="key", - content=Content(body)) + channel.message_transfer(destination="test", + routing_key="key", + body = body) # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait @@ -87,8 +87,8 @@ class ExampleTest (TestBase): msg = queue.get(timeout=10) # And check that we got the right response with assertEqual - self.assertEqual(body, msg.content.body) + self.assertEqual(body, msg.body) # Now acknowledge the message. - channel.basic_ack(msg.delivery_tag, True) + msg.ok() diff --git a/python/tests/exchange.py b/python/tests/exchange.py index 56d6fa82e4..54c462de24 100644 --- a/python/tests/exchange.py +++ b/python/tests/exchange.py @@ -61,10 +61,10 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") - self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") - self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.channel.message_transfer(destination=ex, routing_key="a.b") + self.channel.message_transfer(destination=ex, routing_key="a.b.x.y") + self.channel.message_transfer(destination=ex, routing_key="x.a.b.x") + self.channel.message_transfer(destination=ex, routing_key="a.b") self.assert_(q.empty()) def verifyHeadersExchange(self, ex): @@ -73,8 +73,8 @@ class StandardExchangeVerifier: self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) q = self.consume("q") headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) - self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertPublishGet(q, exchange=ex, properties=headers) + self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver self.assertEmpty(q); @@ -272,10 +272,10 @@ class HeadersExchangeTests(TestBase): self.q = self.consume("q") def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + self.assertPublishGet(self.q, exchange="amq.match", properties=headers) def myBasicPublish(self, headers): - self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) def testMatchAll(self): self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) diff --git a/python/tests/queue.py b/python/tests/queue.py index 60ac4c3dfb..d85f04c4c2 100644 --- a/python/tests/queue.py +++ b/python/tests/queue.py @@ -33,9 +33,9 @@ class QueueTests(TestBase): channel.exchange_declare(exchange="test-exchange", type="direct") channel.queue_declare(queue="test-queue", exclusive=True) channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) + channel.message_transfer(destination="test-exchange", routing_key="key", body="one") + channel.message_transfer(destination="test-exchange", routing_key="key", body="two") + channel.message_transfer(destination="test-exchange", routing_key="key", body="three") #check that the queue now reports 3 messages: reply = channel.queue_declare(queue="test-queue") @@ -48,9 +48,9 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_transfer(destination="test-exchange", routing_key="key", content=Content("four")) + channel.message_consume(queue="test-queue", destination="tag", no_ack=True) + queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) @@ -153,15 +153,15 @@ class QueueTests(TestBase): def test_delete_simple(self): """ - Test basic queue deletion + Test core queue deletion behaviour """ channel = self.channel #straight-forward case: channel.queue_declare(queue="delete-me") - channel.basic_publish(routing_key="delete-me", content=Content("a")) - channel.basic_publish(routing_key="delete-me", content=Content("b")) - channel.basic_publish(routing_key="delete-me", content=Content("c")) + channel.message_transfer(routing_key="delete-me", body="a") + channel.message_transfer(routing_key="delete-me", body="b") + channel.message_transfer(routing_key="delete-me", body="c") reply = channel.queue_delete(queue="delete-me") self.assertEqual(3, reply.message_count) #check that it has gone be declaring passively @@ -191,7 +191,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): channel.queue_declare(queue="delete-me-2") channel.queue_declare(queue="delete-me-2", passive="True") - channel.basic_publish(routing_key="delete-me-2", content=Content("message")) + channel.message_transfer(routing_key="delete-me-2", body="message") #try to delete, but only if empty: try: @@ -205,11 +205,11 @@ class QueueTests(TestBase): channel.channel_open() #empty queue: - reply = channel.basic_consume(queue="delete-me-2", no_ack=True) - queue = self.client.queue(reply.consumer_tag) + channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) + queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) - channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.message_cancel(destination="consumer_tag") #retry deletion on empty queue: channel.queue_delete(queue="delete-me-2", if_empty="True") @@ -230,7 +230,7 @@ class QueueTests(TestBase): #create a queue and register a consumer: channel.queue_declare(queue="delete-me-3") channel.queue_declare(queue="delete-me-3", passive="True") - reply = channel.basic_consume(queue="delete-me-3", no_ack=True) + channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) #need new channel now: channel2 = self.client.channel(2) @@ -243,7 +243,7 @@ class QueueTests(TestBase): self.assertChannelException(406, e.args[0]) - channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.message_cancel(destination="consumer_tag") channel.queue_delete(queue="delete-me-3", if_unused="True") #check that it has gone by declaring passively: try: diff --git a/python/tests/testlib.py b/python/tests/testlib.py index cab07cc4ac..d9391fc529 100644 --- a/python/tests/testlib.py +++ b/python/tests/testlib.py @@ -49,7 +49,7 @@ class TestBaseTest(TestBase): def testAssertEmptyFail(self): self.queue_declare(queue="full") q = self.consume("full") - self.channel.basic_publish(routing_key="full") + self.channel.message_transfer(routing_key="full", body="") try: self.assertEmpty(q); self.fail("assertEmpty did not assert on non-empty queue") diff --git a/python/tests/tx.py b/python/tests/tx.py index 054fb8d8b7..55a5eaeade 100644 --- a/python/tests/tx.py +++ b/python/tests/tx.py @@ -37,22 +37,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) + self.assertEqual("TxMessage %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) + self.assertEqual("TxMessage 6", msg.body) + msg.ok() msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) + self.assertEqual("TxMessage 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_auto_rollback(self): @@ -65,7 +67,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -73,22 +75,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def test_rollback(self): @@ -101,7 +105,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None channel.tx_rollback() @@ -109,22 +113,24 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) + self.assertEqual("Message 6", msg.body) + msg.ok() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) + self.assertEqual("Message 7", msg.body) + msg.ok() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) channel.tx_commit() def perform_txn_work(self, channel, name_a, name_b, name_c): @@ -144,66 +150,38 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + channel.message_transfer(routing_key=name_a, body="Message %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") channel.tx_select() #consume and ack messages - sub_a = channel.basic_consume(queue=name_a, no_ack=False) - queue_a = self.client.queue(sub_a.consumer_tag) + channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) + queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() - sub_b = channel.basic_consume(queue=name_b, no_ack=False) - queue_b = self.client.queue(sub_b.consumer_tag) + channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) + queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 6", msg.body) + msg.ok() - sub_c = channel.basic_consume(queue=name_c, no_ack=False) - queue_c = self.client.queue(sub_c.consumer_tag) + sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) + queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) + self.assertEqual("Message 7", msg.body) + msg.ok() #publish messages for i in range(1, 5): - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) - channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, body="TxMessage 7") return queue_a, queue_b, queue_c - - def test_commit_overlapping_acks(self): - """ - Test that logically 'overlapping' acks do not cause errors on commit - """ - channel = self.channel - channel.queue_declare(queue="commit-overlapping", exclusive=True) - for i in range(1, 10): - channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) - - - channel.tx_select() - - sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) - queue = self.client.queue(sub.consumer_tag) - for i in range(1, 10): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - if i in [3, 6, 10]: - channel.basic_ack(delivery_tag=msg.delivery_tag) - - channel.tx_commit() - - #check all have been acked: - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None |