diff options
Diffstat (limited to 'qpid/python/qpid/connection.py')
-rw-r--r-- | qpid/python/qpid/connection.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py new file mode 100644 index 0000000000..7dbefb8778 --- /dev/null +++ b/qpid/python/qpid/connection.py @@ -0,0 +1,236 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datatypes, session +from threading import Thread, Condition, RLock +from util import wait, notify +from codec010 import StringCodec +from framing import * +from session import Session +from generator import control_invoker +from exceptions import * +from logging import getLogger +import delegates, socket + +class ChannelBusy(Exception): pass + +class ChannelsBusy(Exception): pass + +class SessionBusy(Exception): pass + +class ConnectionFailed(Exception): pass + +def client(*args, **kwargs): + return delegates.Client(*args, **kwargs) + +def server(*args, **kwargs): + return delegates.Server(*args, **kwargs) + +from framer import Framer + +class Connection(Framer): + + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) + self.lock = RLock() + self.attached = {} + self.sessions = {} + + self.condition = Condition() + # XXX: we should combine this into a single comprehensive state + # model (whatever that means) + self.opened = False + self.failed = False + self.closed = False + self.close_code = (None, "connection aborted") + + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + + self.channel_max = 65535 + self.user_id = None + + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + + self.delegate = delegate(self, **args) + + def attach(self, name, ch, delegate, force=False): + self.lock.acquire() + try: + ssn = self.attached.get(ch.id) + if ssn is not None: + if ssn.name != name: + raise ChannelBusy(ch, ssn) + else: + ssn = self.sessions.get(name) + if ssn is None: + ssn = Session(name, delegate=delegate) + self.sessions[name] = ssn + elif ssn.channel is not None: + if force: + del self.attached[ssn.channel.id] + ssn.channel = None + else: + raise SessionBusy(ssn) + self.attached[ch.id] = ssn + ssn.channel = ch + ch.session = ssn + return ssn + finally: + self.lock.release() + + def detach(self, name, ch): + self.lock.acquire() + try: + self.attached.pop(ch.id, None) + ssn = self.sessions.pop(name, None) + if ssn is not None: + ssn.channel = None + ssn.closed() + return ssn + finally: + self.lock.release() + + def __channel(self): + for i in xrange(1, self.channel_max): + if not self.attached.has_key(i): + return i + else: + raise ChannelsBusy() + + def session(self, name, timeout=None, delegate=session.client): + self.lock.acquire() + try: + ch = Channel(self, self.__channel()) + ssn = self.attach(name, ch, delegate) + ssn.channel.session_attach(name) + if wait(ssn.condition, lambda: ssn.channel is not None, timeout): + return ssn + else: + self.detach(name, ch) + raise Timeout() + finally: + self.lock.release() + + def detach_all(self): + self.lock.acquire() + self.failed = True + try: + for ssn in self.attached.values(): + if self.close_code[0] != 200: + ssn.exceptions.append(self.close_code) + self.detach(ssn.name, ssn.channel) + finally: + self.lock.release() + + def start(self, timeout=None): + self.delegate.start() + self.thread.start() + if not wait(self.condition, lambda: self.opened or self.failed, timeout): + self.thread.join() + raise Timeout() + if self.failed: + self.thread.join() + raise ConnectionFailed(*self.close_code) + + def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + + while not self.closed: + try: + data = self.sock.recv(64*1024) + if self.security_layer_rx and data: + status, data = self.security_layer_rx.decode(data) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.close_code = (None, "connection timed out") + self.detach_all() + break + else: + continue + except socket.error, e: + self.close_code = (None, str(e)) + self.detach_all() + break + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + try: + self.delegate.received(op) + except Closed, e: + self.close_code = (None, str(e)) + if not self.opened: + self.failed = True + self.closed = True + notify(self.condition) + self.sock.close() + + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + + def close(self, timeout=None): + if not self.opened: return + Channel(self, 0).connection_close(200) + if not wait(self.condition, lambda: not self.opened, timeout): + raise Timeout() + self.thread.join(timeout=timeout) + + def __str__(self): + return "%s:%s" % self.sock.getsockname() + + def __repr__(self): + return str(self) + +log = getLogger("qpid.io.ctl") + +class Channel(control_invoker()): + + def __init__(self, connection, id): + self.connection = connection + self.id = id + self.session = None + + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) + log.debug("SENT %s", ctl) + + def __str__(self): + return "%s[%s]" % (self.connection, self.id) + + def __repr__(self): + return str(self) |