summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/peer.py')
-rw-r--r--qpid/python/qpid/peer.py466
1 files changed, 466 insertions, 0 deletions
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
new file mode 100644
index 0000000000..95055cc014
--- /dev/null
+++ b/qpid/python/qpid/peer.py
@@ -0,0 +1,466 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+This module contains a skeletal peer implementation useful for
+implementing an AMQP server, client, or proxy. The peer implementation
+sorts incoming frames to their intended channels, and dispatches
+incoming method frames to a delegate.
+"""
+
+import thread, threading, traceback, socket, sys, logging
+from connection08 import EOF, Method, Header, Body, Request, Response, VersionError
+from message import Message
+from queue import Queue, Closed as QueueClosed
+from content import Content
+from cStringIO import StringIO
+from time import time
+from exceptions import Closed
+
+class Sequence:
+
+ def __init__(self, start, step = 1):
+ # we should keep start for wrap around
+ self._next = start
+ self.step = step
+ self.lock = thread.allocate_lock()
+
+ def next(self):
+ self.lock.acquire()
+ try:
+ result = self._next
+ self._next += self.step
+ return result
+ finally:
+ self.lock.release()
+
+class Peer:
+
+ def __init__(self, conn, delegate, channel_factory=None):
+ self.conn = conn
+ self.delegate = delegate
+ self.outgoing = Queue(0)
+ self.work = Queue(0)
+ self.channels = {}
+ self.lock = thread.allocate_lock()
+ if channel_factory:
+ self.channel_factory = channel_factory
+ else:
+ self.channel_factory = Channel
+
+ def channel(self, id):
+ self.lock.acquire()
+ try:
+ try:
+ ch = self.channels[id]
+ except KeyError:
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec)
+ self.channels[id] = ch
+ finally:
+ self.lock.release()
+ return ch
+
+ def start(self):
+ thread.start_new_thread(self.writer, ())
+ thread.start_new_thread(self.reader, ())
+ thread.start_new_thread(self.worker, ())
+
+ def fatal(self, message=None):
+ """Call when an unexpected exception occurs that will kill a thread."""
+ if message: print >> sys.stderr, message
+ self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+
+ def reader(self):
+ try:
+ while True:
+ try:
+ frame = self.conn.read()
+ except EOF, e:
+ self.work.close()
+ break
+ ch = self.channel(frame.channel)
+ ch.receive(frame, self.work)
+ except VersionError, e:
+ self.closed(e)
+ except:
+ self.fatal()
+
+ def closed(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.closed(reason)
+ for ch in self.channels.values():
+ ch.closed(reason)
+
+ def writer(self):
+ try:
+ while True:
+ try:
+ message = self.outgoing.get()
+ self.conn.write(message)
+ except socket.error, e:
+ self.closed(e)
+ break
+ self.conn.flush()
+ except:
+ self.fatal()
+
+ def worker(self):
+ try:
+ while True:
+ queue = self.work.get()
+ frame = queue.get()
+ channel = self.channel(frame.channel)
+ if frame.method_type.content:
+ content = read_content(queue)
+ else:
+ content = None
+
+ self.delegate(channel, Message(channel, frame, content))
+ except QueueClosed:
+ self.closed("worker closed")
+ except:
+ self.fatal()
+
+class Requester:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+ self.mark = 0
+ # request_id -> listener
+ self.outstanding = {}
+
+ def request(self, method, listener, content = None):
+ frame = Request(self.sequence.next(), self.mark, method)
+ self.outstanding[frame.id] = listener
+ self.write(frame, content)
+
+ def receive(self, channel, frame):
+ listener = self.outstanding.pop(frame.request_id)
+ listener(channel, frame)
+
+class Responder:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+
+ def respond(self, method, batch, request):
+ if isinstance(request, Method):
+ self.write(method)
+ else:
+ # allow batching from frame at either end
+ if batch<0:
+ frame = Response(self.sequence.next(), request.id+batch, -batch, method)
+ else:
+ frame = Response(self.sequence.next(), request.id, batch, method)
+ self.write(frame)
+
+class Channel:
+
+ def __init__(self, id, outgoing, spec):
+ self.id = id
+ self.outgoing = outgoing
+ self.spec = spec
+ self.incoming = Queue(0)
+ self.responses = Queue(0)
+ self.queue = None
+ self._closed = False
+ self.reason = None
+
+ self.requester = Requester(self.write)
+ self.responder = Responder(self.write)
+
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
+ self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves
+
+ self.invoker = self.invoke_method
+ self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
+ self.synchronous = True
+
+ def closed(self, reason):
+ if self._closed:
+ return
+ self._closed = True
+ self.reason = reason
+ self.incoming.close()
+ self.responses.close()
+ self.completion.close()
+ self.incoming_completion.reset()
+ for f in self.futures.values():
+ f.put_response(self, reason)
+
+ def write(self, frame, content = None):
+ if self._closed:
+ raise Closed(self.reason)
+ frame.channel = self.id
+ self.outgoing.put(frame)
+ if (isinstance(frame, (Method, Request))
+ and content == None
+ and frame.method_type.content):
+ content = Content()
+ if content != None:
+ self.write_content(frame.method_type.klass, content)
+
+ def write_content(self, klass, content):
+ header = Header(klass, content.weight(), content.size(), content.properties)
+ self.write(header)
+ for child in content.children:
+ self.write_content(klass, child)
+ # should split up if content.body exceeds max frame size
+ if content.body:
+ self.write(Body(content.body))
+
+ def receive(self, frame, work):
+ if isinstance(frame, Method):
+ if frame.method.response:
+ self.queue = self.responses
+ else:
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Request):
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Response):
+ self.requester.receive(self, frame)
+ if frame.method_type.content:
+ self.queue = self.responses
+ return
+ self.queue.put(frame)
+
+ def queue_response(self, channel, frame):
+ channel.responses.put(frame.method)
+
+ def request(self, method, listener, content = None):
+ self.requester.request(method, listener, content)
+
+ def respond(self, method, batch, request):
+ self.responder.respond(method, batch, request)
+
+ def invoke(self, type, args, kwargs):
+ if (type.klass.name in ["channel", "session"]) and (type.name in ["close", "open", "closed"]):
+ self.completion.reset()
+ self.incoming_completion.reset()
+ self.completion.next_command(type)
+
+ content = kwargs.pop("content", None)
+ frame = Method(type, type.arguments(*args, **kwargs))
+ return self.invoker(frame, content)
+
+ # used for 0-9
+ def invoke_reliable(self, frame, content = None):
+ if not self.synchronous:
+ future = Future()
+ self.request(frame, future.put_response, content)
+ if not frame.method.responses: return None
+ else: return future
+
+ self.request(frame, self.queue_response, content)
+ if not frame.method.responses:
+ if self.use_execution_layer and frame.method_type.is_l4_command():
+ self.execution_sync()
+ self.completion.wait()
+ if self._closed:
+ raise Closed(self.reason)
+ return None
+ try:
+ resp = self.responses.get()
+ if resp.method_type.content:
+ return Message(self, resp, read_content(self.responses))
+ else:
+ return Message(self, resp)
+ except QueueClosed, e:
+ if self._closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+
+ # used for 0-8 and 0-10
+ def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
+ self.write(frame, content)
+
+ try:
+ # here we depend on all nowait fields being named nowait
+ f = frame.method.fields.byname["nowait"]
+ nowait = frame.args[frame.method.fields.index(f)]
+ except KeyError:
+ nowait = False
+
+ try:
+ if not nowait and frame.method.responses:
+ resp = self.responses.get()
+ if resp.method.content:
+ content = read_content(self.responses)
+ else:
+ content = None
+ if resp.method in frame.method.responses:
+ return Message(self, resp, content)
+ else:
+ raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ fr = future.get_response(timeout=10)
+ if self._closed:
+ raise Closed(self.reason)
+ return fr
+ else:
+ return future
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.is_l4_command():
+ self.execution_sync()
+ completed = self.completion.wait(timeout=10)
+ if self._closed:
+ raise Closed(self.reason)
+ if not completed:
+ self.closed("Timed-out waiting for completion of %s" % frame)
+ except QueueClosed, e:
+ if self._closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+
+ def __getattr__(self, name):
+ type = self.spec.method(name)
+ if type == None: raise AttributeError(name)
+ method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
+ self.__dict__[name] = method
+ return method
+
+def read_content(queue):
+ header = queue.get()
+ children = []
+ for i in range(header.weight):
+ children.append(read_content(queue))
+ buf = StringIO()
+ eof = header.eof
+ while not eof:
+ body = queue.get()
+ eof = body.eof
+ content = body.content
+ buf.write(content)
+ return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+ def __init__(self):
+ self.completed = threading.Event()
+
+ def put_response(self, channel, response):
+ self.response = response
+ self.completed.set()
+
+ def get_response(self, timeout=None):
+ self.completed.wait(timeout)
+ if self.completed.isSet():
+ return self.response
+ else:
+ return None
+
+ def is_complete(self):
+ return self.completed.isSet()
+
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
+ def __init__(self):
+ self.condition = threading.Condition()
+
+ #todo, implement proper wraparound
+ self.sequence = Sequence(0) #issues ids for outgoing commands
+ self.command_id = -1 #last issued id
+ self.mark = -1 #commands up to this mark are known to be complete
+ self._closed = False
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.is_l4_command():
+ self.command_id = self.sequence.next()
+
+ def reset(self):
+ self.sequence = Sequence(0) #reset counter
+
+ def close(self):
+ self.reset()
+ self.condition.acquire()
+ try:
+ self._closed = True
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
+
+ def complete(self, mark):
+ self.condition.acquire()
+ try:
+ self.mark = mark
+ #print "set mark to %s [%s] " % (self.mark, self)
+ self.condition.notifyAll()
+ finally:
+ self.condition.release()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
+ self.condition.acquire()
+ try:
+ while not self._closed and point_of_interest > self.mark:
+ #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
+ self.condition.wait(remaining)
+ if not self._closed and point_of_interest > self.mark and timeout:
+ if (start_time + timeout) < time(): break
+ else: remaining = timeout - (time() - start_time)
+ finally:
+ self.condition.release()
+ return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(0) #issues ids for incoming commands
+ self.mark = -1 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def reset(self):
+ self.sequence = Sequence(0) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ if (self.mark == -1):#hack until wraparound is implemented
+ self.channel.execution_complete(cumulative_execution_mark=0xFFFFFFFFL, ranged_execution_set=range)
+ else:
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)