diff options
author | Alan Conway <aconway@apache.org> | 2007-03-19 20:34:32 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-19 20:34:32 +0000 |
commit | 61c0db2359216685e697facac5351bdd836035d7 (patch) | |
tree | 1e070c23f405f82e528dee9a91fd3bbfddf86fed /python/tests/message.py | |
parent | f934d8a2d5d91d802fcc791a8492bd92abafc327 (diff) | |
download | qpid-python-61c0db2359216685e697facac5351bdd836035d7.tar.gz |
Merged revisions 507491-507559,507561-507601,507603-507621,507623-507671,507673-507959,507961-507992,507994-508097,508099-508149,508151-508155,508157-508232,508234-508378,508380-508390,508392-508459,508461-508704,508707-509615,509617-509737,509739-509753,509756-509833,509835-510106,510108-510160,510162-510179,510181-510552,510554-510704,510706-510911,510913-510985,510987-511003,511005-514750,514752-515720,515722-516156,516158-516458,516461-516484,516486-516488,516490-517823,517825,517827,517829,517831-517832,517834-517848,517850,517852-517854,517856-517858,517860-517877,517879-517886,517888-517891,517893-517903,517905,517907-517928,517930,517932-518197,518201-518206,518208-518230,518232,518235,518237,518239-518240,518243-518245,518247-518255,518257,518259-518260,518262,518264,518266-518292,518294-518707 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r507491 | gsim | 2007-02-14 06:39:26 -0500 (Wed, 14 Feb 2007) | 3 lines
Expanded the use of batched acks to a few other places in tests.
........
r508377 | gsim | 2007-02-16 07:03:37 -0500 (Fri, 16 Feb 2007) | 4 lines
Updated failing list for java (some of these result in test suite blocking)
Added better error handling when connection closes without close method
........
r508396 | gsim | 2007-02-16 08:54:54 -0500 (Fri, 16 Feb 2007) | 3 lines
Fix: use message_resume not channel_resume
........
r509611 | gsim | 2007-02-20 10:39:14 -0500 (Tue, 20 Feb 2007) | 3 lines
Fixed bug where response id rather than request id was being used to get listener for response.
........
r509617 | gsim | 2007-02-20 10:52:31 -0500 (Tue, 20 Feb 2007) | 3 lines
Updated list of failing tests for java broker on this branch.
........
r510096 | gsim | 2007-02-21 11:49:27 -0500 (Wed, 21 Feb 2007) | 5 lines
Fixed bug in references where map wasn't qualified in close
Attach reference to transfer, as it will be deleted on close
Altered tests to get reference from the message on the queue rather than looking them up from channel as they are already gone there
........
r510114 | astitcher | 2007-02-21 12:37:36 -0500 (Wed, 21 Feb 2007) | 3 lines
r1224@fuschia: andrew | 2007-02-21 17:20:59 +0000
Updated expected cpp broker test failures
........
r510128 | gsim | 2007-02-21 13:06:02 -0500 (Wed, 21 Feb 2007) | 3 lines
Ensure socket is closed in tearDown
........
r510913 | gsim | 2007-02-23 06:37:08 -0500 (Fri, 23 Feb 2007) | 3 lines
Revised list of failing tests for java broker on this branch
........
r515363 | aconway | 2007-03-06 18:35:08 -0500 (Tue, 06 Mar 2007) | 6 lines
* python/qpid/peer.py (Channel.__init__): use reliable framing if
version >= 0-8.
* python/qpid/spec.py (Spec.__init__): Remove unused parameter.
* python/qpid/testlib.py (TestRunner._parseargs): Add --errata option,
set default errata only if --spec is not present.
........
r518707 | aconway | 2007-03-15 13:49:44 -0400 (Thu, 15 Mar 2007) | 6 lines
* python/qpid/peer.py (Peer.close): Close delegate *before* channels.
Otherwise we get a race: closing a channel can wake a client thread,
which may see client.closed as still false. Was causing bogus exceptions
in some tests.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520094 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests/message.py')
-rw-r--r-- | python/tests/message.py | 214 |
1 files changed, 175 insertions, 39 deletions
diff --git a/python/tests/message.py b/python/tests/message.py index 916a9825bd..d5f5d4dbc2 100644 --- a/python/tests/message.py +++ b/python/tests/message.py @@ -171,8 +171,7 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok() - msg2.ok() + msg1.ok(batchoffset=1)#One and Two msg4.ok() channel.message_recover(requeue=False) @@ -216,9 +215,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok() #One - msg2.ok() #Two - msg4.ok() #Two + msg1.ok(batchoffset=1) #One and Two + msg4.ok() #Four channel.message_cancel(destination="consumer_tag") channel.message_consume(queue="test-requeue", destination="consumer_tag") @@ -263,11 +261,9 @@ class MessageTests(TestBase): channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) #only 5 messages should have been delivered: - msgs = [] for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.body) @@ -275,18 +271,13 @@ class MessageTests(TestBase): #ack messages and check that the next set arrive ok: #todo: once batching is implmented, send a single response for all messages - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#1-5 for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#6-10 try: extra = queue.get(timeout=1) @@ -313,12 +304,9 @@ class MessageTests(TestBase): channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) #only 5 messages should have been delivered (i.e. 45 bytes worth): - msgs = [] for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - print "Got Message %d" % i - msgs.append(msg) try: extra = queue.get(timeout=1) @@ -326,18 +314,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#1-5 for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#6-10 try: extra = queue.get(timeout=1) @@ -383,14 +366,13 @@ class MessageTests(TestBase): reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - reply.ok() - - #todo: when batching is available, test ack multiple - #if(i == 13): - # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True) - #if(i in [15, 17, 19]): - # channel.message_ack(delivery_tag=reply.delivery_tag) + msg = self.client.queue(tag).get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + if (i==13): + msg.ok(batchoffset=-2)#11, 12 & 13 + if(i in [15, 17, 19]): + msg.ok() reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") @@ -479,8 +461,9 @@ class MessageTests(TestBase): queue = other.queue("c1") msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg, Reference)) - self.assertEquals(data, msg.get_complete()) + self.assertTrue(isinstance(msg.body, ReferenceId)) + self.assertTrue(msg.reference) + self.assertEquals(data, msg.reference.get_complete()) def test_reference_completion(self): """ @@ -511,12 +494,165 @@ class MessageTests(TestBase): #first, wait for the ok for the transfer ack.get_response(timeout=1) + self.assertDataEquals(channel, queue.get(timeout=1), "abcd") + + def test_reference_multi_transfer(self): + """ + Test that multiple transfer requests for the same reference are + correctly handled. + """ + channel = self.channel + #declare and consume from two queues + channel.queue_declare(queue="q-one", exclusive=True) + channel.queue_declare(queue="q-two", exclusive=True) + channel.message_consume(queue="q-one", destination="q-one") + channel.message_consume(queue="q-two", destination="q-two") + queue1 = self.client.queue("q-one") + queue2 = self.client.queue("q-two") + + #transfer a single ref to both queues (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="my data") + ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + #check that both queues have the message + self.assertDataEquals(channel, queue1.get(timeout=1), "my data") + self.assertDataEquals(channel, queue2.get(timeout=1), "my data") + self.assertEmpty(queue1) + self.assertEmpty(queue2) + + #transfer a single ref to the same queue twice (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="second message") + ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + msg1 = queue1.get(timeout=1) + msg2 = queue1.get(timeout=1) + #order is undefined + if msg1.message_id == "abc": + self.assertEquals(msg2.message_id, "xyz") + else: + self.assertEquals(msg1.message_id, "xyz") + self.assertEquals(msg2.message_id, "abc") + + #would be legal for the incoming messages to be transfered + #inline or by reference in any combination + + if isinstance(msg1.body, ReferenceId): + self.assertEquals("second message", msg1.reference.get_complete()) + if isinstance(msg2.body, ReferenceId): + if msg1.body != msg2.body: + self.assertEquals("second message", msg2.reference.get_complete()) + #else ok, as same ref as msg1 + else: + self.assertEquals("second message", msg1.body) + if isinstance(msg2.body, ReferenceId): + self.assertEquals("second message", msg2.reference.get_complete()) + else: + self.assertEquals("second message", msg2.body) + + self.assertEmpty(queue1) + + def test_reference_unopened_on_append_error(self): + channel = self.channel + try: + channel.message_append(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_close_error(self): + channel = self.channel + try: + channel.message_close(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_transfer_error(self): + channel = self.channel + try: + channel.message_transfer(body=ReferenceId("unopened")) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_already_opened_error(self): + channel = self.channel + channel.message_open(reference="a") + try: + channel.message_open(reference="a") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_empty_reference(self): + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) + channel.synchronous = True + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + msg = queue.get(timeout=1) - if isinstance(msg, Reference): - #should we force broker to deliver as reference by frame - #size limit? or test that separately? for compliance, - #allowing either seems best for now... - data = msg.get_complete() + self.assertEquals(msg.message_id, "empty-msg") + self.assertDataEquals(channel, msg, "") + + def test_reject(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_consume(queue = "q", destination = "consumer") + channel.message_transfer(routing_key = "q", body="blah, blah") + msg = self.client.queue("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + channel.message_cancel(destination = "consumer") + msg.reject() + + channel.message_consume(queue = "q", destination = "checker") + msg = self.client.queue("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + + def test_checkpoint(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_open(reference="my-ref") + channel.message_append(reference="my-ref", bytes="abcdefgh") + channel.message_append(reference="my-ref", bytes="ijklmnop") + channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") + channel.channel_close() + + channel = self.client.channel(2) + channel.channel_open() + channel.message_consume(queue = "q", destination = "consumer") + offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value + self.assertEquals(offset, 16) + channel.message_append(reference="my-ref", bytes="qrstuvwxyz") + channel.synchronous = False + channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") + self.assertEmpty(self.client.queue("consumer")) + + + def assertDataEquals(self, channel, msg, expected): + if isinstance(msg.body, ReferenceId): + data = msg.reference.get_complete() else: data = msg.body self.assertEquals("abcdefghijkl", data) |