summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/session.py')
-rw-r--r--qpid/python/qpid/session.py308
1 files changed, 308 insertions, 0 deletions
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py
new file mode 100644
index 0000000000..95714a128a
--- /dev/null
+++ b/qpid/python/qpid/session.py
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from threading import Condition, RLock, Lock, currentThread
+from generator import command_invoker
+from datatypes import RangedSet, Struct, Future
+from codec010 import StringCodec
+from queue import Queue
+from datatypes import Message, serial
+from ops import Command, MessageTransfer
+from util import wait, notify
+from exceptions import *
+from logging import getLogger
+
+log = getLogger("qpid.io.cmd")
+msg = getLogger("qpid.io.msg")
+
+class SessionException(Exception): pass
+class SessionClosed(SessionException): pass
+class SessionDetached(SessionException): pass
+
+def client(*args):
+ return Client(*args)
+
+def server(*args):
+ return Server(*args)
+
+INCOMPLETE = object()
+
+class Session(command_invoker()):
+
+ def __init__(self, name, auto_sync=True, timeout=10, delegate=client):
+ self.name = name
+ self.auto_sync = auto_sync
+ self.need_sync = True
+ self.timeout = timeout
+ self.channel = None
+ self.invoke_lock = Lock()
+ self._closing = False
+ self._closed = False
+
+ self.condition = Condition()
+
+ self.send_id = True
+ self.receiver = Receiver(self)
+ self.sender = Sender(self)
+
+ self.lock = RLock()
+ self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
+ self.delegate = delegate(self)
+
+ def incoming(self, destination):
+ self.lock.acquire()
+ try:
+ queue = self._incoming.get(destination)
+ if queue == None:
+ queue = Incoming(self, destination)
+ self._incoming[destination] = queue
+ return queue
+ finally:
+ self.lock.release()
+
+ def error(self):
+ exc = self.exceptions[:]
+ if len(exc) == 0:
+ return None
+ elif len(exc) == 1:
+ return exc[0]
+ else:
+ return tuple(exc)
+
+ def sync(self, timeout=None):
+ ch = self.channel
+ if ch is not None and currentThread() == ch.connection.thread:
+ raise SessionException("deadlock detected")
+ if self.need_sync:
+ self.execution_sync(sync=True)
+ last = self.sender.next_id - 1
+ if not wait(self.condition, lambda:
+ last in self.sender._completed or self.exceptions,
+ timeout):
+ raise Timeout()
+ if self.exceptions:
+ raise SessionException(self.error())
+
+ def close(self, timeout=None):
+ self.invoke_lock.acquire()
+ try:
+ self._closing = True
+ self.channel.session_detach(self.name)
+ finally:
+ self.invoke_lock.release()
+ if not wait(self.condition, lambda: self._closed, timeout):
+ raise Timeout()
+
+ def closed(self):
+ self.lock.acquire()
+ try:
+ if self._closed: return
+
+ error = self.error()
+ for id in self.results:
+ f = self.results[id]
+ f.error(error)
+ self.results.clear()
+
+ for q in self._incoming.values():
+ q.close(error)
+
+ self._closed = True
+ notify(self.condition)
+ finally:
+ self.lock.release()
+
+ def invoke(self, op, args, kwargs):
+ if issubclass(op, Command):
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(op, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+ else:
+ return op(*args, **kwargs)
+
+ def do_invoke(self, op, args, kwargs):
+ if self._closing:
+ raise SessionClosed()
+
+ ch = self.channel
+ if ch == None:
+ raise SessionDetached()
+
+ if op == MessageTransfer:
+ if len(args) == len(op.FIELDS) + 1:
+ message = args[-1]
+ args = args[:-1]
+ else:
+ message = kwargs.pop("message", None)
+ if message is not None:
+ kwargs["headers"] = message.headers
+ kwargs["payload"] = message.body
+
+ cmd = op(*args, **kwargs)
+ cmd.sync = self.auto_sync or cmd.sync
+ self.need_sync = not cmd.sync
+ cmd.channel = ch.id
+
+ if op.RESULT:
+ result = Future(exception=SessionException)
+ self.results[self.sender.next_id] = result
+
+ self.send(cmd)
+
+ log.debug("SENT %s", cmd)
+ if op == MessageTransfer:
+ msg.debug("SENT %s", cmd)
+
+ if op.RESULT:
+ if self.auto_sync:
+ return result.get(self.timeout)
+ else:
+ return result
+ elif self.auto_sync:
+ self.sync(self.timeout)
+
+ def received(self, cmd):
+ self.receiver.received(cmd)
+ self.dispatch(cmd)
+
+ def dispatch(self, cmd):
+ log.debug("RECV %s", cmd)
+
+ result = getattr(self.delegate, cmd.NAME)(cmd)
+ if result is INCOMPLETE:
+ return
+ elif result is not None:
+ self.execution_result(cmd.id, result)
+
+ self.receiver.completed(cmd)
+ # XXX: don't forget to obey sync for manual completion as well
+ if cmd.sync:
+ self.channel.session_completed(self.receiver._completed)
+
+ def send(self, cmd):
+ self.sender.send(cmd)
+
+ def __repr__(self):
+ return '<Session: %s, %s>' % (self.name, self.channel)
+
+class Receiver:
+
+ def __init__(self, session):
+ self.session = session
+ self.next_id = None
+ self._completed = RangedSet()
+
+ def received(self, cmd):
+ if self.next_id == None:
+ raise Exception("todo")
+ cmd.id = self.next_id
+ self.next_id += 1
+
+ def completed(self, cmd):
+ if cmd.id == None:
+ raise ValueError("cannot complete unidentified command")
+ self._completed.add(cmd.id)
+
+ def known_completed(self, commands):
+ completed = RangedSet()
+ for c in self._completed.ranges:
+ for kc in commands.ranges:
+ if c.lower in kc and c.upper in kc:
+ break
+ else:
+ completed.add_range(c)
+ self._completed = completed
+
+class Sender:
+
+ def __init__(self, session):
+ self.session = session
+ self.next_id = serial(0)
+ self.commands = []
+ self._completed = RangedSet()
+
+ def send(self, cmd):
+ ch = self.session.channel
+ if ch is None:
+ raise SessionDetached()
+ cmd.id = self.next_id
+ self.next_id += 1
+ if self.session.send_id:
+ self.session.send_id = False
+ ch.session_command_point(cmd.id, 0)
+ self.commands.append(cmd)
+ ch.connection.write_op(cmd)
+
+ def completed(self, commands):
+ idx = 0
+ while idx < len(self.commands):
+ cmd = self.commands[idx]
+ if cmd.id in commands:
+ del self.commands[idx]
+ else:
+ idx += 1
+ for range in commands.ranges:
+ self._completed.add(range.lower, range.upper)
+
+class Incoming(Queue):
+
+ def __init__(self, session, destination):
+ Queue.__init__(self)
+ self.session = session
+ self.destination = destination
+
+ def start(self):
+ self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
+ for unit in self.session.credit_unit.VALUES:
+ self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
+
+ def stop(self):
+ self.session.message_cancel(self.destination)
+ self.listen(None)
+
+class Delegate:
+
+ def __init__(self, session):
+ self.session = session
+
+ #XXX: do something with incoming accepts
+ def message_accept(self, ma): None
+
+ def execution_result(self, er):
+ future = self.session.results.pop(er.command_id)
+ future.set(er.value)
+
+ def execution_exception(self, ex):
+ self.session.exceptions.append(ex)
+
+class Client(Delegate):
+
+ def message_transfer(self, cmd):
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
+ m.id = cmd.id
+ messages = self.session.incoming(cmd.destination)
+ messages.put(m)
+ msg.debug("RECV %s", m)
+ return INCOMPLETE