diff options
Diffstat (limited to 'qpid/python/qpid/peer.py')
-rw-r--r-- | qpid/python/qpid/peer.py | 466 |
1 files changed, 466 insertions, 0 deletions
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py new file mode 100644 index 0000000000..95055cc014 --- /dev/null +++ b/qpid/python/qpid/peer.py @@ -0,0 +1,466 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module contains a skeletal peer implementation useful for +implementing an AMQP server, client, or proxy. The peer implementation +sorts incoming frames to their intended channels, and dispatches +incoming method frames to a delegate. +""" + +import thread, threading, traceback, socket, sys, logging +from connection08 import EOF, Method, Header, Body, Request, Response, VersionError +from message import Message +from queue import Queue, Closed as QueueClosed +from content import Content +from cStringIO import StringIO +from time import time +from exceptions import Closed + +class Sequence: + + def __init__(self, start, step = 1): + # we should keep start for wrap around + self._next = start + self.step = step + self.lock = thread.allocate_lock() + + def next(self): + self.lock.acquire() + try: + result = self._next + self._next += self.step + return result + finally: + self.lock.release() + +class Peer: + + def __init__(self, conn, delegate, channel_factory=None): + self.conn = conn + self.delegate = delegate + self.outgoing = Queue(0) + self.work = Queue(0) + self.channels = {} + self.lock = thread.allocate_lock() + if channel_factory: + self.channel_factory = channel_factory + else: + self.channel_factory = Channel + + def channel(self, id): + self.lock.acquire() + try: + try: + ch = self.channels[id] + except KeyError: + ch = self.channel_factory(id, self.outgoing, self.conn.spec) + self.channels[id] = ch + finally: + self.lock.release() + return ch + + def start(self): + thread.start_new_thread(self.writer, ()) + thread.start_new_thread(self.reader, ()) + thread.start_new_thread(self.worker, ()) + + def fatal(self, message=None): + """Call when an unexpected exception occurs that will kill a thread.""" + if message: print >> sys.stderr, message + self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) + + def reader(self): + try: + while True: + try: + frame = self.conn.read() + except EOF, e: + self.work.close() + break + ch = self.channel(frame.channel) + ch.receive(frame, self.work) + except VersionError, e: + self.closed(e) + except: + self.fatal() + + def closed(self, reason): + # We must close the delegate first because closing channels + # may wake up waiting threads and we don't want them to see + # the delegate as open. + self.delegate.closed(reason) + for ch in self.channels.values(): + ch.closed(reason) + + def writer(self): + try: + while True: + try: + message = self.outgoing.get() + self.conn.write(message) + except socket.error, e: + self.closed(e) + break + self.conn.flush() + except: + self.fatal() + + def worker(self): + try: + while True: + queue = self.work.get() + frame = queue.get() + channel = self.channel(frame.channel) + if frame.method_type.content: + content = read_content(queue) + else: + content = None + + self.delegate(channel, Message(channel, frame, content)) + except QueueClosed: + self.closed("worker closed") + except: + self.fatal() + +class Requester: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + self.mark = 0 + # request_id -> listener + self.outstanding = {} + + def request(self, method, listener, content = None): + frame = Request(self.sequence.next(), self.mark, method) + self.outstanding[frame.id] = listener + self.write(frame, content) + + def receive(self, channel, frame): + listener = self.outstanding.pop(frame.request_id) + listener(channel, frame) + +class Responder: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + + def respond(self, method, batch, request): + if isinstance(request, Method): + self.write(method) + else: + # allow batching from frame at either end + if batch<0: + frame = Response(self.sequence.next(), request.id+batch, -batch, method) + else: + frame = Response(self.sequence.next(), request.id, batch, method) + self.write(frame) + +class Channel: + + def __init__(self, id, outgoing, spec): + self.id = id + self.outgoing = outgoing + self.spec = spec + self.incoming = Queue(0) + self.responses = Queue(0) + self.queue = None + self._closed = False + self.reason = None + + self.requester = Requester(self.write) + self.responder = Responder(self.write) + + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) + self.futures = {} + self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves + + self.invoker = self.invoke_method + self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) + self.synchronous = True + + def closed(self, reason): + if self._closed: + return + self._closed = True + self.reason = reason + self.incoming.close() + self.responses.close() + self.completion.close() + self.incoming_completion.reset() + for f in self.futures.values(): + f.put_response(self, reason) + + def write(self, frame, content = None): + if self._closed: + raise Closed(self.reason) + frame.channel = self.id + self.outgoing.put(frame) + if (isinstance(frame, (Method, Request)) + and content == None + and frame.method_type.content): + content = Content() + if content != None: + self.write_content(frame.method_type.klass, content) + + def write_content(self, klass, content): + header = Header(klass, content.weight(), content.size(), content.properties) + self.write(header) + for child in content.children: + self.write_content(klass, child) + # should split up if content.body exceeds max frame size + if content.body: + self.write(Body(content.body)) + + def receive(self, frame, work): + if isinstance(frame, Method): + if frame.method.response: + self.queue = self.responses + else: + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Request): + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Response): + self.requester.receive(self, frame) + if frame.method_type.content: + self.queue = self.responses + return + self.queue.put(frame) + + def queue_response(self, channel, frame): + channel.responses.put(frame.method) + + def request(self, method, listener, content = None): + self.requester.request(method, listener, content) + + def respond(self, method, batch, request): + self.responder.respond(method, batch, request) + + def invoke(self, type, args, kwargs): + if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]): + self.completion.reset() + self.incoming_completion.reset() + self.completion.next_command(type) + + content = kwargs.pop("content", None) + frame = Method(type, type.arguments(*args, **kwargs)) + return self.invoker(frame, content) + + # used for 0-9 + def invoke_reliable(self, frame, content = None): + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + + self.request(frame, self.queue_response, content) + if not frame.method.responses: + if self.use_execution_layer and frame.method_type.is_l4_command(): + self.execution_sync() + self.completion.wait() + if self._closed: + raise Closed(self.reason) + return None + try: + resp = self.responses.get() + if resp.method_type.content: + return Message(self, resp, read_content(self.responses)) + else: + return Message(self, resp) + except QueueClosed, e: + if self._closed: + raise Closed(self.reason) + else: + raise e + + # used for 0-8 and 0-10 + def invoke_method(self, frame, content = None): + if frame.method.result: + cmd_id = self.completion.command_id + future = Future() + self.futures[cmd_id] = future + + self.write(frame, content) + + try: + # here we depend on all nowait fields being named nowait + f = frame.method.fields.byname["nowait"] + nowait = frame.args[frame.method.fields.index(f)] + except KeyError: + nowait = False + + try: + if not nowait and frame.method.responses: + resp = self.responses.get() + if resp.method.content: + content = read_content(self.responses) + else: + content = None + if resp.method in frame.method.responses: + return Message(self, resp, content) + else: + raise ValueError(resp) + elif frame.method.result: + if self.synchronous: + fr = future.get_response(timeout=10) + if self._closed: + raise Closed(self.reason) + return fr + else: + return future + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.is_l4_command(): + self.execution_sync() + completed = self.completion.wait(timeout=10) + if self._closed: + raise Closed(self.reason) + if not completed: + self.closed("Timed-out waiting for completion of %s" % frame) + except QueueClosed, e: + if self._closed: + raise Closed(self.reason) + else: + raise e + + def __getattr__(self, name): + type = self.spec.method(name) + if type == None: raise AttributeError(name) + method = lambda *args, **kwargs: self.invoke(type, args, kwargs) + self.__dict__[name] = method + return method + +def read_content(queue): + header = queue.get() + children = [] + for i in range(header.weight): + children.append(read_content(queue)) + buf = StringIO() + eof = header.eof + while not eof: + body = queue.get() + eof = body.eof + content = body.content + buf.write(content) + return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + if self.completed.isSet(): + return self.response + else: + return None + + def is_complete(self): + return self.completed.isSet() + +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + + def __init__(self): + self.condition = threading.Condition() + + #todo, implement proper wraparound + self.sequence = Sequence(0) #issues ids for outgoing commands + self.command_id = -1 #last issued id + self.mark = -1 #commands up to this mark are known to be complete + self._closed = False + + def next_command(self, method): + #the following test is a hack until the track/sub-channel is available + if method.is_l4_command(): + self.command_id = self.sequence.next() + + def reset(self): + self.sequence = Sequence(0) #reset counter + + def close(self): + self.reset() + self.condition.acquire() + try: + self._closed = True + self.condition.notifyAll() + finally: + self.condition.release() + + def complete(self, mark): + self.condition.acquire() + try: + self.mark = mark + #print "set mark to %s [%s] " % (self.mark, self) + self.condition.notifyAll() + finally: + self.condition.release() + + def wait(self, point_of_interest=-1, timeout=None): + if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout + self.condition.acquire() + try: + while not self._closed and point_of_interest > self.mark: + #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self) + self.condition.wait(remaining) + if not self._closed and point_of_interest > self.mark and timeout: + if (start_time + timeout) < time(): break + else: remaining = timeout - (time() - start_time) + finally: + self.condition.release() + return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(0) #issues ids for incoming commands + self.mark = -1 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def reset(self): + self.sequence = Sequence(0) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + if (self.mark == -1):#hack until wraparound is implemented + self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFFL, ranged_execution_set=range) + else: + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) |