summaryrefslogtreecommitdiff
path: root/python/qpid/connection.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
committerRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
commitd22ac4bbbd52fc8cbf80f864c49c904b0b24a529 (patch)
tree1f140400747e507d88d69695046e13a5efcf0e52 /python/qpid/connection.py
parent0fc88ad654ed1dabf14c489ed5920b440a7fc6a2 (diff)
downloadqpid-python-d22ac4bbbd52fc8cbf80f864c49c904b0b24a529.tar.gz
- removed old and redundent tests
- removed old test harness in favor of qpid-python-test - modified qpid-python-test to support "skipped" tests, these are tests that failed due to an anticipated environmental reason such as the broker is not running or it is the wrong version - modified the qpid-python-test harness to exit with appropriate error codes based on the test results - modified the python clients to report version mismatches rather than framing errors - made qpid_config provide variables for 0-8, 0-9, and 0-10 versions of the spec - modified the 0-10 client to directly codegen classes - added new 0-10 framing layer based on push parsing rather than pull parsing - added numerous framing tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803168 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r--python/qpid/connection.py65
1 files changed, 48 insertions, 17 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 5abab3802c..680f8f62e3 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -20,14 +20,14 @@
import datatypes, session
from threading import Thread, Condition, RLock
from util import wait, notify
-from assembler import Assembler, Segment
from codec010 import StringCodec
+from framing import *
from session import Session
from generator import control_invoker
from spec import SPEC
from exceptions import *
from logging import getLogger
-import delegates
+import delegates, socket
class ChannelBusy(Exception): pass
@@ -43,12 +43,12 @@ def client(*args, **kwargs):
def server(*args, **kwargs):
return delegates.Server(*args, **kwargs)
-class Connection(Assembler):
+from framer import Framer
- def __init__(self, sock, spec=SPEC, delegate=client, **args):
- Assembler.__init__(self, sock)
- self.spec = spec
+class Connection(Framer):
+ def __init__(self, sock, delegate=client, **args):
+ Framer.__init__(self, sock)
self.lock = RLock()
self.attached = {}
self.sessions = {}
@@ -66,6 +66,10 @@ class Connection(Assembler):
self.channel_max = 65535
+ self.op_enc = OpEncoder()
+ self.seg_enc = SegmentEncoder()
+ self.frame_enc = FrameEncoder()
+
self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
@@ -145,15 +149,44 @@ class Connection(Assembler):
raise ConnectionFailed(*self.close_code)
def run(self):
+ frame_dec = FrameDecoder()
+ seg_dec = SegmentDecoder()
+ op_dec = OpDecoder()
+
while not self.closed:
try:
- seg = self.read_segment()
- except Closed:
+ data = self.sock.recv(64*1024)
+ if not data:
+ self.detach_all()
+ break
+ except socket.timeout:
+ if self.aborted():
+ self.detach_all()
+ raise Closed("connection timed out")
+ else:
+ continue
+ except socket.error, e:
self.detach_all()
- break
- self.delegate.received(seg)
+ raise Closed(e)
+ frame_dec.write(data)
+ seg_dec.write(*frame_dec.read())
+ op_dec.write(*seg_dec.read())
+ for op in op_dec.read():
+ self.delegate.received(op)
self.sock.close()
+ def write_op(self, op):
+ self.sock_lock.acquire()
+ try:
+ self.op_enc.write(op)
+ self.seg_enc.write(*self.op_enc.read())
+ self.frame_enc.write(*self.seg_enc.read())
+ bytes = self.frame_enc.read()
+ self.write(bytes)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
def close(self, timeout=None):
if not self.opened: return
Channel(self, 0).connection_close(200)
@@ -169,19 +202,17 @@ class Connection(Assembler):
log = getLogger("qpid.io.ctl")
-class Channel(control_invoker(SPEC)):
+class Channel(control_invoker()):
def __init__(self, connection, id):
self.connection = connection
self.id = id
self.session = None
- def invoke(self, type, args, kwargs):
- ctl = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_control(ctl)
- self.connection.write_segment(Segment(True, True, type.segment_type,
- type.track, self.id, sc.encoded))
+ def invoke(self, op, args, kwargs):
+ ctl = op(*args, **kwargs)
+ ctl.channel = self.id
+ self.connection.write_op(ctl)
log.debug("SENT %s", ctl)
def __str__(self):