summaryrefslogtreecommitdiff
path: root/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r--python/qpid/session.py199
1 files changed, 70 insertions, 129 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 3b8bd18469..4413a22899 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -22,9 +22,9 @@ from spec import SPEC
from generator import command_invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
-from assembler import Segment
from queue import Queue
from datatypes import Message, serial
+from ops import Command, MessageTransfer
from util import wait, notify
from exceptions import *
from logging import getLogger
@@ -44,7 +44,7 @@ def server(*args):
INCOMPLETE = object()
-class Session(command_invoker(SPEC)):
+class Session(command_invoker()):
def __init__(self, name, auto_sync=True, timeout=10, delegate=client):
self.name = name
@@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)):
self.results = {}
self.exceptions = []
- self.assembly = None
-
self.delegate = delegate(self)
def incoming(self, destination):
@@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)):
finally:
self.lock.release()
- def invoke(self, type, args, kwargs):
- # XXX
- if not hasattr(type, "track"):
- return type.new(args, kwargs)
-
- self.invoke_lock.acquire()
- try:
- return self.do_invoke(type, args, kwargs)
- finally:
- self.invoke_lock.release()
+ def invoke(self, op, args, kwargs):
+ if issubclass(op, Command):
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(op, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+ else:
+ return op(*args, **kwargs)
- def do_invoke(self, type, args, kwargs):
+ def do_invoke(self, op, args, kwargs):
if self._closing:
raise SessionClosed()
if self.channel == None:
raise SessionDetached()
- if type.segments:
- if len(args) == len(type.fields) + 1:
+ if op == MessageTransfer:
+ if len(args) == len(op.FIELDS) + 1:
message = args[-1]
args = args[:-1]
else:
message = kwargs.pop("message", None)
- else:
- message = None
-
- hdr = Struct(self.spec["session.header"])
- hdr.sync = self.auto_sync or kwargs.pop("sync", False)
- self.need_sync = not hdr.sync
+ if message is not None:
+ kwargs["headers"] = message.headers
+ kwargs["payload"] = message.body
- cmd = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_command(hdr, cmd)
+ cmd = op(*args, **kwargs)
+ cmd.sync = self.auto_sync or cmd.sync
+ self.need_sync = not cmd.sync
+ cmd.channel = self.channel.id
- seg = Segment(True, (message == None or
- (message.headers == None and message.body == None)),
- type.segment_type, type.track, self.channel.id, sc.encoded)
-
- if type.result:
+ if op.RESULT:
result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
- self.send(seg)
-
- log.debug("SENT %s %s %s", seg.id, hdr, cmd)
-
- if message != None:
- if message.headers != None:
- sc = StringCodec(self.spec)
- for st in message.headers:
- sc.write_struct32(st)
- seg = Segment(False, message.body == None, self.spec["segment_type.header"].value,
- type.track, self.channel.id, sc.encoded)
- self.send(seg)
- if message.body != None:
- seg = Segment(False, True, self.spec["segment_type.body"].value,
- type.track, self.channel.id, message.body)
- self.send(seg)
- msg.debug("SENT %s", message)
-
- if type.result:
+ log.debug("SENDING %s", cmd)
+
+ self.send(cmd)
+
+ log.debug("SENT %s", cmd)
+ if op == MessageTransfer:
+ msg.debug("SENT %s", cmd)
+
+ if op.RESULT:
if self.auto_sync:
return result.get(self.timeout)
else:
@@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)):
elif self.auto_sync:
self.sync(self.timeout)
- def received(self, seg):
- self.receiver.received(seg)
- if seg.first:
- assert self.assembly == None
- self.assembly = []
- self.assembly.append(seg)
- if seg.last:
- self.dispatch(self.assembly)
- self.assembly = None
-
- def dispatch(self, assembly):
- segments = assembly[:]
-
- hdr, cmd = assembly.pop(0).decode(self.spec)
- log.debug("RECV %s %s %s", cmd.id, hdr, cmd)
-
- args = []
-
- for st in cmd._type.segments:
- if assembly:
- seg = assembly[0]
- if seg.type == st.segment_type:
- args.append(seg.decode(self.spec))
- assembly.pop(0)
- continue
- args.append(None)
-
- assert len(assembly) == 0
+ def received(self, cmd):
+ self.receiver.received(cmd)
+ self.dispatch(cmd)
- attr = cmd._type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd, *args)
+ def dispatch(self, cmd):
+ log.debug("RECV %s", cmd)
- if cmd._type.result:
+ result = getattr(self.delegate, cmd.NAME)(cmd)
+ if result is INCOMPLETE:
+ return
+ elif result is not None:
self.execution_result(cmd.id, result)
- if result is not INCOMPLETE:
- for seg in segments:
- self.receiver.completed(seg)
- # XXX: don't forget to obey sync for manual completion as well
- if hdr.sync:
- self.channel.session_completed(self.receiver._completed)
+ self.receiver.completed(cmd)
+ # XXX: don't forget to obey sync for manual completion as well
+ if cmd.sync:
+ self.channel.session_completed(self.receiver._completed)
- def send(self, seg):
- self.sender.send(seg)
-
- def __str__(self):
- return '<Session: %s, %s>' % (self.name, self.channel)
+ def send(self, cmd):
+ self.sender.send(cmd)
def __repr__(self):
- return str(self)
+ return '<Session: %s, %s>' % (self.name, self.channel)
class Receiver:
def __init__(self, session):
self.session = session
self.next_id = None
- self.next_offset = None
self._completed = RangedSet()
- def received(self, seg):
- if self.next_id == None or self.next_offset == None:
+ def received(self, cmd):
+ if self.next_id == None:
raise Exception("todo")
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
+ cmd.id = self.next_id
+ self.next_id += 1
- def completed(self, seg):
- if seg.id == None:
- raise ValueError("cannot complete unidentified segment")
- if seg.last:
- self._completed.add(seg.id)
+ def completed(self, cmd):
+ if cmd.id == None:
+ raise ValueError("cannot complete unidentified command")
+ self._completed.add(cmd.id)
def known_completed(self, commands):
completed = RangedSet()
@@ -294,30 +241,24 @@ class Sender:
def __init__(self, session):
self.session = session
self.next_id = serial(0)
- self.next_offset = 0
- self.segments = []
+ self.commands = []
self._completed = RangedSet()
- def send(self, seg):
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
- self.segments.append(seg)
+ def send(self, cmd):
+ cmd.id = self.next_id
+ self.next_id += 1
if self.session.send_id:
self.session.send_id = False
- self.session.channel.session_command_point(seg.id, seg.offset)
- self.session.channel.connection.write_segment(seg)
+ self.session.channel.session_command_point(cmd.id, 0)
+ self.commands.append(cmd)
+ self.session.channel.connection.write_op(cmd)
def completed(self, commands):
idx = 0
- while idx < len(self.segments):
- seg = self.segments[idx]
- if seg.id in commands:
- del self.segments[idx]
+ while idx < len(self.commands):
+ cmd = self.commands[idx]
+ if cmd.id in commands:
+ del self.commands[idx]
else:
idx += 1
for range in commands.ranges:
@@ -332,7 +273,7 @@ class Incoming(Queue):
def start(self):
self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
- for unit in self.session.credit_unit.values():
+ for unit in self.session.credit_unit.VALUES:
self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
def stop(self):
@@ -356,9 +297,9 @@ class Delegate:
class Client(Delegate):
- def message_transfer(self, cmd, headers, body):
- m = Message(body)
- m.headers = headers
+ def message_transfer(self, cmd):
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
m.id = cmd.id
messages = self.session.incoming(cmd.destination)
messages.put(m)