diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/cpp_failing_0-10.txt | 1 | ||||
-rw-r--r-- | python/qpid/client.py | 3 | ||||
-rw-r--r-- | python/qpid/codec.py | 9 | ||||
-rw-r--r-- | python/qpid/connection.py | 18 | ||||
-rw-r--r-- | python/qpid/peer.py | 5 | ||||
-rw-r--r-- | python/qpid/spec.py | 2 | ||||
-rw-r--r-- | python/qpid/testlib.py | 15 | ||||
-rw-r--r-- | python/tests_0-10/broker.py | 22 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 40 | ||||
-rw-r--r-- | python/tests_0-10/exchange.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/execution.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 20 | ||||
-rw-r--r-- | python/tests_0-10/tx.py | 4 |
13 files changed, 72 insertions, 71 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 878afee3c5..8a8a3e1b20 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,2 +1,3 @@ tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate +tests_0-10.broker.BrokerTests.test_closed_channel diff --git a/python/qpid/client.py b/python/qpid/client.py index 5734873c6f..f0d7935283 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -138,6 +138,9 @@ class ClientDelegate(Delegate): def channel_close(self, ch, msg): ch.close(msg) + def session_closed(self, ch, msg): + ch.close(msg) + def connection_close(self, ch, msg): self.client.peer.close(msg) diff --git a/python/qpid/codec.py b/python/qpid/codec.py index c1a912f5d0..d0a95debb3 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -346,23 +346,22 @@ class Codec: return self.decode_long() def encode_rfc1982_long_set(self, s): - self.encode_short(len(s)) + self.encode_short(len(s) * 4) for i in s: self.encode_long(i) def decode_rfc1982_long_set(self): - count = self.decode_short() + count = self.decode_short() / 4 set = [] for i in range(0, count): set.append(self.decode_long()) return set; - #not correct for 0-10 yet def encode_uuid(self, s): - self.encode_longstr(s) + self.pack("16s", s) def decode_uuid(self): - return self.decode_longstr() + return self.unpack("16s") def encode_struct(self, type, s): for f in type.fields: diff --git a/python/qpid/connection.py b/python/qpid/connection.py index d23a3b909e..98fff9cd2f 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -229,14 +229,24 @@ class Method(Frame): self.eof = not method.content def encode(self, c): - c.encode_short(self.method.klass.id) - c.encode_short(self.method.id) + version = (c.spec.major, c.spec.minor) + if version == (0, 10): + c.encode_octet(self.method.klass.id) + c.encode_octet(self.method.id) + else: + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) for field, arg in zip(self.method.fields, self.args): c.encode(field.type, arg) def decode(spec, c, size): - klass = spec.classes.byid[c.decode_short()] - meth = klass.methods.byid[c.decode_short()] + version = (c.spec.major, c.spec.minor) + if version == (0, 10): + klass = spec.classes.byid[c.decode_octet()] + meth = klass.methods.byid[c.decode_octet()] + else: + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] args = tuple([c.decode(f.type) for f in meth.fields]) return Method(meth, args) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 5cabf98236..6e91c09806 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -261,7 +261,7 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): - if type.klass.name == "channel" and (type.name == "close" or type.name == "open"): + if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]): self.completion.reset() self.incoming_completion.reset() self.completion.next_command(type) @@ -421,6 +421,7 @@ class OutgoingCompletion: self.condition.acquire() try: self.mark = mark + #print "set mark to %s [%s] " % (self.mark, self) self.condition.notifyAll() finally: self.condition.release() @@ -432,7 +433,7 @@ class OutgoingCompletion: self.condition.acquire() try: while not self.closed and point_of_interest > self.mark: - #print "waiting for ", point_of_interest, " mark is currently at ", self.mark + #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self) self.condition.wait(remaining) if not self.closed and point_of_interest > self.mark and timeout: if (start_time + timeout) < time(): break diff --git a/python/qpid/spec.py b/python/qpid/spec.py index 3febab7e09..3cb5f0ca25 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -208,7 +208,7 @@ class Method(Metadata): self.response = False def is_l4_command(self): - return self.klass.name not in ["execution", "channel", "connection"] + return self.klass.name not in ["execution", "channel", "connection", "session"] def arguments(self, *args, **kwargs): nargs = len(args) + len(kwargs) diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index ce1ba462a0..c4f55be18a 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -208,8 +208,8 @@ class TestBase(unittest.TestCase): self.exchanges = [] self.client = self.connect() self.channel = self.client.channel(1) - version = (self.client.spec.major, self.client.spec.minor) - if version == (8, 0) or "transitional" in self.client.spec.file: + self.version = (self.client.spec.major, self.client.spec.minor) + if self.version == (8, 0): self.channel.channel_open() else: self.channel.session_open() @@ -313,9 +313,14 @@ class TestBase(unittest.TestCase): self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): - if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) - self.assertEqual("channel", message.method.klass.name) - self.assertEqual("close", message.method.name) + if self.version == (8, 0): #or "transitional" in self.client.spec.file: + if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) + self.assertEqual("channel", message.method.klass.name) + self.assertEqual("close", message.method.name) + else: + if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message)) + self.assertEqual("session", message.method.klass.name) + self.assertEqual("closed", message.method.name) self.assertEqual(expectedCode, message.reply_code) diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 0df7eb09fa..99936ba742 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -101,29 +101,11 @@ class BrokerTests(TestBase): def test_closed_channel(self): channel = self.client.channel(200) - channel.channel_open() - channel.channel_close() + channel.session_open() + channel.session_close() try: channel.queue_declare(exclusive=True) self.fail("Expected error on queue_declare for closed channel") except Closed, e: if isinstance(e.args[0], str): self.fail(e) self.assertConnectionException(504, e.args[0]) - - def test_channel_flow(self): - channel = self.channel - channel.queue_declare(queue="flow_test_queue", exclusive=True) - self.subscribe(destination="my-tag", queue="flow_test_queue") - incoming = self.client.queue("my-tag") - - channel.channel_flow(active=False) - c = Content("abcdefghijklmnopqrstuvwxyz", properties = {"routing_key" : "flow_test_queue"}) - channel.message_transfer(content = c) - try: - incoming.get(timeout=1) - self.fail("Received message when flow turned off.") - except Empty: None - - channel.channel_flow(active=True) - msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index b5645cb596..000eddff21 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -43,8 +43,9 @@ class DtxTests(TestBase): tx_counter = 0 def reset_channel(self): - self.channel.channel_close() - self.channel.channel_open() + self.channel.session_close() + self.channel = self.client.channel(self.channel.id + 1) + self.channel.session_open() def test_simple_commit(self): """ @@ -171,7 +172,7 @@ class DtxTests(TestBase): other = self.connect() channel2 = other.channel(1) - channel2.channel_open() + channel2.session_open() channel2.dtx_demarcation_select() #create a xid @@ -202,17 +203,16 @@ class DtxTests(TestBase): Verify that a xid is 'forgotten' - and can therefore be used again - once it is completed. """ - channel = self.channel #do some transactional work & complete the transaction self.test_simple_commit() # channel has been reset, so reselect for use with dtx - channel.dtx_demarcation_select() + self.channel.dtx_demarcation_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") - channel.dtx_demarcation_start(xid=tx) - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) + self.channel.dtx_demarcation_start(xid=tx) + self.channel.dtx_demarcation_end(xid=tx) + self.channel.dtx_coordination_rollback(xid=tx) def test_start_join_and_resume(self): """ @@ -242,7 +242,7 @@ class DtxTests(TestBase): channel1.dtx_demarcation_select() channel2 = self.client.channel(2) - channel2.channel_open() + channel2.session_open() channel2.dtx_demarcation_select() #setup @@ -323,9 +323,9 @@ class DtxTests(TestBase): #cleanup other = self.connect() channel = other.channel(1) - channel.channel_open() + channel.session_open() channel.dtx_coordination_rollback(xid=tx) - channel.channel_close() + channel.session_close() other.close() @@ -351,7 +351,7 @@ class DtxTests(TestBase): operations are non-transactional """ channel = self.client.channel(2) - channel.channel_open() + channel.session_open() channel.queue_declare(queue="tx-queue", exclusive=True) #publish a message under a transaction @@ -372,7 +372,7 @@ class DtxTests(TestBase): channel.message_cancel(destination="results") #ack the message then close the channel msg.complete() - channel.channel_close() + channel.session_close() channel = self.channel #commit the transaction and check that the first message (and @@ -388,7 +388,7 @@ class DtxTests(TestBase): """ other = self.connect() tester = other.channel(1) - tester.channel_open() + tester.session_open() tester.queue_declare(queue="dummy", exclusive=True) tester.dtx_demarcation_select() tx = self.xid("dummy") @@ -407,7 +407,7 @@ class DtxTests(TestBase): self.channel.dtx_coordination_rollback(xid=tx) self.assertConnectionException(503, e.args[0]) else: - tester.channel_close() + tester.session_close() other.close() self.fail("Invalid use of one_phase=True, expected exception!") @@ -422,7 +422,7 @@ class DtxTests(TestBase): """ other = self.connect() tester = other.channel(1) - tester.channel_open() + tester.session_open() tester.queue_declare(queue="dummy", exclusive=True) tester.dtx_demarcation_select() tx = self.xid("dummy") @@ -440,7 +440,7 @@ class DtxTests(TestBase): self.channel.dtx_coordination_rollback(xid=tx) self.assertConnectionException(503, e.args[0]) else: - tester.channel_close() + tester.session_close() other.close() self.fail("Invalid use of one_phase=False, expected exception!") @@ -452,7 +452,7 @@ class DtxTests(TestBase): """ channel1 = self.channel channel2 = self.client.channel(2) - channel2.channel_open() + channel2.session_open() #setup: channel2.queue_declare(queue="dummy", exclusive=True) @@ -464,7 +464,7 @@ class DtxTests(TestBase): channel2.message_get(queue="dummy", destination="dummy") self.client.queue("dummy").get(timeout=1).complete() channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - channel2.channel_close() + channel2.session_close() self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) channel1.dtx_coordination_rollback(xid=tx) @@ -492,7 +492,7 @@ class DtxTests(TestBase): """ #open new channel to allow self.channel to be used in checking te queue channel = self.client.channel(2) - channel.channel_open() + channel.session_open() #setup: tx = self.xid("dummy") channel.queue_declare(queue="queue-a", exclusive=True) diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py index 4137eb7a51..4d8b254df7 100644 --- a/python/tests_0-10/exchange.py +++ b/python/tests_0-10/exchange.py @@ -322,6 +322,6 @@ class MiscellaneousErrorsTests(TestBase): #cleanup other = self.connect() c2 = other.channel(1) - c2.channel_open() + c2.session_open() c2.exchange_delete(exchange="test_different_declared_type_exchange") diff --git a/python/tests_0-10/execution.py b/python/tests_0-10/execution.py index 9541369444..950ff59d97 100644 --- a/python/tests_0-10/execution.py +++ b/python/tests_0-10/execution.py @@ -25,5 +25,5 @@ class ExecutionTests (TestBase): channel = self.channel for i in [1, 2, 3]: channel.basic_publish(routing_key=str(i)) - channel.execution_flush() + #channel.execution_flush() assert(channel.completion.wait(channel.completion.command_id, timeout=1)) diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index e3438116c8..ba017bb286 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -56,7 +56,7 @@ class QueueTests(TestBase): #check error conditions (use new channels): channel = self.client.channel(2) - channel.channel_open() + channel.session_open() try: #queue specified but doesn't exist: channel.queue_purge(queue="invalid-queue") @@ -65,7 +65,7 @@ class QueueTests(TestBase): self.assertChannelException(404, e.args[0]) channel = self.client.channel(3) - channel.channel_open() + channel.session_open() try: #queue not specified and none previously declared for channel: channel.queue_purge() @@ -76,7 +76,7 @@ class QueueTests(TestBase): #cleanup other = self.connect() channel = other.channel(1) - channel.channel_open() + channel.session_open() channel.exchange_delete(exchange="test-exchange") def test_declare_exclusive(self): @@ -88,7 +88,7 @@ class QueueTests(TestBase): # Here we open a second separate connection: other = self.connect() c2 = other.channel(1) - c2.channel_open() + c2.session_open() #declare an exclusive queue: c1.queue_declare(queue="exclusive-queue", exclusive="True") @@ -141,7 +141,7 @@ class QueueTests(TestBase): #need to reopen a channel: channel = self.client.channel(2) - channel.channel_open() + channel.session_open() #try and bind non-existant queue: try: @@ -225,7 +225,7 @@ class QueueTests(TestBase): #check attempted deletion of non-existant queue is handled correctly: channel = self.client.channel(2) - channel.channel_open() + channel.session_open() try: channel.queue_delete(queue="i-dont-exist", if_empty="True") self.fail("Expected delete of non-existant queue to fail") @@ -254,7 +254,7 @@ class QueueTests(TestBase): #need new channel now: channel = self.client.channel(2) - channel.channel_open() + channel.session_open() #empty queue: self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") @@ -286,7 +286,7 @@ class QueueTests(TestBase): #need new channel now: channel2 = self.client.channel(2) - channel2.channel_open() + channel2.session_open() #try to delete, but only if empty: try: channel2.queue_delete(queue="delete-me-3", if_unused="True") @@ -312,7 +312,7 @@ class QueueTests(TestBase): channel = self.channel other = self.connect() channel2 = other.channel(1) - channel2.channel_open() + channel2.session_open() channel.queue_declare(queue="auto-delete-me", auto_delete=True) @@ -321,7 +321,7 @@ class QueueTests(TestBase): channel2.basic_consume(queue="auto-delete-me") #implicit cancel - channel2.channel_close() + channel2.session_close() #check it is still there channel.queue_declare(queue="auto-delete-me", passive=True) diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 2415a88fb2..84c07d51c1 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -31,10 +31,10 @@ class TxTests(TestBase): Test that commited publishes are delivered and commited acks are not re-delivered """ channel2 = self.client.channel(2) - channel2.channel_open() + channel2.session_open() self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") channel2.tx_commit() - channel2.channel_close() + channel2.session_close() #use a different channel with new subscriptions to ensure #there is no redelivery of acked messages: |