diff options
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 199 |
1 files changed, 70 insertions, 129 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py index 3b8bd18469..4413a22899 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -22,9 +22,9 @@ from spec import SPEC from generator import command_invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec -from assembler import Segment from queue import Queue from datatypes import Message, serial +from ops import Command, MessageTransfer from util import wait, notify from exceptions import * from logging import getLogger @@ -44,7 +44,7 @@ def server(*args): INCOMPLETE = object() -class Session(command_invoker(SPEC)): +class Session(command_invoker()): def __init__(self, name, auto_sync=True, timeout=10, delegate=client): self.name = name @@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)): self.results = {} self.exceptions = [] - self.assembly = None - self.delegate = delegate(self) def incoming(self, destination): @@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)): finally: self.lock.release() - def invoke(self, type, args, kwargs): - # XXX - if not hasattr(type, "track"): - return type.new(args, kwargs) - - self.invoke_lock.acquire() - try: - return self.do_invoke(type, args, kwargs) - finally: - self.invoke_lock.release() + def invoke(self, op, args, kwargs): + if issubclass(op, Command): + self.invoke_lock.acquire() + try: + return self.do_invoke(op, args, kwargs) + finally: + self.invoke_lock.release() + else: + return op(*args, **kwargs) - def do_invoke(self, type, args, kwargs): + def do_invoke(self, op, args, kwargs): if self._closing: raise SessionClosed() if self.channel == None: raise SessionDetached() - if type.segments: - if len(args) == len(type.fields) + 1: + if op == MessageTransfer: + if len(args) == len(op.FIELDS) + 1: message = args[-1] args = args[:-1] else: message = kwargs.pop("message", None) - else: - message = None - - hdr = Struct(self.spec["session.header"]) - hdr.sync = self.auto_sync or kwargs.pop("sync", False) - self.need_sync = not hdr.sync + if message is not None: + kwargs["headers"] = message.headers + kwargs["payload"] = message.body - cmd = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_command(hdr, cmd) + cmd = op(*args, **kwargs) + cmd.sync = self.auto_sync or cmd.sync + self.need_sync = not cmd.sync + cmd.channel = self.channel.id - seg = Segment(True, (message == None or - (message.headers == None and message.body == None)), - type.segment_type, type.track, self.channel.id, sc.encoded) - - if type.result: + if op.RESULT: result = Future(exception=SessionException) self.results[self.sender.next_id] = result - self.send(seg) - - log.debug("SENT %s %s %s", seg.id, hdr, cmd) - - if message != None: - if message.headers != None: - sc = StringCodec(self.spec) - for st in message.headers: - sc.write_struct32(st) - seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, - type.track, self.channel.id, sc.encoded) - self.send(seg) - if message.body != None: - seg = Segment(False, True, self.spec["segment_type.body"].value, - type.track, self.channel.id, message.body) - self.send(seg) - msg.debug("SENT %s", message) - - if type.result: + log.debug("SENDING %s", cmd) + + self.send(cmd) + + log.debug("SENT %s", cmd) + if op == MessageTransfer: + msg.debug("SENT %s", cmd) + + if op.RESULT: if self.auto_sync: return result.get(self.timeout) else: @@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)): elif self.auto_sync: self.sync(self.timeout) - def received(self, seg): - self.receiver.received(seg) - if seg.first: - assert self.assembly == None - self.assembly = [] - self.assembly.append(seg) - if seg.last: - self.dispatch(self.assembly) - self.assembly = None - - def dispatch(self, assembly): - segments = assembly[:] - - hdr, cmd = assembly.pop(0).decode(self.spec) - log.debug("RECV %s %s %s", cmd.id, hdr, cmd) - - args = [] - - for st in cmd._type.segments: - if assembly: - seg = assembly[0] - if seg.type == st.segment_type: - args.append(seg.decode(self.spec)) - assembly.pop(0) - continue - args.append(None) - - assert len(assembly) == 0 + def received(self, cmd): + self.receiver.received(cmd) + self.dispatch(cmd) - attr = cmd._type.qname.replace(".", "_") - result = getattr(self.delegate, attr)(cmd, *args) + def dispatch(self, cmd): + log.debug("RECV %s", cmd) - if cmd._type.result: + result = getattr(self.delegate, cmd.NAME)(cmd) + if result is INCOMPLETE: + return + elif result is not None: self.execution_result(cmd.id, result) - if result is not INCOMPLETE: - for seg in segments: - self.receiver.completed(seg) - # XXX: don't forget to obey sync for manual completion as well - if hdr.sync: - self.channel.session_completed(self.receiver._completed) + self.receiver.completed(cmd) + # XXX: don't forget to obey sync for manual completion as well + if cmd.sync: + self.channel.session_completed(self.receiver._completed) - def send(self, seg): - self.sender.send(seg) - - def __str__(self): - return '<Session: %s, %s>' % (self.name, self.channel) + def send(self, cmd): + self.sender.send(cmd) def __repr__(self): - return str(self) + return '<Session: %s, %s>' % (self.name, self.channel) class Receiver: def __init__(self, session): self.session = session self.next_id = None - self.next_offset = None self._completed = RangedSet() - def received(self, seg): - if self.next_id == None or self.next_offset == None: + def received(self, cmd): + if self.next_id == None: raise Exception("todo") - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) + cmd.id = self.next_id + self.next_id += 1 - def completed(self, seg): - if seg.id == None: - raise ValueError("cannot complete unidentified segment") - if seg.last: - self._completed.add(seg.id) + def completed(self, cmd): + if cmd.id == None: + raise ValueError("cannot complete unidentified command") + self._completed.add(cmd.id) def known_completed(self, commands): completed = RangedSet() @@ -294,30 +241,24 @@ class Sender: def __init__(self, session): self.session = session self.next_id = serial(0) - self.next_offset = 0 - self.segments = [] + self.commands = [] self._completed = RangedSet() - def send(self, seg): - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) - self.segments.append(seg) + def send(self, cmd): + cmd.id = self.next_id + self.next_id += 1 if self.session.send_id: self.session.send_id = False - self.session.channel.session_command_point(seg.id, seg.offset) - self.session.channel.connection.write_segment(seg) + self.session.channel.session_command_point(cmd.id, 0) + self.commands.append(cmd) + self.session.channel.connection.write_op(cmd) def completed(self, commands): idx = 0 - while idx < len(self.segments): - seg = self.segments[idx] - if seg.id in commands: - del self.segments[idx] + while idx < len(self.commands): + cmd = self.commands[idx] + if cmd.id in commands: + del self.commands[idx] else: idx += 1 for range in commands.ranges: @@ -332,7 +273,7 @@ class Incoming(Queue): def start(self): self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) - for unit in self.session.credit_unit.values(): + for unit in self.session.credit_unit.VALUES: self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) def stop(self): @@ -356,9 +297,9 @@ class Delegate: class Client(Delegate): - def message_transfer(self, cmd, headers, body): - m = Message(body) - m.headers = headers + def message_transfer(self, cmd): + m = Message(cmd.payload) + m.headers = cmd.headers m.id = cmd.id messages = self.session.incoming(cmd.destination) messages.put(m) |