summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
committerRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
commitbf7f244c28c899b90789e77eda5f585edda5bcd9 (patch)
treeac0225f4d76fe078bc089c6a70efbaa3211eb51e /python/qpid/peer.py
parent5c952db3bb1fad90ea4f5de985453d13dce45e7c (diff)
downloadqpid-python-bf7f244c28c899b90789e77eda5f585edda5bcd9.tar.gz
python API updates
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592927 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py53
1 files changed, 27 insertions, 26 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index b734031798..376d81e184 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -51,14 +51,17 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate, channel_callback=None):
+ def __init__(self, conn, delegate, channel_factory=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
- self.channel_callback = channel_callback #notified when channels are created
+ if channel_factory:
+ self.channel_factory = channel_factory
+ else:
+ self.channel_factory = Channel
def channel(self, id):
self.lock.acquire()
@@ -66,10 +69,8 @@ class Peer:
try:
ch = self.channels[id]
except KeyError:
- ch = Channel(id, self.outgoing, self.conn.spec)
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
- if self.channel_callback:
- self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -82,7 +83,7 @@ class Peer:
def fatal(self, message=None):
"""Call when an unexpected exception occurs that will kill a thread."""
if message: print >> sys.stderr, message
- self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+ self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
def reader(self):
try:
@@ -97,13 +98,13 @@ class Peer:
except:
self.fatal()
- def close(self, reason):
+ def closed(self, reason):
# We must close the delegate first because closing channels
# may wake up waiting threads and we don't want them to see
# the delegate as open.
- self.delegate.close(reason)
+ self.delegate.closed(reason)
for ch in self.channels.values():
- ch.close(reason)
+ ch.closed(reason)
def writer(self):
try:
@@ -112,7 +113,7 @@ class Peer:
message = self.outgoing.get()
self.conn.write(message)
except socket.error, e:
- self.close(e)
+ self.closed(e)
break
self.conn.flush()
except:
@@ -131,7 +132,7 @@ class Peer:
self.delegate(channel, Message(channel, frame, content))
except QueueClosed:
- self.close("worker closed")
+ self.closed("worker closed")
except:
self.fatal()
@@ -181,7 +182,7 @@ class Channel:
self.incoming = Queue(0)
self.responses = Queue(0)
self.queue = None
- self.closed = False
+ self._closed = False
self.reason = None
self.requester = Requester(self.write)
@@ -200,10 +201,10 @@ class Channel:
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
- def close(self, reason):
- if self.closed:
+ def closed(self, reason):
+ if self._closed:
return
- self.closed = True
+ self._closed = True
self.reason = reason
self.incoming.close()
self.responses.close()
@@ -213,7 +214,7 @@ class Channel:
f.put_response(self, reason)
def write(self, frame, content = None):
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
frame.channel = self.id
self.outgoing.put(frame)
@@ -283,7 +284,7 @@ class Channel:
if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_sync()
self.completion.wait()
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return None
try:
@@ -293,7 +294,7 @@ class Channel:
else:
return Message(self, resp)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -328,7 +329,7 @@ class Channel:
elif frame.method.result:
if self.synchronous:
fr = future.get_response(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return fr
else:
@@ -337,12 +338,12 @@ class Channel:
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_sync()
completed = self.completion.wait(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
if not completed:
- self.close("Timed-out waiting for completion of %s" % frame)
+ self.closed("Timed-out waiting for completion of %s" % frame)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -399,7 +400,7 @@ class OutgoingCompletion:
self.sequence = Sequence(0) #issues ids for outgoing commands
self.command_id = -1 #last issued id
self.mark = -1 #commands up to this mark are known to be complete
- self.closed = False
+ self._closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
@@ -413,7 +414,7 @@ class OutgoingCompletion:
self.reset()
self.condition.acquire()
try:
- self.closed = True
+ self._closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -433,10 +434,10 @@ class OutgoingCompletion:
remaining = timeout
self.condition.acquire()
try:
- while not self.closed and point_of_interest > self.mark:
+ while not self._closed and point_of_interest > self.mark:
#print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
self.condition.wait(remaining)
- if not self.closed and point_of_interest > self.mark and timeout:
+ if not self._closed and point_of_interest > self.mark and timeout:
if (start_time + timeout) < time(): break
else: remaining = timeout - (time() - start_time)
finally: