diff options
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r-- | python/qpid/connection.py | 65 |
1 files changed, 48 insertions, 17 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 5abab3802c..680f8f62e3 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -20,14 +20,14 @@ import datatypes, session from threading import Thread, Condition, RLock from util import wait, notify -from assembler import Assembler, Segment from codec010 import StringCodec +from framing import * from session import Session from generator import control_invoker from spec import SPEC from exceptions import * from logging import getLogger -import delegates +import delegates, socket class ChannelBusy(Exception): pass @@ -43,12 +43,12 @@ def client(*args, **kwargs): def server(*args, **kwargs): return delegates.Server(*args, **kwargs) -class Connection(Assembler): +from framer import Framer - def __init__(self, sock, spec=SPEC, delegate=client, **args): - Assembler.__init__(self, sock) - self.spec = spec +class Connection(Framer): + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) self.lock = RLock() self.attached = {} self.sessions = {} @@ -66,6 +66,10 @@ class Connection(Assembler): self.channel_max = 65535 + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): @@ -145,15 +149,44 @@ class Connection(Assembler): raise ConnectionFailed(*self.close_code) def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + while not self.closed: try: - seg = self.read_segment() - except Closed: + data = self.sock.recv(64*1024) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.detach_all() + raise Closed("connection timed out") + else: + continue + except socket.error, e: self.detach_all() - break - self.delegate.received(seg) + raise Closed(e) + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + self.delegate.received(op) self.sock.close() + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + def close(self, timeout=None): if not self.opened: return Channel(self, 0).connection_close(200) @@ -169,19 +202,17 @@ class Connection(Assembler): log = getLogger("qpid.io.ctl") -class Channel(control_invoker(SPEC)): +class Channel(control_invoker()): def __init__(self, connection, id): self.connection = connection self.id = id self.session = None - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) log.debug("SENT %s", ctl) def __str__(self): |