diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-08-11 15:40:19 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-08-11 15:40:19 +0000 |
commit | d22ac4bbbd52fc8cbf80f864c49c904b0b24a529 (patch) | |
tree | 1f140400747e507d88d69695046e13a5efcf0e52 /python/qpid/connection.py | |
parent | 0fc88ad654ed1dabf14c489ed5920b440a7fc6a2 (diff) | |
download | qpid-python-d22ac4bbbd52fc8cbf80f864c49c904b0b24a529.tar.gz |
- removed old and redundent tests
- removed old test harness in favor of qpid-python-test
- modified qpid-python-test to support "skipped" tests, these are
tests that failed due to an anticipated environmental reason such
as the broker is not running or it is the wrong version
- modified the qpid-python-test harness to exit with appropriate
error codes based on the test results
- modified the python clients to report version mismatches rather
than framing errors
- made qpid_config provide variables for 0-8, 0-9, and 0-10 versions
of the spec
- modified the 0-10 client to directly codegen classes
- added new 0-10 framing layer based on push parsing rather than pull
parsing
- added numerous framing tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803168 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r-- | python/qpid/connection.py | 65 |
1 files changed, 48 insertions, 17 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 5abab3802c..680f8f62e3 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -20,14 +20,14 @@ import datatypes, session from threading import Thread, Condition, RLock from util import wait, notify -from assembler import Assembler, Segment from codec010 import StringCodec +from framing import * from session import Session from generator import control_invoker from spec import SPEC from exceptions import * from logging import getLogger -import delegates +import delegates, socket class ChannelBusy(Exception): pass @@ -43,12 +43,12 @@ def client(*args, **kwargs): def server(*args, **kwargs): return delegates.Server(*args, **kwargs) -class Connection(Assembler): +from framer import Framer - def __init__(self, sock, spec=SPEC, delegate=client, **args): - Assembler.__init__(self, sock) - self.spec = spec +class Connection(Framer): + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) self.lock = RLock() self.attached = {} self.sessions = {} @@ -66,6 +66,10 @@ class Connection(Assembler): self.channel_max = 65535 + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): @@ -145,15 +149,44 @@ class Connection(Assembler): raise ConnectionFailed(*self.close_code) def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + while not self.closed: try: - seg = self.read_segment() - except Closed: + data = self.sock.recv(64*1024) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.detach_all() + raise Closed("connection timed out") + else: + continue + except socket.error, e: self.detach_all() - break - self.delegate.received(seg) + raise Closed(e) + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + self.delegate.received(op) self.sock.close() + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + def close(self, timeout=None): if not self.opened: return Channel(self, 0).connection_close(200) @@ -169,19 +202,17 @@ class Connection(Assembler): log = getLogger("qpid.io.ctl") -class Channel(control_invoker(SPEC)): +class Channel(control_invoker()): def __init__(self, connection, id): self.connection = connection self.id = id self.session = None - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) log.debug("SENT %s", ctl) def __str__(self): |