diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-05-08 20:52:28 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-08 20:52:28 +0000 |
commit | fd5ba0c75091336020287825a973c88a07dbe5b4 (patch) | |
tree | 5a90ff41987b15e85f254a9266d9c933cd505b9e | |
parent | 32613b43c550fec2785299a271b3818db75490c4 (diff) | |
download | qpid-python-fd5ba0c75091336020287825a973c88a07dbe5b4.tar.gz |
QPID-979: added access to enums through the session so that symbolic constants can be used rather than hard coded ones; also added default loading of the spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654618 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/connection.py | 11 | ||||
-rw-r--r-- | python/qpid/invoker.py | 26 | ||||
-rw-r--r-- | python/qpid/session.py | 12 | ||||
-rw-r--r-- | python/qpid/spec.py | 13 | ||||
-rw-r--r-- | python/qpid/spec010.py | 16 | ||||
-rw-r--r-- | python/tests_0-10/alternate_exchange.py | 12 | ||||
-rw-r--r-- | python/tests_0-10/broker.py | 16 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 12 | ||||
-rw-r--r-- | python/tests_0-10/example.py | 4 | ||||
-rw-r--r-- | python/tests_0-10/exchange.py | 4 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 114 | ||||
-rw-r--r-- | python/tests_0-10/persistence.py | 4 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 16 | ||||
-rw-r--r-- | python/tests_0-10/tx.py | 8 |
14 files changed, 156 insertions, 112 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py index dc72cd9cb8..39f882e9c3 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -25,7 +25,8 @@ from assembler import Assembler, Segment from codec010 import StringCodec from session import Session from invoker import Invoker -from spec010 import Control, Command +from spec010 import Control, Command, load +from spec import default from exceptions import * from logging import getLogger import delegates @@ -44,8 +45,10 @@ def server(*args): class Connection(Assembler): - def __init__(self, sock, spec, delegate=client): + def __init__(self, sock, spec=None, delegate=client): Assembler.__init__(self, sock) + if spec == None: + spec = load(default()) self.spec = spec self.track = self.spec["track"] @@ -162,9 +165,9 @@ class Channel(Invoker): def resolve_method(self, name): inst = self.connection.spec.instructions.get(name) if inst is not None and isinstance(inst, Control): - return inst + return self.METHOD, inst else: - return None + return self.ERROR, None def invoke(self, type, args, kwargs): ctl = type.new(args, kwargs) diff --git a/python/qpid/invoker.py b/python/qpid/invoker.py index 9e6f6943d8..2d9e45179e 100644 --- a/python/qpid/invoker.py +++ b/python/qpid/invoker.py @@ -17,16 +17,26 @@ # under the License. # +# TODO: need a better naming for this class now that it does the value +# stuff class Invoker: - def resolve_method(self, name): - pass - - def __getattr__(self, name): - resolved = self.resolve_method(name) - if resolved == None: - raise AttributeError("%s instance has no attribute '%s'" % - (self.__class__.__name__, name)) + def METHOD(self, name, resolved): method = lambda *args, **kwargs: self.invoke(resolved, args, kwargs) self.__dict__[name] = method return method + + def VALUE(self, name, resolved): + self.__dict__[name] = resolved + return resolved + + def ERROR(self, name, resolved): + raise AttributeError("%s instance has no attribute '%s'" % + (self.__class__.__name__, name)) + + def resolve_method(self, name): + return ERROR, None + + def __getattr__(self, name): + disp, resolved = self.resolve_method(name) + return disp(name, resolved) diff --git a/python/qpid/session.py b/python/qpid/session.py index 11249ca435..f8ac98b96e 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -112,15 +112,17 @@ class Session(Invoker): def resolve_method(self, name): cmd = self.spec.instructions.get(name) if cmd is not None and cmd.track == self.spec["track.command"].value: - return cmd + return self.METHOD, cmd else: # XXX for st in self.spec.structs.values(): if st.name == name: - return st - if self.spec.structs_by_name.has_key(name): - return self.spec.structs_by_name[name] - return None + return self.METHOD, st + if self.spec.structs_by_name.has_key(name): + return self.METHOD, self.spec.structs_by_name[name] + if self.spec.enums.has_key(name): + return self.VALUE, self.spec.enums[name] + return self.ERROR, None def invoke(self, type, args, kwargs): # XXX diff --git a/python/qpid/spec.py b/python/qpid/spec.py index 64a14b0f61..152763b762 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -31,6 +31,19 @@ situations. import os, mllib, spec08, spec010 +def default(): + try: + specfile = os.environ["AMQP_SPEC"] + return specfile + except KeyError: + try: + from AMQP_SPEC import location as specfile + return specfile + except ImportError: + raise Exception("unable to locate the amqp specification, please set " + "the AMQP_SPEC environment variable or supply a " + "configured AMQP_SPEC.py") + def load(specfile, *errata): for name in (specfile,) + errata: if not os.path.exists(name): diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py index 1668729876..fb625eab65 100644 --- a/python/qpid/spec010.py +++ b/python/qpid/spec010.py @@ -166,6 +166,15 @@ class Domain(Type, Lookup): def decode(self, codec): return self.type.decode(codec) +class Enum: + + def __init__(self, name): + self.name = name + + def __repr__(self): + return "%s(%s)" % (self.name, ", ".join([k for k in self.__dict__.keys() + if k != "name"])) + class Choice(Named, Node): def __init__(self, name, value, children): @@ -177,6 +186,12 @@ class Choice(Named, Node): Named.register(self, node) node.choices[self.value] = self Node.register(self) + try: + enum = node.spec.enums[node.name] + except KeyError: + enum = Enum(node.name) + node.spec.enums[node.name] = enum + setattr(enum, self.name, self.value) class Composite(Type, Coded): @@ -450,6 +465,7 @@ class Spec(Node): self.commands = {} self.structs = {} self.structs_by_name = {} + self.enums = {} def encoding(self, klass): if Spec.ENCODINGS.has_key(klass): diff --git a/python/tests_0-10/alternate_exchange.py b/python/tests_0-10/alternate_exchange.py index c177c3deb7..9cf331c110 100644 --- a/python/tests_0-10/alternate_exchange.py +++ b/python/tests_0-10/alternate_exchange.py @@ -41,16 +41,16 @@ class AlternateExchangeTests(TestBase010): session.queue_declare(queue="returns", exclusive=True, auto_delete=True) session.exchange_bind(queue="returns", exchange="secondary") session.message_subscribe(destination="a", queue="returns") - session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF) returned = session.incoming("a") #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages session.queue_declare(queue="processed", exclusive=True, auto_delete=True) session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key") session.message_subscribe(destination="b", queue="processed") - session.message_flow(destination="b", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="b", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFF) processed = session.incoming("b") #publish to the primary exchange @@ -81,8 +81,8 @@ class AlternateExchangeTests(TestBase010): session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) session.exchange_bind(exchange="dlq", queue="deleted") session.message_subscribe(destination="dlq", queue="deleted") - session.message_flow(destination="dlq", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="dlq", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFF) dlq = session.incoming("dlq") #create a queue using the dlq as its alternate exchange: diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 25cf1241ec..d4aa57765c 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -36,8 +36,8 @@ class BrokerTests(TestBase010): # No ack consumer ctag = "tag1" session.message_subscribe(queue = "myqueue", destination = ctag) - session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFF) body = "test no-ack" session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body)) msg = session.incoming(ctag).get(timeout = 5) @@ -47,8 +47,8 @@ class BrokerTests(TestBase010): session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True) ctag = "tag2" session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1) - session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFF) body = "test ack" session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body)) msg = session.incoming(ctag).get(timeout = 5) @@ -64,8 +64,8 @@ class BrokerTests(TestBase010): session.exchange_bind(queue="test-queue", exchange="amq.fanout") consumer_tag = "tag1" session.message_subscribe(queue="test-queue", destination=consumer_tag) - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = consumer_tag) queue = session.incoming(consumer_tag) body = "Immediate Delivery" @@ -86,8 +86,8 @@ class BrokerTests(TestBase010): consumer_tag = "tag1" session.message_subscribe(queue="test-queue", destination=consumer_tag) - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = consumer_tag) queue = session.incoming(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.body == body) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index 042df521ae..531f8a8f66 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -493,8 +493,8 @@ class DtxTests(TestBase010): session2.dtx_select() session2.dtx_start(xid=tx) session2.message_subscribe(queue="dummy", destination="dummy") - session2.message_flow(destination="dummy", unit=0, value=1) - session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) + session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1) + session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFF) msg = session2.incoming("dummy").get(timeout=1) session2.message_accept(RangedSet(msg.id)) session2.message_cancel(destination="dummy") @@ -629,8 +629,8 @@ class DtxTests(TestBase010): def swap(self, session, src, dest): #consume from src: session.message_subscribe(destination="temp-swap", queue=src) - session.message_flow(destination="temp-swap", unit=0, value=1) - session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1) + session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFF) msg = session.incoming("temp-swap").get(timeout=1) session.message_cancel(destination="temp-swap") session.message_accept(RangedSet(msg.id)) @@ -646,8 +646,8 @@ class DtxTests(TestBase010): def assertMessageId(self, expected, queue): self.session.message_subscribe(queue=queue, destination="results") - self.session.message_flow(destination="results", unit=0, value=1) - self.session.message_flow(destination="results", unit=1, value=0xFFFFFFFF) + self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1) + self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFF) self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) self.session.message_cancel(destination="results") diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index 1e140a285d..83d208192b 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -69,8 +69,8 @@ class ExampleTest (TestBase010): # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. session.message_subscribe(queue="test-queue", destination="consumer_tag") - session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFF) # We can use the session.incoming(...) method to access the messages # delivered for our consumer_tag. diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py index 991da17ed4..4b5dc78143 100644 --- a/python/tests_0-10/exchange.py +++ b/python/tests_0-10/exchange.py @@ -108,8 +108,8 @@ class TestHelper(TestBase010): else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) self.session.message_subscribe(queue=queueName, destination=consumer_tag) - self.session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - self.session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFF) + self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF) return self.session.incoming(consumer_tag) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index c54622e091..5f2ee7264f 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -229,8 +229,8 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) session.message_subscribe(destination="my-consumer", queue="test-queue-4") - session.message_flow(destination="my-consumer", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="my-consumer", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) #should flush here @@ -258,8 +258,8 @@ class MessageTests(TestBase010): session.queue_declare(queue="test-ack-queue", auto_delete=True) session.message_subscribe(queue = "test-ack-queue", destination = "consumer") - session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue = session.incoming("consumer") delivery_properties = session.delivery_properties(routing_key="test-ack-queue") @@ -289,8 +289,8 @@ class MessageTests(TestBase010): session.close(timeout=10) session = self.session - session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue = session.incoming("checker") msg3b = queue.get(timeout=1) @@ -504,16 +504,16 @@ class MessageTests(TestBase010): session.exchange_bind(queue = "r", exchange = "amq.fanout") session.message_subscribe(queue = "q", destination = "consumer") - session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) msg = session.incoming("consumer").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") session.message_reject(RangedSet(msg.id)) session.message_subscribe(queue = "r", destination = "checker") - session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) msg = session.incoming("checker").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") @@ -532,9 +532,9 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = 0, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") #set infinite byte credit - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(1, 6): @@ -543,7 +543,7 @@ class MessageTests(TestBase010): #increase credit again and check more are received for i in range(6, 11): - session.message_flow(unit = 0, value = 1, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) @@ -565,9 +565,9 @@ class MessageTests(TestBase010): msg_size = 21 #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = 1, value = msg_size*5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") #set infinite message credit - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(5): @@ -576,7 +576,7 @@ class MessageTests(TestBase010): #increase credit again and check more are received for i in range(5): - session.message_flow(unit = 1, value = msg_size, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -596,9 +596,9 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = 0, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") #set infinite byte credit - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") for i in range(1, 6): @@ -634,9 +634,9 @@ class MessageTests(TestBase010): msg_size = 19 #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = 1, value = msg_size*5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") #set infinite message credit - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") msgs = [] @@ -665,11 +665,11 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") for i in range(6, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) @@ -700,8 +700,8 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF) msg = session.incoming("a").get(timeout = 1) self.assertEquals("acquire me", msg.body) #message should still be on the queue: @@ -726,8 +726,8 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) session.message_subscribe(queue = "q", destination = "a") - session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF) msg = session.incoming("a").get(timeout = 1) self.assertEquals("release me", msg.body) session.message_cancel(destination = "a") @@ -746,8 +746,8 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = 0, value = 10, destination = "a") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") first = queue.get(timeout = 1) for i in range(2, 10): @@ -779,8 +779,8 @@ class MessageTests(TestBase010): session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = 0, value = 10, destination = "a") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") ids = [] for i in range (1, 11): @@ -805,8 +805,8 @@ class MessageTests(TestBase010): session.close(timeout=10) session = self.session - session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue = session.incoming("checker") self.assertEquals("message 4", queue.get(timeout = 1).body) @@ -823,8 +823,8 @@ class MessageTests(TestBase010): #consume some of them session.message_subscribe(queue = "q", destination = "a") session.message_set_flow_mode(flow_mode = 0, destination = "a") - session.message_flow(unit = 0, value = 5, destination = "a") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") for i in range(1, 6): @@ -839,11 +839,11 @@ class MessageTests(TestBase010): #now create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") #check it gets those not consumed queue = session.incoming("b") - session.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") for i in range(6, 11): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) @@ -851,7 +851,7 @@ class MessageTests(TestBase010): #TODO: tidy up completion session.receiver._completed.add(msg.id) session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue @@ -867,8 +867,8 @@ class MessageTests(TestBase010): #create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = 0, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") #browse through messages queue = session.incoming("a") @@ -889,8 +889,8 @@ class MessageTests(TestBase010): #create a second not-acquired subscriber session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") #check it gets those not consumed queue = session.incoming("b") for i in [2,4,6,8,10]: @@ -899,7 +899,7 @@ class MessageTests(TestBase010): session.message_release(RangedSet(msg.id)) session.receiver._completed.add(msg.id) session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue @@ -916,13 +916,13 @@ class MessageTests(TestBase010): #create two 'browsers' session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = 0, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") queueA = session.incoming("a") session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = 0, value = 10, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") queueB = session.incoming("b") #have each browser release the message @@ -938,8 +938,8 @@ class MessageTests(TestBase010): #create consumer session.message_subscribe(queue = "q", destination = "c") - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") - session.message_flow(unit = 0, value = 10, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") queueC = session.incoming("c") #consume the message then ack it msgC = queueC.get(timeout = 1) @@ -950,12 +950,12 @@ class MessageTests(TestBase010): def test_no_size(self): self.queue_declare(queue = "q", exclusive=True, auto_delete=True) - ch = self.session - ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) + ssn = self.session + ssn.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) - ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0) - ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d") - ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d") + ssn.message_subscribe(queue = "q", destination="d", confirm_mode = 0) + ssn.message_flow(unit = ssn.credit_unit.message, value = 0xFFFFFFFF, destination = "d") + ssn.message_flow(unit = ssn.credit_unit.byte, value = 0xFFFFFFFF, destination = "d") queue = session.incoming("d") msg = queue.get(timeout = 3) @@ -969,8 +969,8 @@ class MessageTests(TestBase010): consumer_tag = "tag1" session.message_subscribe(queue="xyz", destination=consumer_tag) - session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = consumer_tag) queue = session.incoming(consumer_tag) msg = queue.get(timeout=1) self.assertEquals("", msg.body) diff --git a/python/tests_0-10/persistence.py b/python/tests_0-10/persistence.py index a4b5691910..815ad1f3dc 100644 --- a/python/tests_0-10/persistence.py +++ b/python/tests_0-10/persistence.py @@ -49,8 +49,8 @@ class PersistenceTests(TestBase010): #create consumer session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0) - session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = 0, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") queue = session.incoming("a") #consume the message, cancel subscription (triggering auto-delete), then ack it diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 97e7a92b87..a3b23a1c32 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -49,8 +49,8 @@ class QueueTests(TestBase010): #send a further message and consume it, ensuring that the other messages are really gone session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) session.message_subscribe(queue="test-queue", destination="tag") - session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue = session.incoming("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.body) @@ -166,11 +166,11 @@ class QueueTests(TestBase010): session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) session.message_subscribe(queue="queue-1", destination="queue-1") - session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFF) session.message_subscribe(queue="queue-2", destination="queue-2") - session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue1 = session.incoming("queue-1") queue2 = session.incoming("queue-2") @@ -267,8 +267,8 @@ class QueueTests(TestBase010): #empty queue: session.message_subscribe(destination="consumer_tag", queue="delete-me-2") - session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) - session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFF) queue = session.incoming("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.body) diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 5aef2b00e8..da162d54ec 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -251,13 +251,13 @@ class TxTests(TestBase010): session = session or self.session consumer_tag = keys["destination"] session.message_subscribe(**keys) - session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFF) def enable_flow(self, tag, session=None): session = session or self.session - session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF) - session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF) + session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFF) + session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFF) def complete(self, session, msg): session.receiver._completed.add(msg.id)#TODO: this may be done automatically |