diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-03-05 14:39:40 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-03-05 14:39:40 +0000 |
commit | 86779be122dea590bc1e5201c58777ea3e362a95 (patch) | |
tree | 5867b18efe04c62c99e1ca14d177b0eda894bd82 | |
parent | 00f2ca6cf33f77e44b94db2701830f8c9bcd794e (diff) | |
download | qpid-python-86779be122dea590bc1e5201c58777ea3e362a95.tar.gz |
added incoming queues for messages; altered session dispatch to send entire assembly to a single handler; added logging switch for hello-010-world
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633861 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | python/hello-010-world | 16 | ||||
-rw-r--r-- | python/qpid/assembler.py | 6 | ||||
-rw-r--r-- | python/qpid/datatypes.py | 8 | ||||
-rw-r--r-- | python/qpid/session.py | 79 | ||||
-rw-r--r-- | python/qpid/spec010.py | 32 | ||||
-rwxr-xr-x | python/server010 | 6 | ||||
-rw-r--r-- | python/tests/connection010.py | 24 |
7 files changed, 124 insertions, 47 deletions
diff --git a/python/hello-010-world b/python/hello-010-world index 1b4c9aed33..2b762b1296 100755 --- a/python/hello-010-world +++ b/python/hello-010-world @@ -1,13 +1,18 @@ #!/usr/bin/env python -import logging +import sys, logging from qpid.connection010 import Connection from qpid.spec010 import load from qpid.util import connect from qpid.datatypes import Message +if "-v" in sys.argv: + level = logging.DEBUG +else: + level = logging.WARN + format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s" -logging.basicConfig(level=logging.DEBUG, format=format, datefmt='%H:%M:%S') +logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S') spec = load("../specs/amqp.0-10.xml") conn = Connection(connect("0.0.0.0", spec.port), spec) @@ -18,10 +23,15 @@ ssn = conn.session("my-session", timeout=10) ssn.queue_declare("asdf") ssn.message_transfer("this", None, None, Message("testing...")) -ssn.message_transfer("is") +ssn.message_transfer("is", None, None, Message("more testing...")) ssn.message_transfer("a") ssn.message_transfer("test") +print ssn.incoming("this").get() +print ssn.incoming("is").get() +print ssn.incoming("a").get() +print ssn.incoming("test").get() + print ssn.queue_query("testing") ssn.close(timeout=10) diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py index aac8b80cb4..fe78baaceb 100644 --- a/python/qpid/assembler.py +++ b/python/qpid/assembler.py @@ -46,7 +46,9 @@ class Segment: def decode_command(self, spec): sc = StringCodec(spec, self.payload) - return sc.read_command() + cmd = sc.read_command() + cmd.id = self.id + return cmd def decode_header(self, spec): sc = StringCodec(spec, self.payload) @@ -56,7 +58,7 @@ class Segment: return values def decode_body(self, spec): - return self + return self.payload def __str__(self): return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index 649c8f4d76..efd1fdd4ff 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -43,6 +43,14 @@ class Message: else: self.headers = None + def __repr__(self): + args = [] + if self.headers: + args.extend(self.headers) + if self.body: + args.append(self.body) + return "Message(%s)" % ", ".join(map(repr, args)) + class Range: def __init__(self, lower, upper): diff --git a/python/qpid/session.py b/python/qpid/session.py index 2e5f47b63e..334902bbf3 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,11 +17,14 @@ # under the License. # -from threading import Event +from threading import Event, RLock from invoker import Invoker from datatypes import RangeSet, Struct, Future from codec010 import StringCodec from assembler import Segment +from queue import Queue +from datatypes import Message +from logging import getLogger class SessionDetached(Exception): pass @@ -46,6 +49,20 @@ class Session(Invoker): self.delegate = delegate(self) self.send_id = True self.results = {} + self.lock = RLock() + self._incoming = {} + self.assembly = None + + def incoming(self, destination): + self.lock.acquire() + try: + queue = self._incoming.get(destination) + if queue == None: + queue = Queue() + self._incoming[destination] = queue + return queue + finally: + self.lock.release() def close(self, timeout=None): self.channel.session_detach(self.name) @@ -106,19 +123,37 @@ class Session(Invoker): def received(self, seg): self.receiver.received(seg) - if seg.type == self.spec["segment_type.command"].value: - cmd = seg.decode(self.spec) - attr = cmd.type.qname.replace(".", "_") - result = getattr(self.delegate, attr)(cmd) - if cmd.type.result: - self.execution_result(seg.id, result) - elif seg.type == self.spec["segment_type.header"].value: - self.delegate.header(seg.decode(self.spec)) - elif seg.type == self.spec["segment_type.body"].value: - self.delegate.body(seg.decode(self.spec)) - else: - raise ValueError("unknown segment type: %s" % seg.type) - self.receiver.completed(seg) + if seg.first: + assert self.assembly == None + self.assembly = [] + self.assembly.append(seg) + if seg.last: + self.dispatch(self.assembly) + self.assembly = None + + def dispatch(self, assembly): + cmd = assembly.pop(0).decode(self.spec) + args = [] + + for st in cmd.type.segments: + if assembly: + seg = assembly[0] + if seg.type == st.segment_type: + args.append(seg.decode(self.spec)) + assembly.pop(0) + continue + args.append(None) + + assert len(assembly) == 0 + + attr = cmd.type.qname.replace(".", "_") + result = getattr(self.delegate, attr)(cmd, *args) + + if cmd.type.result: + self.execution_result(cmd.id, result) + + for seg in assembly: + self.receiver.completed(seg) def send(self, seg): self.sender.send(seg) @@ -196,13 +231,13 @@ class Delegate: future = self.session.results[er.command_id] future.set(er.value) -class Client(Delegate): +msg = getLogger("qpid.ssn.msg") - def message_transfer(self, cmd): - print "TRANSFER:", cmd - - def header(self, hdr): - print "HEADER:", hdr +class Client(Delegate): - def body(self, seg): - print "BODY:", seg + def message_transfer(self, cmd, headers, body): + m = Message(body) + m.headers = headers + messages = self.session.incoming(cmd.destination) + messages.put(m) + msg.debug("RECV: %s", m) diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py index e6b7946e17..c3f3e6ad57 100644 --- a/python/qpid/spec010.py +++ b/python/qpid/spec010.py @@ -194,7 +194,7 @@ class Composite(Type, Coded): result[f.name] = a for k, v in kwargs.items(): - f = self.named.get(k, None) + f = self.named.get(k) if f == None: raise TypeError("%s got an unexpected keyword argument '%s'" % (self.name, k)) @@ -232,7 +232,7 @@ class Composite(Type, Coded): flags = 0 for i in range(len(self.fields)): f = self.fields[i] - if f.type.is_present(values.get(f.name, None)): + if f.type.is_present(values.get(f.name)): flags |= (0x1 << i) for i in range(self.pack): codec.write_uint8((flags >> 8*i) & 0xFF) @@ -272,7 +272,10 @@ class Struct(Composite): for f in self.fields]) return "%s {\n %s\n}" % (self.qname, fields) -class Segment(Node): +class Segment: + + def __init__(self): + self.segment_type = None def register(self, node): self.spec = node.spec @@ -284,7 +287,7 @@ class Instruction(Composite, Segment): def __init__(self, name, code, children): Composite.__init__(self, name, code, 0, 2, children) - self.segment_type = None + Segment.__init__(self) self.track = None self.handlers = [] @@ -337,11 +340,17 @@ class Command(Instruction): self.header.encode(codec, cmd) Instruction.encode(self, codec, cmd) -class Header(Segment): +class Header(Segment, Node): def __init__(self, children): + Segment.__init__(self) + Node.__init__(self, children) self.entries = [] - Segment.__init__(self, children) + + def register(self, node): + Segment.register(self, node) + self.segment_type = self.spec["segment_type.header"].value + Node.register(self) class Entry(Lookup): @@ -356,7 +365,16 @@ class Entry(Lookup): def resolve(self): self.type = self.lookup(self.type) -class Body(Segment): +class Body(Segment, Node): + + def __init__(self, children): + Segment.__init__(self) + Node.__init__(self, children) + + def register(self, node): + Segment.register(self, node) + self.segment_type = self.spec["segment_type.body"].value + Node.register(self) def resolve(self): pass diff --git a/python/server010 b/python/server010 index b0e13d1e9f..6d89ee5ea0 100755 --- a/python/server010 +++ b/python/server010 @@ -5,6 +5,7 @@ from qpid.connection010 import Connection from qpid.util import connect, listen from qpid.spec010 import load from qpid.session import Client +from qpid.datatypes import Message spec = load("../specs/amqp.0-10.xml") @@ -27,6 +28,11 @@ class SessionDelegate(Client): def queue_query(self, qq): return qq.type.result.type.new((qq.queue,), {}) + def message_transfer(self, cmd, header, body): + m = Message(body) + m.header = header + self.session.message_transfer(cmd.destination, cmd.accept_mode, cmd.acquire_mode, m) + server = Server() for s in listen("0.0.0.0", spec.port): diff --git a/python/tests/connection010.py b/python/tests/connection010.py index 5e4bf983da..8adf20fd78 100644 --- a/python/tests/connection010.py +++ b/python/tests/connection010.py @@ -50,11 +50,8 @@ class TestSession(Delegate): def queue_query(self, qq): return qq.type.result.type.new((qq.queue,), {}) - def message_transfer(self, cmd): - self.queue.put(cmd) - - def body(self, body): - self.queue.put(body) + def message_transfer(self, cmd, header, body): + self.queue.put((cmd, header, body)) class ConnectionTest(TestCase): @@ -88,8 +85,8 @@ class ConnectionTest(TestCase): c = Connection(connect("0.0.0.0", PORT), self.spec) c.start(10) - ssn1 = c.session("test1") - ssn2 = c.session("test2") + ssn1 = c.session("test1", timeout=10) + ssn2 = c.session("test2", timeout=10) assert ssn1 == c.sessions["test1"] assert ssn2 == c.sessions["test2"] @@ -110,7 +107,7 @@ class ConnectionTest(TestCase): assert ssn2 not in c.attached.values() assert ssn2 not in c.sessions.values() - ssn = c.session("session") + ssn = c.session("session", timeout=10) assert ssn.channel != None assert ssn in c.sessions.values() @@ -121,16 +118,17 @@ class ConnectionTest(TestCase): ssn.message_transfer(d) for d in destinations: - cmd = self.queue.get(10) + cmd, header, body = self.queue.get(10) assert cmd.destination == d + assert header == None + assert body == None msg = Message("this is a test") ssn.message_transfer("four", message=msg) - cmd = self.queue.get(10) + cmd, header, body = self.queue.get(10) assert cmd.destination == "four" - body = self.queue.get(10) - assert body.payload == msg.body - assert body.last + assert header == None + assert body == msg.body qq = ssn.queue_query("asdf") assert qq.queue == "asdf" |