summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/cpp_failing_0-10.txt13
-rw-r--r--python/qpid/codec.py25
-rw-r--r--python/qpid/message.py8
-rw-r--r--python/qpid/peer.py82
-rw-r--r--python/qpid/spec.py2
-rw-r--r--python/tests_0-10/broker.py2
-rw-r--r--python/tests_0-10/dtx.py26
-rw-r--r--python/tests_0-10/example.py2
-rw-r--r--python/tests_0-10/message.py24
-rw-r--r--python/tests_0-10/tx.py56
10 files changed, 178 insertions, 62 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 798d1769eb..e68f942d67 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -1,4 +1,15 @@
-tests_0-10.message.MessageTests.test_checkpoint
tests_0-10.message.MessageTests.test_reject
tests_0-10.basic.BasicTests.test_get
+tests_0-10.message.MessageTests.test_get
+tests_0-10.message.MessageTests.test_checkpoint
+tests_0-10.message.MessageTests.test_empty_reference
+tests_0-10.message.MessageTests.test_reference_already_opened_error
+tests_0-10.message.MessageTests.test_reference_completion
+tests_0-10.message.MessageTests.test_reference_large
+tests_0-10.message.MessageTests.test_reference_multi_transfer
+tests_0-10.message.MessageTests.test_reference_simple
+tests_0-10.message.MessageTests.test_reference_unopened_on_append_error
+tests_0-10.message.MessageTests.test_reference_unopened_on_close_error
+tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error
+
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index a5228e8003..a0d9696c8b 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -329,12 +329,6 @@ class Codec:
return ReferenceId(self.decode_longstr())
# new domains for 0-10:
-
- def encode_uuid(self, s):
- self.encode_longstr(s)
-
- def decode_uuid(self):
- return self.decode_longstr()
def encode_rfc1982_long(self, s):
self.encode_long(s)
@@ -342,10 +336,21 @@ class Codec:
def decode_rfc1982_long(self):
return self.decode_long()
- #Not done yet
def encode_rfc1982_long_set(self, s):
- self.encode_short(0)
+ self.encode_short(len(s))
+ for i in s:
+ self.encode_long(i)
def decode_rfc1982_long_set(self):
- self.decode_short()
- return 0;
+ count = self.decode_short()
+ set = []
+ for i in range(0, count):
+ set.append(self.decode_long())
+ return set;
+
+ #not correct for 0-10 yet
+ def encode_uuid(self, s):
+ self.encode_longstr(s)
+
+ def decode_uuid(self):
+ return self.decode_longstr()
diff --git a/python/qpid/message.py b/python/qpid/message.py
index f80293180e..970ab9d974 100644
--- a/python/qpid/message.py
+++ b/python/qpid/message.py
@@ -26,7 +26,10 @@ class Message:
self.frame = frame
self.method = frame.method_type
self.content = content
-
+ if self.method.klass.name != "execution":
+ self.command_id = self.channel.incoming_completion.sequence.next()
+ #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
+
def __len__(self):
return len(self.frame.args)
@@ -66,3 +69,6 @@ class Message:
def __repr__(self):
return Message.REPR % (self.method, self.frame.args, self.content)
+
+ def complete(self, cumulative=True):
+ self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 3927f20667..bedc96895b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -30,6 +30,7 @@ from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+from time import time
class Sequence:
@@ -186,11 +187,11 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- self.completion = ExecutionCompletion()
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
# Use reliable framing if version == 0-9.
- # (also for 0-10 while transitioning...)
- self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10))
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
@@ -202,6 +203,7 @@ class Channel:
self.incoming.close()
self.responses.close()
self.completion.close()
+ self.incoming_completion.reset()
def write(self, frame, content = None):
if self.closed:
@@ -252,6 +254,9 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ if type.klass.name == "channel" and (type.name == "close" or type.name == "open"):
+ self.completion.reset()
+ self.incoming_completion.reset()
self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
@@ -306,6 +311,13 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.klass.name != "execution":
+ self.execution_flush()
+ self.completion.wait()
+ if self.closed:
+ raise Closed(self.reason)
+
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -349,21 +361,32 @@ class Future:
def is_complete(self):
return self.completed.isSet()
-class ExecutionCompletion:
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
def __init__(self):
self.condition = threading.Condition()
- self.sequence = Sequence(1)
- self.command_id = 0
- self.mark = 0
+
+ self.sequence = Sequence(1) #issues ids for outgoing commands
+ self.command_id = 0 #last issued id
+ self.mark = 0 #commands up to this mark are known to be complete
+ self.closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
if method.klass.name != "execution":
self.command_id = self.sequence.next()
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
def close(self):
+ self.reset()
self.condition.acquire()
try:
+ self.closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -378,11 +401,50 @@ class ExecutionCompletion:
def wait(self, point_of_interest=-1, timeout=None):
if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
self.condition.acquire()
try:
- if point_of_interest > self.mark:
- self.condition.wait(timeout)
+ while not self.closed and point_of_interest > self.mark:
+ #print "waiting for ", point_of_interest, " mark is currently at ", self.mark
+ self.condition.wait(remaining)
+ if timeout:
+ if start_time + timeout > time(): break
+ else: remaining = timeout - (time() - start_time)
finally:
self.condition.release()
- #todo: retry until timed out or closed
return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(1) #issues ids for incoming commands
+ self.mark = 0 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def next_id(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ return self.sequence.next()
+ else:
+ return 0
+
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)
+
+
+
+
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index c537401164..09e7dc9d0b 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -240,7 +240,7 @@ class Method(Metadata):
"content": None,
"uuid": "",
"rfc1982_long": 0,
- "rfc1982_long_set": 0
+ "rfc1982_long_set": []
}
def define_method(self, name):
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 684b36597e..6bc2f7ceb8 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -48,7 +48,7 @@ class BrokerTests(TestBase):
body = "test ack"
ch.message_transfer(routing_key = "otherqueue", body = body)
msg = self.client.queue(ctag).get(timeout = 5)
- msg.ok()
+ msg.complete()
self.assert_(msg.body == body)
def test_simple_delivery_immediate(self):
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index c0d1bd2b74..2835d703ae 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -40,6 +40,11 @@ class DtxTests(TestBase):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 8
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.channel.channel_close()
+ self.channel.channel_open()
def test_simple_commit(self):
"""
@@ -56,6 +61,9 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ #should close and reopen channel to ensure no unacked messages are held
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -79,6 +87,8 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -100,6 +110,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -123,6 +135,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -191,6 +205,8 @@ class DtxTests(TestBase):
channel = self.channel
#do some transactional work & complete the transaction
self.test_simple_commit()
+ # channel has been reset, so reselect for use with dtx
+ channel.dtx_demarcation_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
@@ -355,7 +371,7 @@ class DtxTests(TestBase):
self.assertEqual("two", msg.message_id)
channel.message_cancel(destination="results")
#ack the message then close the channel
- msg.ok()
+ msg.complete()
channel.channel_close()
channel = self.channel
@@ -446,7 +462,7 @@ class DtxTests(TestBase):
channel2.dtx_demarcation_select()
channel2.dtx_demarcation_start(xid=tx)
channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
+ self.client.queue("dummy").get(timeout=1).complete()
channel2.message_transfer(routing_key="dummy", body="whatever")
channel2.channel_close()
@@ -548,7 +564,9 @@ class DtxTests(TestBase):
channel.dtx_coordination_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
- def xid(self, txid, branchqual = ''):
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
def txswap(self, tx, id):
@@ -573,7 +591,7 @@ class DtxTests(TestBase):
#consume from src:
channel.message_get(destination="temp-swap", queue=src)
msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
+ msg.complete();
#re-publish to dest
channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py
index 7ab4cc7d0a..dc71b0590b 100644
--- a/python/tests_0-10/example.py
+++ b/python/tests_0-10/example.py
@@ -90,5 +90,5 @@ class ExampleTest (TestBase):
self.assertEqual(body, msg.body)
# Now acknowledge the message.
- msg.ok()
+ msg.complete()
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index b25016e680..74e2b6416f 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -171,8 +171,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1)#One and Two
- msg4.ok()
+ msg2.complete(cumulative=True)#One and Two
+ msg4.complete(cumulative=False)
channel.message_recover(requeue=False)
@@ -215,8 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1) #One and Two
- msg4.ok() #Four
+ msg2.complete(cumulative=True) #One and Two
+ msg4.complete(cumulative=False) #Four
channel.message_cancel(destination="consumer_tag")
@@ -276,14 +276,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- #todo: once batching is implmented, send a single response for all messages
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -320,13 +319,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -376,9 +375,9 @@ class MessageTests(TestBase):
self.assertEqual("Message %d" % i, msg.body)
if (i==13):
- msg.ok(batchoffset=-2)#11, 12 & 13
+ msg.complete()#11, 12 & 13
if(i in [15, 17, 19]):
- msg.ok()
+ msg.complete(cumulative=False)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -395,8 +394,7 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
- #channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg.complete()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 0f6b4f5cd1..b499c2d1f9 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -30,23 +30,39 @@ class TxTests(TestBase):
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel2.tx_commit()
+ channel2.channel_close()
+
+ #use a different channel with new subscriptions to ensure
+ #there is no redelivery of acked messages:
channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
+ channel.tx_select()
+
+ channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False)
+ queue_a = self.client.queue("qa")
+
+ channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False)
+ queue_b = self.client.queue("qb")
+
+ channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False)
+ queue_c = self.client.queue("qc")
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
self.assertEqual("TxMessage %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("TxMessage 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_a.get(timeout=1)
self.assertEqual("TxMessage 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -76,15 +92,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -114,15 +130,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -150,10 +166,10 @@ class TxTests(TestBase):
channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, body="Message %d" % i)
+ channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6")
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7")
channel.tx_select()
@@ -164,25 +180,25 @@ class TxTests(TestBase):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-3)
+ msg.complete()
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
#publish messages
for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i)
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, body="TxMessage 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6")
+ channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7")
return queue_a, queue_b, queue_c