diff options
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 79 |
1 files changed, 57 insertions, 22 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py index 2e5f47b63e..334902bbf3 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,11 +17,14 @@ # under the License. # -from threading import Event +from threading import Event, RLock from invoker import Invoker from datatypes import RangeSet, Struct, Future from codec010 import StringCodec from assembler import Segment +from queue import Queue +from datatypes import Message +from logging import getLogger class SessionDetached(Exception): pass @@ -46,6 +49,20 @@ class Session(Invoker): self.delegate = delegate(self) self.send_id = True self.results = {} + self.lock = RLock() + self._incoming = {} + self.assembly = None + + def incoming(self, destination): + self.lock.acquire() + try: + queue = self._incoming.get(destination) + if queue == None: + queue = Queue() + self._incoming[destination] = queue + return queue + finally: + self.lock.release() def close(self, timeout=None): self.channel.session_detach(self.name) @@ -106,19 +123,37 @@ class Session(Invoker): def received(self, seg): self.receiver.received(seg) - if seg.type == self.spec["segment_type.command"].value: - cmd = seg.decode(self.spec) - attr = cmd.type.qname.replace(".", "_") - result = getattr(self.delegate, attr)(cmd) - if cmd.type.result: - self.execution_result(seg.id, result) - elif seg.type == self.spec["segment_type.header"].value: - self.delegate.header(seg.decode(self.spec)) - elif seg.type == self.spec["segment_type.body"].value: - self.delegate.body(seg.decode(self.spec)) - else: - raise ValueError("unknown segment type: %s" % seg.type) - self.receiver.completed(seg) + if seg.first: + assert self.assembly == None + self.assembly = [] + self.assembly.append(seg) + if seg.last: + self.dispatch(self.assembly) + self.assembly = None + + def dispatch(self, assembly): + cmd = assembly.pop(0).decode(self.spec) + args = [] + + for st in cmd.type.segments: + if assembly: + seg = assembly[0] + if seg.type == st.segment_type: + args.append(seg.decode(self.spec)) + assembly.pop(0) + continue + args.append(None) + + assert len(assembly) == 0 + + attr = cmd.type.qname.replace(".", "_") + result = getattr(self.delegate, attr)(cmd, *args) + + if cmd.type.result: + self.execution_result(cmd.id, result) + + for seg in assembly: + self.receiver.completed(seg) def send(self, seg): self.sender.send(seg) @@ -196,13 +231,13 @@ class Delegate: future = self.session.results[er.command_id] future.set(er.value) -class Client(Delegate): +msg = getLogger("qpid.ssn.msg") - def message_transfer(self, cmd): - print "TRANSFER:", cmd - - def header(self, hdr): - print "HEADER:", hdr +class Client(Delegate): - def body(self, seg): - print "BODY:", seg + def message_transfer(self, cmd, headers, body): + m = Message(body) + m.headers = headers + messages = self.session.incoming(cmd.destination) + messages.put(m) + msg.debug("RECV: %s", m) |