diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/assembler.py | 4 | ||||
-rw-r--r-- | python/qpid/codec010.py | 27 | ||||
-rw-r--r-- | python/qpid/connection010.py | 2 | ||||
-rw-r--r-- | python/qpid/datatypes.py | 47 | ||||
-rw-r--r-- | python/qpid/delegates.py | 2 | ||||
-rw-r--r-- | python/qpid/session.py | 11 | ||||
-rw-r--r-- | python/qpid/spec010.py | 45 | ||||
-rw-r--r-- | python/tests/connection010.py | 2 | ||||
-rw-r--r-- | python/tests/spec010.py | 18 | ||||
-rw-r--r-- | python/tests_0-10/example.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 2 |
11 files changed, 89 insertions, 73 deletions
diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py index 1a480698c8..9f8bc4bd8a 100644 --- a/python/qpid/assembler.py +++ b/python/qpid/assembler.py @@ -46,9 +46,9 @@ class Segment: def decode_command(self, spec): sc = StringCodec(spec, self.payload) - cmd = sc.read_command() + hdr, cmd = sc.read_command() cmd.id = self.id - return cmd + return hdr, cmd def decode_header(self, spec): sc = StringCodec(spec, self.payload) diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 27df9db974..60adbdc2e4 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -18,7 +18,7 @@ # from packer import Packer -from datatypes import RangedSet +from datatypes import RangedSet, Struct class CodecException(Exception): pass @@ -184,27 +184,32 @@ class Codec(Packer): def read_struct32(self): size = self.read_uint32() code = self.read_uint16() - struct = self.spec.structs[code] - return struct.decode_fields(self) + type = self.spec.structs[code] + fields = type.decode_fields(self) + return Struct(type, **fields) def write_struct32(self, value): sc = StringCodec(self.spec) - sc.write_uint16(value.type.code) - value.type.encode_fields(sc, value) + sc.write_uint16(value._type.code) + value._type.encode_fields(sc, value) self.write_vbin32(sc.encoded) def read_control(self): cntrl = self.spec.controls[self.read_uint16()] return cntrl.decode(self) - def write_control(self, type, ctrl): + def write_control(self, ctrl): + type = ctrl._type self.write_uint16(type.code) type.encode(self, ctrl) def read_command(self): - cmd = self.spec.commands[self.read_uint16()] - return cmd.decode(self) - def write_command(self, type, cmd): - self.write_uint16(type.code) - type.encode(self, cmd) + type = self.spec.commands[self.read_uint16()] + hdr = self.spec["session.header"].decode(self) + cmd = type.decode(self) + return hdr, cmd + def write_command(self, hdr, cmd): + self.write_uint16(cmd._type.code) + hdr._type.encode(self, hdr) + cmd._type.encode(self, cmd) def read_size(self, width): if width > 0: diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py index 022ef8e411..1235476b82 100644 --- a/python/qpid/connection010.py +++ b/python/qpid/connection010.py @@ -166,7 +166,7 @@ class Channel(Invoker): def invoke(self, type, args, kwargs): cntrl = type.new(args, kwargs) sc = StringCodec(self.connection.spec) - sc.write_control(type, cntrl) + sc.write_control(cntrl) self.connection.write_segment(Segment(True, True, type.segment_type, type.track, self.id, sc.encoded)) diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index e9973b4cc8..45944a2494 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -21,15 +21,48 @@ import threading class Struct: - def __init__(self, fields): - self.__dict__ = fields + def __init__(self, _type, *args, **kwargs): + if len(args) > len(_type.fields): + raise TypeError("%s() takes at most %s arguments (%s given)" % + (_type.name, len(_type.fields), len(args))) - def __repr__(self): - return "Struct(%s)" % ", ".join(["%s=%r" % (k, v) - for k, v in self.__dict__.items()]) + self._type = _type + + idx = 0 + for field in _type.fields: + if idx < len(args): + arg = args[idx] + if kwargs.has_key(field.name): + raise TypeError("%s() got multiple values for keyword argument '%s'" % + (_type.name, field.name)) + elif kwargs.has_key(field.name): + arg = kwargs.pop(field.name) + else: + arg = field.default() + setattr(self, field.name, arg) + idx += 1 - def fields(self): - return self.__dict__ + if kwargs: + unexpected = kwargs.keys()[0] + raise TypeError("%s() got an unexpected keywoard argument '%s'" % + (_type.name, unexpected)) + + def __getitem__(self, name): + return getattr(self, name) + + def __setitem__(self, name, value): + if not hasattr(self, name): + raise AttributeError("'%s' object has no attribute '%s'" % + (self._type.name, name)) + setattr(self, name, value) + + def __repr__(self): + fields = [] + for f in self._type.fields: + v = self[f.name] + if f.type.is_present(v): + fields.append("%s=%r" % (f.name, v)) + return "%s(%s)" % (self._type.name, ", ".join(fields)) class Message: diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index d1f615a3fa..8de7141962 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -38,7 +38,7 @@ class Delegate: if seg.track == self.control: cntrl = seg.decode(self.spec) - attr = cntrl.type.qname.replace(".", "_") + attr = cntrl._type.qname.replace(".", "_") getattr(self, attr)(ch, cntrl) elif ssn is None: ch.session_detached() diff --git a/python/qpid/session.py b/python/qpid/session.py index 7a84fa601d..8374b8cd3d 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -129,7 +129,8 @@ class Session(Invoker): cmd = type.new(args, kwargs) sc = StringCodec(self.spec) - sc.write_command(type, cmd) + hdr = Struct(self.spec["session.header"]) + sc.write_command(hdr, cmd) seg = Segment(True, (message == None or (message.headers == None and message.body == None)), @@ -174,10 +175,10 @@ class Session(Invoker): def dispatch(self, assembly): segments = assembly[:] - cmd = assembly.pop(0).decode(self.spec) + hdr, cmd = assembly.pop(0).decode(self.spec) args = [] - for st in cmd.type.segments: + for st in cmd._type.segments: if assembly: seg = assembly[0] if seg.type == st.segment_type: @@ -188,10 +189,10 @@ class Session(Invoker): assert len(assembly) == 0 - attr = cmd.type.qname.replace(".", "_") + attr = cmd._type.qname.replace(".", "_") result = getattr(self.delegate, attr)(cmd, *args) - if cmd.type.result: + if cmd._type.result: self.execution_result(cmd.id, result) if result is not INCOMPLETE: diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py index b84be7a047..815c1d064a 100644 --- a/python/qpid/spec010.py +++ b/python/qpid/spec010.py @@ -188,34 +188,18 @@ class Composite(Type, Coded): self.pack = pack def new(self, args, kwargs): - if len(args) > len(self.fields): - raise TypeError("%s takes at most %s arguments (%s given)" % - (self.name, len(self.fields), len(self.args))) - - result = {"type": self} - - for a, f, in zip(args, self.fields): - result[f.name] = a - - for k, v in kwargs.items(): - f = self.named.get(k) - if f == None: - raise TypeError("%s got an unexpected keyword argument '%s'" % - (self.name, k)) - result[f.name] = v - - return datatypes.Struct(result) + return datatypes.Struct(self, *args, **kwargs) def decode(self, codec): codec.read_size(self.size) - return self.decode_fields(codec) + return datatypes.Struct(self, **self.decode_fields(codec)) def decode_fields(self, codec): flags = 0 for i in range(self.pack): flags |= (codec.read_uint8() << 8*i) - result = {"type": self} + result = {} for i in range(len(self.fields)): f = self.fields[i] @@ -223,7 +207,7 @@ class Composite(Type, Coded): result[f.name] = f.type.decode(codec) else: result[f.name] = None - return datatypes.Struct(result) + return result def encode(self, codec, value): sc = StringCodec(self.spec) @@ -231,12 +215,11 @@ class Composite(Type, Coded): codec.write_size(self.size, len(sc.encoded)) codec.write(sc.encoded) - def encode_fields(self, codec, value): - values = value.__dict__ + def encode_fields(self, codec, values): flags = 0 for i in range(len(self.fields)): f = self.fields[i] - if f.type.is_present(values.get(f.name)): + if f.type.is_present(values[f.name]): flags |= (0x1 << i) for i in range(self.pack): codec.write_uint8((flags >> 8*i) & 0xFF) @@ -253,6 +236,9 @@ class Field(Named, Node, Lookup): self.type = type self.exceptions = [] + def default(self): + return None + def register(self, node): Named.register(self, node) node.fields.append(self) @@ -328,23 +314,10 @@ class Command(Instruction): def register(self, node): Instruction.register(self, node) node.commands.append(self) - self.header = self.spec["session.header"] self.spec.commands[self.code] = self self.segment_type = self.spec["segment_type.command"].value self.track = self.spec["track.command"].value - def decode(self, codec): - hdr = self.header.decode(codec) - args = Instruction.decode(self, codec) - result = {} - result.update(hdr.fields()) - result.update(args.fields()) - return datatypes.Struct(result) - - def encode(self, codec, cmd): - self.header.encode(codec, cmd) - Instruction.encode(self, codec, cmd) - class Header(Segment, Node): def __init__(self, children): diff --git a/python/tests/connection010.py b/python/tests/connection010.py index e966ede377..a953e034a2 100644 --- a/python/tests/connection010.py +++ b/python/tests/connection010.py @@ -49,7 +49,7 @@ class TestSession(Delegate): self.queue = queue def queue_query(self, qq): - return qq.type.result.type.new((qq.queue,), {}) + return qq._type.result.type.new((qq.queue,), {}) def message_transfer(self, cmd, header, body): self.queue.put((cmd, header, body)) diff --git a/python/tests/spec010.py b/python/tests/spec010.py index 1c520ee323..4161dc060f 100644 --- a/python/tests/spec010.py +++ b/python/tests/spec010.py @@ -31,11 +31,11 @@ class SpecTest(TestCase): def testSessionHeader(self): hdr = self.spec["session.header"] sc = StringCodec(self.spec) - hdr.encode(sc, Struct({"sync": True})) + hdr.encode(sc, Struct(hdr, sync=True)) assert sc.encoded == "\x01\x01" sc = StringCodec(self.spec) - hdr.encode(sc, Struct({"sync": False})) + hdr.encode(sc, Struct(hdr, sync=False)) assert sc.encoded == "\x01\x00" def encdec(self, type, value): @@ -45,16 +45,20 @@ class SpecTest(TestCase): return decoded def testMessageProperties(self): - props = Struct({"content_length": 0xDEADBEEF, - "reply_to": - Struct({"exchange": "the exchange name", "routing_key": "the routing key"})}) - dec = self.encdec(self.spec["message.message_properties"], props) + mp = self.spec["message.message_properties"] + rt = self.spec["message.reply_to"] + + props = Struct(mp, content_length=0xDEADBEEF, + reply_to=Struct(rt, exchange="the exchange name", + routing_key="the routing key")) + dec = self.encdec(mp, props) assert props.content_length == dec.content_length assert props.reply_to.exchange == dec.reply_to.exchange assert props.reply_to.routing_key == dec.reply_to.routing_key def testMessageSubscribe(self): - cmd = Struct({"exclusive": True, "destination": "this is a test"}) + ms = self.spec["message.subscribe"] + cmd = Struct(ms, exclusive=True, destination="this is a test") dec = self.encdec(self.spec["message.subscribe"], cmd) assert cmd.exclusive == dec.exclusive assert cmd.destination == dec.destination diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index fd10a8df4f..1e140a285d 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -60,7 +60,7 @@ class ExampleTest (TestBase010): session.exchange_declare("test", "direct") # Here we use keyword arguments. - session.queue_declare(session, queue="test-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="test-queue", exchange="test", binding_key="key") # Call Session.subscribe to register as a consumer. diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index dba732d415..bacbc60bc1 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -269,7 +269,7 @@ class QueueTests(TestBase010): session = self.conn.session("replacement", 2) #empty queue: - session.message_subscribe(session, destination="consumer_tag", queue="delete-me-2") + session.message_subscribe(destination="consumer_tag", queue="delete-me-2") session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) queue = session.incoming("consumer_tag") |