summaryrefslogtreecommitdiff
path: root/python/qpid/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r--python/qpid/connection.py65
1 files changed, 48 insertions, 17 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 5abab3802c..680f8f62e3 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -20,14 +20,14 @@
import datatypes, session
from threading import Thread, Condition, RLock
from util import wait, notify
-from assembler import Assembler, Segment
from codec010 import StringCodec
+from framing import *
from session import Session
from generator import control_invoker
from spec import SPEC
from exceptions import *
from logging import getLogger
-import delegates
+import delegates, socket
class ChannelBusy(Exception): pass
@@ -43,12 +43,12 @@ def client(*args, **kwargs):
def server(*args, **kwargs):
return delegates.Server(*args, **kwargs)
-class Connection(Assembler):
+from framer import Framer
- def __init__(self, sock, spec=SPEC, delegate=client, **args):
- Assembler.__init__(self, sock)
- self.spec = spec
+class Connection(Framer):
+ def __init__(self, sock, delegate=client, **args):
+ Framer.__init__(self, sock)
self.lock = RLock()
self.attached = {}
self.sessions = {}
@@ -66,6 +66,10 @@ class Connection(Assembler):
self.channel_max = 65535
+ self.op_enc = OpEncoder()
+ self.seg_enc = SegmentEncoder()
+ self.frame_enc = FrameEncoder()
+
self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
@@ -145,15 +149,44 @@ class Connection(Assembler):
raise ConnectionFailed(*self.close_code)
def run(self):
+ frame_dec = FrameDecoder()
+ seg_dec = SegmentDecoder()
+ op_dec = OpDecoder()
+
while not self.closed:
try:
- seg = self.read_segment()
- except Closed:
+ data = self.sock.recv(64*1024)
+ if not data:
+ self.detach_all()
+ break
+ except socket.timeout:
+ if self.aborted():
+ self.detach_all()
+ raise Closed("connection timed out")
+ else:
+ continue
+ except socket.error, e:
self.detach_all()
- break
- self.delegate.received(seg)
+ raise Closed(e)
+ frame_dec.write(data)
+ seg_dec.write(*frame_dec.read())
+ op_dec.write(*seg_dec.read())
+ for op in op_dec.read():
+ self.delegate.received(op)
self.sock.close()
+ def write_op(self, op):
+ self.sock_lock.acquire()
+ try:
+ self.op_enc.write(op)
+ self.seg_enc.write(*self.op_enc.read())
+ self.frame_enc.write(*self.seg_enc.read())
+ bytes = self.frame_enc.read()
+ self.write(bytes)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
def close(self, timeout=None):
if not self.opened: return
Channel(self, 0).connection_close(200)
@@ -169,19 +202,17 @@ class Connection(Assembler):
log = getLogger("qpid.io.ctl")
-class Channel(control_invoker(SPEC)):
+class Channel(control_invoker()):
def __init__(self, connection, id):
self.connection = connection
self.id = id
self.session = None
- def invoke(self, type, args, kwargs):
- ctl = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_control(ctl)
- self.connection.write_segment(Segment(True, True, type.segment_type,
- type.track, self.id, sc.encoded))
+ def invoke(self, op, args, kwargs):
+ ctl = op(*args, **kwargs)
+ ctl.channel = self.id
+ self.connection.write_op(ctl)
log.debug("SENT %s", ctl)
def __str__(self):