summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/qpid/session.py')
-rw-r--r--trunk/qpid/python/qpid/session.py378
1 files changed, 0 insertions, 378 deletions
diff --git a/trunk/qpid/python/qpid/session.py b/trunk/qpid/python/qpid/session.py
deleted file mode 100644
index 2f70461ab6..0000000000
--- a/trunk/qpid/python/qpid/session.py
+++ /dev/null
@@ -1,378 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import Condition, RLock, Lock, currentThread
-from invoker import Invoker
-from datatypes import RangedSet, Struct, Future
-from codec010 import StringCodec
-from assembler import Segment
-from queue import Queue
-from datatypes import Message, serial
-from util import wait, notify
-from exceptions import *
-from logging import getLogger
-
-log = getLogger("qpid.io.cmd")
-msg = getLogger("qpid.io.msg")
-
-class SessionException(Exception): pass
-class SessionClosed(SessionException): pass
-class SessionDetached(SessionException): pass
-
-def client(*args):
- return Client(*args)
-
-def server(*args):
- return Server(*args)
-
-INCOMPLETE = object()
-
-class Session(Invoker):
-
- def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client):
- self.name = name
- self.spec = spec
- self.auto_sync = auto_sync
- self.timeout = timeout
- self.channel = None
- self.invoke_lock = Lock()
- self._closing = False
- self._closed = False
-
- self.condition = Condition()
-
- self.send_id = True
- self.receiver = Receiver(self)
- self.sender = Sender(self)
-
- self.lock = RLock()
- self._incoming = {}
- self.results = {}
- self.exceptions = []
-
- self.assembly = None
-
- self.delegate = delegate(self)
-
- def incoming(self, destination):
- self.lock.acquire()
- try:
- queue = self._incoming.get(destination)
- if queue == None:
- queue = Incoming(self, destination)
- self._incoming[destination] = queue
- return queue
- finally:
- self.lock.release()
-
- def error(self):
- exc = self.exceptions[:]
- if len(exc) == 0:
- return None
- elif len(exc) == 1:
- return exc[0]
- else:
- return tuple(exc)
-
- def sync(self, timeout=None):
- ch = self.channel
- if ch is not None and currentThread() == ch.connection.thread:
- raise SessionException("deadlock detected")
- if not self.auto_sync:
- self.execution_sync(sync=True)
- last = self.sender.next_id - 1
- if not wait(self.condition, lambda:
- last in self.sender._completed or self.exceptions,
- timeout):
- raise Timeout()
- if self.exceptions:
- raise SessionException(self.error())
-
- def close(self, timeout=None):
- self.invoke_lock.acquire()
- try:
- self._closing = True
- self.channel.session_detach(self.name)
- finally:
- self.invoke_lock.release()
- if not wait(self.condition, lambda: self._closed, timeout):
- raise Timeout()
-
- def closed(self):
- self.lock.acquire()
- try:
- if self._closed: return
-
- error = self.error()
- for id in self.results:
- f = self.results[id]
- f.error(error)
- self.results.clear()
-
- for q in self._incoming.values():
- q.close(error)
-
- self._closed = True
- notify(self.condition)
- finally:
- self.lock.release()
-
- def resolve_method(self, name):
- cmd = self.spec.instructions.get(name)
- if cmd is not None and cmd.track == self.spec["track.command"].value:
- return self.METHOD, cmd
- else:
- # XXX
- for st in self.spec.structs.values():
- if st.name == name:
- return self.METHOD, st
- if self.spec.structs_by_name.has_key(name):
- return self.METHOD, self.spec.structs_by_name[name]
- if self.spec.enums.has_key(name):
- return self.VALUE, self.spec.enums[name]
- return self.ERROR, None
-
- def invoke(self, type, args, kwargs):
- # XXX
- if not hasattr(type, "track"):
- return type.new(args, kwargs)
-
- self.invoke_lock.acquire()
- try:
- return self.do_invoke(type, args, kwargs)
- finally:
- self.invoke_lock.release()
-
- def do_invoke(self, type, args, kwargs):
- if self._closing:
- raise SessionClosed()
-
- if self.channel == None:
- raise SessionDetached()
-
- if type.segments:
- if len(args) == len(type.fields) + 1:
- message = args[-1]
- args = args[:-1]
- else:
- message = kwargs.pop("message", None)
- else:
- message = None
-
- hdr = Struct(self.spec["session.header"])
- hdr.sync = self.auto_sync or kwargs.pop("sync", False)
-
- cmd = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_command(hdr, cmd)
-
- seg = Segment(True, (message == None or
- (message.headers == None and message.body == None)),
- type.segment_type, type.track, self.channel.id, sc.encoded)
-
- if type.result:
- result = Future(exception=SessionException)
- self.results[self.sender.next_id] = result
-
- self.send(seg)
-
- log.debug("SENT %s %s %s", seg.id, hdr, cmd)
-
- if message != None:
- if message.headers != None:
- sc = StringCodec(self.spec)
- for st in message.headers:
- sc.write_struct32(st)
- seg = Segment(False, message.body == None, self.spec["segment_type.header"].value,
- type.track, self.channel.id, sc.encoded)
- self.send(seg)
- if message.body != None:
- seg = Segment(False, True, self.spec["segment_type.body"].value,
- type.track, self.channel.id, message.body)
- self.send(seg)
- msg.debug("SENT %s", message)
-
- if type.result:
- if self.auto_sync:
- return result.get(self.timeout)
- else:
- return result
- elif self.auto_sync:
- self.sync(self.timeout)
-
- def received(self, seg):
- self.receiver.received(seg)
- if seg.first:
- assert self.assembly == None
- self.assembly = []
- self.assembly.append(seg)
- if seg.last:
- self.dispatch(self.assembly)
- self.assembly = None
-
- def dispatch(self, assembly):
- segments = assembly[:]
-
- hdr, cmd = assembly.pop(0).decode(self.spec)
- log.debug("RECV %s %s %s", cmd.id, hdr, cmd)
-
- args = []
-
- for st in cmd._type.segments:
- if assembly:
- seg = assembly[0]
- if seg.type == st.segment_type:
- args.append(seg.decode(self.spec))
- assembly.pop(0)
- continue
- args.append(None)
-
- assert len(assembly) == 0
-
- attr = cmd._type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd, *args)
-
- if cmd._type.result:
- self.execution_result(cmd.id, result)
-
- if result is not INCOMPLETE:
- for seg in segments:
- self.receiver.completed(seg)
- # XXX: don't forget to obey sync for manual completion as well
- if hdr.sync:
- self.channel.session_completed(self.receiver._completed)
-
- def send(self, seg):
- self.sender.send(seg)
-
- def __str__(self):
- return '<Session: %s, %s>' % (self.name, self.channel)
-
- def __repr__(self):
- return str(self)
-
-class Receiver:
-
- def __init__(self, session):
- self.session = session
- self.next_id = None
- self.next_offset = None
- self._completed = RangedSet()
-
- def received(self, seg):
- if self.next_id == None or self.next_offset == None:
- raise Exception("todo")
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
-
- def completed(self, seg):
- if seg.id == None:
- raise ValueError("cannot complete unidentified segment")
- if seg.last:
- self._completed.add(seg.id)
-
- def known_completed(self, commands):
- completed = RangedSet()
- for c in self._completed.ranges:
- for kc in commands.ranges:
- if c.lower in kc and c.upper in kc:
- break
- else:
- completed.add_range(c)
- self._completed = completed
-
-class Sender:
-
- def __init__(self, session):
- self.session = session
- self.next_id = serial(0)
- self.next_offset = 0
- self.segments = []
- self._completed = RangedSet()
-
- def send(self, seg):
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
- self.segments.append(seg)
- if self.session.send_id:
- self.session.send_id = False
- self.session.channel.session_command_point(seg.id, seg.offset)
- self.session.channel.connection.write_segment(seg)
-
- def completed(self, commands):
- idx = 0
- while idx < len(self.segments):
- seg = self.segments[idx]
- if seg.id in commands:
- del self.segments[idx]
- else:
- idx += 1
- for range in commands.ranges:
- self._completed.add(range.lower, range.upper)
-
-class Incoming(Queue):
-
- def __init__(self, session, destination):
- Queue.__init__(self)
- self.session = session
- self.destination = destination
-
- def start(self):
- for unit in self.session.credit_unit.values():
- self.session.message_flow(self.destination, unit, 0xFFFFFFFF)
-
- def stop(self):
- self.session.message_cancel(self.destination)
- self.listen(None)
-
-class Delegate:
-
- def __init__(self, session):
- self.session = session
-
- #XXX: do something with incoming accepts
- def message_accept(self, ma): None
-
- def execution_result(self, er):
- future = self.session.results.pop(er.command_id)
- future.set(er.value)
-
- def execution_exception(self, ex):
- self.session.exceptions.append(ex)
-
-class Client(Delegate):
-
- def message_transfer(self, cmd, headers, body):
- m = Message(body)
- m.headers = headers
- m.id = cmd.id
- messages = self.session.incoming(cmd.destination)
- messages.put(m)
- msg.debug("RECV %s", m)
- return INCOMPLETE