summaryrefslogtreecommitdiff
path: root/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r--python/qpid/session.py79
1 files changed, 57 insertions, 22 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 2e5f47b63e..334902bbf3 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,11 +17,14 @@
# under the License.
#
-from threading import Event
+from threading import Event, RLock
from invoker import Invoker
from datatypes import RangeSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
+from queue import Queue
+from datatypes import Message
+from logging import getLogger
class SessionDetached(Exception): pass
@@ -46,6 +49,20 @@ class Session(Invoker):
self.delegate = delegate(self)
self.send_id = True
self.results = {}
+ self.lock = RLock()
+ self._incoming = {}
+ self.assembly = None
+
+ def incoming(self, destination):
+ self.lock.acquire()
+ try:
+ queue = self._incoming.get(destination)
+ if queue == None:
+ queue = Queue()
+ self._incoming[destination] = queue
+ return queue
+ finally:
+ self.lock.release()
def close(self, timeout=None):
self.channel.session_detach(self.name)
@@ -106,19 +123,37 @@ class Session(Invoker):
def received(self, seg):
self.receiver.received(seg)
- if seg.type == self.spec["segment_type.command"].value:
- cmd = seg.decode(self.spec)
- attr = cmd.type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd)
- if cmd.type.result:
- self.execution_result(seg.id, result)
- elif seg.type == self.spec["segment_type.header"].value:
- self.delegate.header(seg.decode(self.spec))
- elif seg.type == self.spec["segment_type.body"].value:
- self.delegate.body(seg.decode(self.spec))
- else:
- raise ValueError("unknown segment type: %s" % seg.type)
- self.receiver.completed(seg)
+ if seg.first:
+ assert self.assembly == None
+ self.assembly = []
+ self.assembly.append(seg)
+ if seg.last:
+ self.dispatch(self.assembly)
+ self.assembly = None
+
+ def dispatch(self, assembly):
+ cmd = assembly.pop(0).decode(self.spec)
+ args = []
+
+ for st in cmd.type.segments:
+ if assembly:
+ seg = assembly[0]
+ if seg.type == st.segment_type:
+ args.append(seg.decode(self.spec))
+ assembly.pop(0)
+ continue
+ args.append(None)
+
+ assert len(assembly) == 0
+
+ attr = cmd.type.qname.replace(".", "_")
+ result = getattr(self.delegate, attr)(cmd, *args)
+
+ if cmd.type.result:
+ self.execution_result(cmd.id, result)
+
+ for seg in assembly:
+ self.receiver.completed(seg)
def send(self, seg):
self.sender.send(seg)
@@ -196,13 +231,13 @@ class Delegate:
future = self.session.results[er.command_id]
future.set(er.value)
-class Client(Delegate):
+msg = getLogger("qpid.ssn.msg")
- def message_transfer(self, cmd):
- print "TRANSFER:", cmd
-
- def header(self, hdr):
- print "HEADER:", hdr
+class Client(Delegate):
- def body(self, seg):
- print "BODY:", seg
+ def message_transfer(self, cmd, headers, body):
+ m = Message(body)
+ m.headers = headers
+ messages = self.session.incoming(cmd.destination)
+ messages.put(m)
+ msg.debug("RECV: %s", m)