summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/qpid/connection.py3
-rw-r--r--python/qpid/peer.py8
-rw-r--r--python/tests/basic.py14
-rw-r--r--python/tests/message.py56
4 files changed, 54 insertions, 27 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index fb1e0927f0..0785fe8774 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -232,6 +232,9 @@ class Response(Frame):
method = Method.decode(spec, dec, size - 20)
return Response(id, request_id, batch_offset, method)
+ def __str__(self):
+ return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
+
class Header(Frame):
type = "frame_header"
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 8d5029004e..b5c655dc2a 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -220,6 +220,8 @@ class Channel:
work.put(self.incoming)
elif isinstance(frame, Response):
self.requester.receive(self, frame)
+ if frame.method_type.content:
+ self.queue = self.responses
return
self.queue.put(frame)
@@ -241,7 +243,11 @@ class Channel:
return None
try:
resp = self.responses.get()
- return Message(self, resp)
+ if resp.method_type.content:
+ return Message(self, resp, read_content(self.responses))
+ else:
+ return Message(self, resp)
+
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
diff --git a/python/tests/basic.py b/python/tests/basic.py
index 9f26ee3728..140576540a 100644
--- a/python/tests/basic.py
+++ b/python/tests/basic.py
@@ -347,12 +347,12 @@ class BasicTests(TestBase):
for i in range(1, 11):
reply = channel.basic_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual(reply.method.name, "get_ok")
self.assertEqual("Message %d" % i, reply.content.body)
reply = channel.basic_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "get_empty")
#repeat for no_ack=False
for i in range(11, 21):
@@ -361,7 +361,7 @@ class BasicTests(TestBase):
for i in range(11, 21):
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual(reply.method.name, "get_ok")
self.assertEqual("Message %d" % i, reply.content.body)
if(i == 13):
channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
@@ -370,7 +370,7 @@ class BasicTests(TestBase):
reply = channel.basic_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "get_empty")
#recover(requeue=True)
channel.basic_recover(requeue=True)
@@ -379,16 +379,16 @@ class BasicTests(TestBase):
for i in [14, 16, 18, 20]:
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual(reply.method.name, "get_ok")
self.assertEqual("Message %d" % i, reply.content.body)
channel.basic_ack(delivery_tag=reply.delivery_tag)
reply = channel.basic_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "get_empty")
channel.basic_recover(requeue=True)
reply = channel.basic_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
+ self.assertEqual(reply.method.name, "get_empty")
diff --git a/python/tests/message.py b/python/tests/message.py
index 0c9581f1c4..a7c8f875eb 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -149,8 +149,8 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True)
- reply = channel.message_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
+ channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
channel.message_transfer(routing_key="test-ack-queue", body="One")
channel.message_transfer(routing_key="test-ack-queue", body="Two")
@@ -170,8 +170,9 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- channel.message_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.message_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+ msg1.ok()
+ msg2.ok()
+ msg4.ok()
channel.message_recover(requeue=False)
@@ -214,8 +215,6 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- #channel.message_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- #channel.message_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
msg1.ok() #One
msg2.ok() #Two
msg4.ok() #Two
@@ -252,8 +251,8 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-count", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
+ subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
#set prefetch to 5:
channel.message_qos(prefetch_count=5)
@@ -263,22 +262,30 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
+ msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
+ msgs.add(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
except Empty: None
#ack messages and check that the next set arrive ok:
- channel.message_ack(delivery_tag=msg.delivery_tag, multiple=True)
+ #todo: once batching is implmented, send a single response for all messages
+ for msg in msgs:
+ msg.ok()
+ msgs.clear()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
+ msgs.add(msg)
- channel.message_ack(delivery_tag=msg.delivery_tag, multiple=True)
+ for msg in msgs:
+ msg.ok()
+ msgs.clear()
try:
extra = queue.get(timeout=1)
@@ -294,8 +301,8 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-size", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
+ subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
channel.message_qos(prefetch_size=50)
@@ -305,9 +312,11 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
+ msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
+ msgs.add(msg)
try:
extra = queue.get(timeout=1)
@@ -315,13 +324,18 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- channel.message_ack(delivery_tag=msg.delivery_tag, multiple=True)
+ for msg in msgs:
+ msg.ok()
+ msgs.clear()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
+ msgs.add(msg)
- channel.message_ack(delivery_tag=msg.delivery_tag, multiple=True)
+ for msg in msgs:
+ msg.ok()
+ msgs.clear()
try:
extra = queue.get(timeout=1)
@@ -366,10 +380,13 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-ok")
self.assertEqual("Message %d" % i, reply.body)
- if(i == 13):
- channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
- channel.message_ack(delivery_tag=reply.delivery_tag)
+ reply.ok()
+
+ #todo: when batching is available, test ack multiple
+ #if(i == 13):
+ # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
+ #if(i in [15, 17, 19]):
+ # channel.message_ack(delivery_tag=reply.delivery_tag)
reply = channel.message_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "message")
@@ -384,7 +401,8 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-ok")
self.assertEqual("Message %d" % i, reply.body)
- channel.message_ack(delivery_tag=reply.delivery_tag)
+ reply.ok()
+ #channel.message_ack(delivery_tag=reply.delivery_tag)
reply = channel.message_get(no_ack=True)
self.assertEqual(reply.method.klass.name, "message")