diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-03-04 20:03:09 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-03-04 20:03:09 +0000 |
commit | 75f598b22ea4573cff2d47fdd689b85cee5dd88d (patch) | |
tree | 964aa4463e2140c5040dd36137a49ab9c261f19a /python/qpid/session.py | |
parent | 24435b9c62976e0a4c0857f86057d3c93389b79f (diff) | |
download | qpid-python-75f598b22ea4573cff2d47fdd689b85cee5dd88d.tar.gz |
import of in-process 0-10 final python client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633610 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py new file mode 100644 index 0000000000..2e5f47b63e --- /dev/null +++ b/python/qpid/session.py @@ -0,0 +1,208 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import Event +from invoker import Invoker +from datatypes import RangeSet, Struct, Future +from codec010 import StringCodec +from assembler import Segment + +class SessionDetached(Exception): pass + +def client(*args): + return Client(*args) + +def server(*args): + return Server(*args) + +class Session(Invoker): + + def __init__(self, name, spec, sync=True, timeout=10, delegate=client): + self.name = name + self.spec = spec + self.sync = sync + self.timeout = timeout + self.channel = None + self.opened = Event() + self.closed = Event() + self.receiver = Receiver(self) + self.sender = Sender(self) + self.delegate = delegate(self) + self.send_id = True + self.results = {} + + def close(self, timeout=None): + self.channel.session_detach(self.name) + self.closed.wait(timeout=timeout) + + def resolve_method(self, name): + cmd = self.spec.instructions.get(name) + if cmd is not None and cmd.track == self.spec["track.command"].value: + return cmd + else: + return None + + def invoke(self, type, args, kwargs): + if self.channel == None: + raise SessionDetached() + + if type.segments: + if len(args) == len(type.fields) + 1: + message = args[-1] + args = args[:-1] + else: + message = kwargs.pop("message", None) + else: + message = None + + cmd = type.new(args, kwargs) + sc = StringCodec(self.spec) + sc.write_command(type, cmd) + + seg = Segment(True, (message == None or + (message.headers == None and message.body == None)), + type.segment_type, type.track, self.channel.id, sc.encoded) + + if type.result: + result = Future() + self.results[self.sender.next_id] = result + + self.send(seg) + + if message != None: + if message.headers != None: + sc = StringCodec(self.spec) + for st in message.headers: + sc.write_struct32(st.type, st) + seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, + type.track, self.channel.id, sc.encoded) + self.send(seg) + if message.body != None: + seg = Segment(False, True, self.spec["segment_type.body"].value, + type.track, self.channel.id, message.body) + self.send(seg) + + if type.result: + if self.sync: + return result.get(self.timeout) + else: + return result + + def received(self, seg): + self.receiver.received(seg) + if seg.type == self.spec["segment_type.command"].value: + cmd = seg.decode(self.spec) + attr = cmd.type.qname.replace(".", "_") + result = getattr(self.delegate, attr)(cmd) + if cmd.type.result: + self.execution_result(seg.id, result) + elif seg.type == self.spec["segment_type.header"].value: + self.delegate.header(seg.decode(self.spec)) + elif seg.type == self.spec["segment_type.body"].value: + self.delegate.body(seg.decode(self.spec)) + else: + raise ValueError("unknown segment type: %s" % seg.type) + self.receiver.completed(seg) + + def send(self, seg): + self.sender.send(seg) + + def __str__(self): + return '<Session: %s, %s>' % (self.name, self.channel) + + def __repr__(self): + return str(self) + +class Receiver: + + def __init__(self, session): + self.session = session + self.next_id = None + self.next_offset = None + self._completed = RangeSet() + + def received(self, seg): + if self.next_id == None or self.next_offset == None: + raise Exception("todo") + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + + def completed(self, seg): + if seg.id == None: + raise ValueError("cannot complete unidentified segment") + if seg.last: + self._completed.add(seg.id) + +class Sender: + + def __init__(self, session): + self.session = session + self.next_id = 0 + self.next_offset = 0 + self.segments = [] + + def send(self, seg): + seg.id = self.next_id + seg.offset = self.next_offset + if seg.last: + self.next_id += 1 + self.next_offset = 0 + else: + self.next_offset += len(seg.payload) + self.segments.append(seg) + if self.session.send_id: + self.session.send_id = False + self.session.channel.session_command_point(seg.id, seg.offset) + self.session.channel.connection.write_segment(seg) + + def completed(self, commands): + idx = 0 + while idx < len(self.segments): + seg = self.segments[idx] + if seg.id in commands: + del self.segments[idx] + else: + idx += 1 + +from queue import Queue, Closed, Empty + +class Delegate: + + def __init__(self, session): + self.session = session + + def execution_result(self, er): + future = self.session.results[er.command_id] + future.set(er.value) + +class Client(Delegate): + + def message_transfer(self, cmd): + print "TRANSFER:", cmd + + def header(self, hdr): + print "HEADER:", hdr + + def body(self, seg): + print "BODY:", seg |