diff options
Diffstat (limited to 'qpid/python/qpid/session.py')
-rw-r--r-- | qpid/python/qpid/session.py | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py new file mode 100644 index 0000000000..95714a128a --- /dev/null +++ b/qpid/python/qpid/session.py @@ -0,0 +1,308 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import Condition, RLock, Lock, currentThread +from generator import command_invoker +from datatypes import RangedSet, Struct, Future +from codec010 import StringCodec +from queue import Queue +from datatypes import Message, serial +from ops import Command, MessageTransfer +from util import wait, notify +from exceptions import * +from logging import getLogger + +log = getLogger("qpid.io.cmd") +msg = getLogger("qpid.io.msg") + +class SessionException(Exception): pass +class SessionClosed(SessionException): pass +class SessionDetached(SessionException): pass + +def client(*args): + return Client(*args) + +def server(*args): + return Server(*args) + +INCOMPLETE = object() + +class Session(command_invoker()): + + def __init__(self, name, auto_sync=True, timeout=10, delegate=client): + self.name = name + self.auto_sync = auto_sync + self.need_sync = True + self.timeout = timeout + self.channel = None + self.invoke_lock = Lock() + self._closing = False + self._closed = False + + self.condition = Condition() + + self.send_id = True + self.receiver = Receiver(self) + self.sender = Sender(self) + + self.lock = RLock() + self._incoming = {} + self.results = {} + self.exceptions = [] + + self.delegate = delegate(self) + + def incoming(self, destination): + self.lock.acquire() + try: + queue = self._incoming.get(destination) + if queue == None: + queue = Incoming(self, destination) + self._incoming[destination] = queue + return queue + finally: + self.lock.release() + + def error(self): + exc = self.exceptions[:] + if len(exc) == 0: + return None + elif len(exc) == 1: + return exc[0] + else: + return tuple(exc) + + def sync(self, timeout=None): + ch = self.channel + if ch is not None and currentThread() == ch.connection.thread: + raise SessionException("deadlock detected") + if self.need_sync: + self.execution_sync(sync=True) + last = self.sender.next_id - 1 + if not wait(self.condition, lambda: + last in self.sender._completed or self.exceptions, + timeout): + raise Timeout() + if self.exceptions: + raise SessionException(self.error()) + + def close(self, timeout=None): + self.invoke_lock.acquire() + try: + self._closing = True + self.channel.session_detach(self.name) + finally: + self.invoke_lock.release() + if not wait(self.condition, lambda: self._closed, timeout): + raise Timeout() + + def closed(self): + self.lock.acquire() + try: + if self._closed: return + + error = self.error() + for id in self.results: + f = self.results[id] + f.error(error) + self.results.clear() + + for q in self._incoming.values(): + q.close(error) + + self._closed = True + notify(self.condition) + finally: + self.lock.release() + + def invoke(self, op, args, kwargs): + if issubclass(op, Command): + self.invoke_lock.acquire() + try: + return self.do_invoke(op, args, kwargs) + finally: + self.invoke_lock.release() + else: + return op(*args, **kwargs) + + def do_invoke(self, op, args, kwargs): + if self._closing: + raise SessionClosed() + + ch = self.channel + if ch == None: + raise SessionDetached() + + if op == MessageTransfer: + if len(args) == len(op.FIELDS) + 1: + message = args[-1] + args = args[:-1] + else: + message = kwargs.pop("message", None) + if message is not None: + kwargs["headers"] = message.headers + kwargs["payload"] = message.body + + cmd = op(*args, **kwargs) + cmd.sync = self.auto_sync or cmd.sync + self.need_sync = not cmd.sync + cmd.channel = ch.id + + if op.RESULT: + result = Future(exception=SessionException) + self.results[self.sender.next_id] = result + + self.send(cmd) + + log.debug("SENT %s", cmd) + if op == MessageTransfer: + msg.debug("SENT %s", cmd) + + if op.RESULT: + if self.auto_sync: + return result.get(self.timeout) + else: + return result + elif self.auto_sync: + self.sync(self.timeout) + + def received(self, cmd): + self.receiver.received(cmd) + self.dispatch(cmd) + + def dispatch(self, cmd): + log.debug("RECV %s", cmd) + + result = getattr(self.delegate, cmd.NAME)(cmd) + if result is INCOMPLETE: + return + elif result is not None: + self.execution_result(cmd.id, result) + + self.receiver.completed(cmd) + # XXX: don't forget to obey sync for manual completion as well + if cmd.sync: + self.channel.session_completed(self.receiver._completed) + + def send(self, cmd): + self.sender.send(cmd) + + def __repr__(self): + return '<Session: %s, %s>' % (self.name, self.channel) + +class Receiver: + + def __init__(self, session): + self.session = session + self.next_id = None + self._completed = RangedSet() + + def received(self, cmd): + if self.next_id == None: + raise Exception("todo") + cmd.id = self.next_id + self.next_id += 1 + + def completed(self, cmd): + if cmd.id == None: + raise ValueError("cannot complete unidentified command") + self._completed.add(cmd.id) + + def known_completed(self, commands): + completed = RangedSet() + for c in self._completed.ranges: + for kc in commands.ranges: + if c.lower in kc and c.upper in kc: + break + else: + completed.add_range(c) + self._completed = completed + +class Sender: + + def __init__(self, session): + self.session = session + self.next_id = serial(0) + self.commands = [] + self._completed = RangedSet() + + def send(self, cmd): + ch = self.session.channel + if ch is None: + raise SessionDetached() + cmd.id = self.next_id + self.next_id += 1 + if self.session.send_id: + self.session.send_id = False + ch.session_command_point(cmd.id, 0) + self.commands.append(cmd) + ch.connection.write_op(cmd) + + def completed(self, commands): + idx = 0 + while idx < len(self.commands): + cmd = self.commands[idx] + if cmd.id in commands: + del self.commands[idx] + else: + idx += 1 + for range in commands.ranges: + self._completed.add(range.lower, range.upper) + +class Incoming(Queue): + + def __init__(self, session, destination): + Queue.__init__(self) + self.session = session + self.destination = destination + + def start(self): + self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) + for unit in self.session.credit_unit.VALUES: + self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) + + def stop(self): + self.session.message_cancel(self.destination) + self.listen(None) + +class Delegate: + + def __init__(self, session): + self.session = session + + #XXX: do something with incoming accepts + def message_accept(self, ma): None + + def execution_result(self, er): + future = self.session.results.pop(er.command_id) + future.set(er.value) + + def execution_exception(self, ex): + self.session.exceptions.append(ex) + +class Client(Delegate): + + def message_transfer(self, cmd): + m = Message(cmd.payload) + m.headers = cmd.headers + m.id = cmd.id + messages = self.session.incoming(cmd.destination) + messages.put(m) + msg.debug("RECV %s", m) + return INCOMPLETE |