summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
committerRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
commitbf7f244c28c899b90789e77eda5f585edda5bcd9 (patch)
treeac0225f4d76fe078bc089c6a70efbaa3211eb51e
parent5c952db3bb1fad90ea4f5de985453d13dce45e7c (diff)
downloadqpid-python-bf7f244c28c899b90789e77eda5f585edda5bcd9.tar.gz
python API updates
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592927 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xpython/hello-world19
-rw-r--r--python/qpid/client.py56
-rw-r--r--python/qpid/delegate.py2
-rw-r--r--python/qpid/peer.py53
4 files changed, 85 insertions, 45 deletions
diff --git a/python/hello-world b/python/hello-world
index d419fd988b..518992409e 100755
--- a/python/hello-world
+++ b/python/hello-world
@@ -5,20 +5,21 @@ from qpid.content import Content
client = Client("127.0.0.1", 5672)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
-ch = client.channel(1)
-ch.session_open()
-ch.queue_declare(queue="test")
-ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-#print ch.queue_query(queue="test")
-ch.message_subscribe(queue="test", destination="amq.direct")
-ch.message_flow("amq.direct", 0, 0xFFFFFFFF)
-ch.message_flow("amq.direct", 1, 0xFFFFFFFF)
+ssn = client.session()
+ssn.open()
+ssn.queue_declare(queue="test")
+ssn.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
+#print ssn.queue_query(queue="test")
+ssn.message_subscribe(queue="test", destination="amq.direct")
+ssn.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ssn.message_flow("amq.direct", 1, 0xFFFFFFFF)
msg = Content("hello world")
msg["content_type"] = "text/plain"
msg["routing_key"] = "test"
msg["reply_to"] = client.structs.reply_to("asdf", "fdsa")
msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
-ch.message_transfer(destination="amq.direct", content=msg)
+ssn.message_transfer(destination="amq.direct", content=msg)
queue = client.queue("amq.direct")
msg = queue.get(timeout=10)
print msg
+ssn.close()
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 44fd6c053f..13c3f13fe5 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -23,7 +23,7 @@ interacting with the server.
"""
import os, threading
-from peer import Peer, Closed
+from peer import Peer, Channel, Closed
from delegate import Delegate
from connection import Connection, Frame, connect
from spec import load
@@ -45,6 +45,7 @@ class Client:
raise EnvironmentError("environment variable AMQP_SPEC must be set")
self.spec = load(name)
self.structs = StructFactory(self.spec)
+ self.sessions = {}
self.mechanism = None
self.response = None
@@ -86,7 +87,7 @@ class Client:
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session)
self.conn.init()
self.peer.start()
@@ -94,10 +95,29 @@ class Client:
self.channel(0).connection_open(self.vhost)
def channel(self, id):
- return self.peer.channel(id)
+ self.lock.acquire()
+ try:
+ ssn = self.peer.channel(id)
+ ssn.client = self
+ self.sessions[id] = ssn
+ finally:
+ self.lock.release()
+ return ssn
- def opened(self, ch):
- ch.references = References()
+ def session(self):
+ self.lock.acquire()
+ try:
+ id = None
+ for i in xrange(1, 64*1024):
+ if not self.sessions.has_key(id):
+ id = i
+ break
+ finally:
+ self.lock.release()
+ if id == None:
+ raise RuntimeError("out of channels")
+ else:
+ return self.channel(id)
def close(self):
self.socket.close()
@@ -144,16 +164,16 @@ class ClientDelegate(Delegate):
msg.ok()
def channel_close(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def session_ack(self, ch, msg):
pass
def session_closed(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def connection_close(self, ch, msg):
- self.client.peer.close(msg)
+ self.client.peer.closed(msg)
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
@@ -162,7 +182,7 @@ class ClientDelegate(Delegate):
future = ch.futures[msg.command_id]
future.put_response(ch, msg.data)
- def close(self, reason):
+ def closed(self, reason):
self.client.closed = True
self.client.reason = reason
self.client.started.set()
@@ -185,3 +205,21 @@ class StructFactory:
def struct(self, name, *args, **kwargs):
return self.spec.struct(name, *args, **kwargs)
+
+class Session(Channel):
+
+ def __init__(self, *args):
+ Channel.__init__(self, *args)
+ self.references = References()
+ self.client = None
+
+ def open(self):
+ self.session_open()
+
+ def close(self):
+ self.session_close()
+ self.client.lock.acquire()
+ try:
+ del self.client.sessions[self.id]
+ finally:
+ self.client.lock.release()
diff --git a/python/qpid/delegate.py b/python/qpid/delegate.py
index 8f5033e485..c4c47592b4 100644
--- a/python/qpid/delegate.py
+++ b/python/qpid/delegate.py
@@ -49,5 +49,5 @@ class Delegate:
print >> sys.stderr, "Error in handler: %s\n\n%s" % \
(_handler_name(method), traceback.format_exc())
- def close(self, reason):
+ def closed(self, reason):
print "Connection closed: %s" % reason
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index b734031798..376d81e184 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -51,14 +51,17 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate, channel_callback=None):
+ def __init__(self, conn, delegate, channel_factory=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
- self.channel_callback = channel_callback #notified when channels are created
+ if channel_factory:
+ self.channel_factory = channel_factory
+ else:
+ self.channel_factory = Channel
def channel(self, id):
self.lock.acquire()
@@ -66,10 +69,8 @@ class Peer:
try:
ch = self.channels[id]
except KeyError:
- ch = Channel(id, self.outgoing, self.conn.spec)
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
- if self.channel_callback:
- self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -82,7 +83,7 @@ class Peer:
def fatal(self, message=None):
"""Call when an unexpected exception occurs that will kill a thread."""
if message: print >> sys.stderr, message
- self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+ self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
def reader(self):
try:
@@ -97,13 +98,13 @@ class Peer:
except:
self.fatal()
- def close(self, reason):
+ def closed(self, reason):
# We must close the delegate first because closing channels
# may wake up waiting threads and we don't want them to see
# the delegate as open.
- self.delegate.close(reason)
+ self.delegate.closed(reason)
for ch in self.channels.values():
- ch.close(reason)
+ ch.closed(reason)
def writer(self):
try:
@@ -112,7 +113,7 @@ class Peer:
message = self.outgoing.get()
self.conn.write(message)
except socket.error, e:
- self.close(e)
+ self.closed(e)
break
self.conn.flush()
except:
@@ -131,7 +132,7 @@ class Peer:
self.delegate(channel, Message(channel, frame, content))
except QueueClosed:
- self.close("worker closed")
+ self.closed("worker closed")
except:
self.fatal()
@@ -181,7 +182,7 @@ class Channel:
self.incoming = Queue(0)
self.responses = Queue(0)
self.queue = None
- self.closed = False
+ self._closed = False
self.reason = None
self.requester = Requester(self.write)
@@ -200,10 +201,10 @@ class Channel:
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
- def close(self, reason):
- if self.closed:
+ def closed(self, reason):
+ if self._closed:
return
- self.closed = True
+ self._closed = True
self.reason = reason
self.incoming.close()
self.responses.close()
@@ -213,7 +214,7 @@ class Channel:
f.put_response(self, reason)
def write(self, frame, content = None):
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
frame.channel = self.id
self.outgoing.put(frame)
@@ -283,7 +284,7 @@ class Channel:
if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_sync()
self.completion.wait()
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return None
try:
@@ -293,7 +294,7 @@ class Channel:
else:
return Message(self, resp)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -328,7 +329,7 @@ class Channel:
elif frame.method.result:
if self.synchronous:
fr = future.get_response(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return fr
else:
@@ -337,12 +338,12 @@ class Channel:
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_sync()
completed = self.completion.wait(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
if not completed:
- self.close("Timed-out waiting for completion of %s" % frame)
+ self.closed("Timed-out waiting for completion of %s" % frame)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -399,7 +400,7 @@ class OutgoingCompletion:
self.sequence = Sequence(0) #issues ids for outgoing commands
self.command_id = -1 #last issued id
self.mark = -1 #commands up to this mark are known to be complete
- self.closed = False
+ self._closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
@@ -413,7 +414,7 @@ class OutgoingCompletion:
self.reset()
self.condition.acquire()
try:
- self.closed = True
+ self._closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -433,10 +434,10 @@ class OutgoingCompletion:
remaining = timeout
self.condition.acquire()
try:
- while not self.closed and point_of_interest > self.mark:
+ while not self._closed and point_of_interest > self.mark:
#print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
self.condition.wait(remaining)
- if not self.closed and point_of_interest > self.mark and timeout:
+ if not self._closed and point_of_interest > self.mark and timeout:
if (start_time + timeout) < time(): break
else: remaining = timeout - (time() - start_time)
finally: