summaryrefslogtreecommitdiff
path: root/python/qpid/client.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
committerRafael H. Schloming <rhs@apache.org>2007-11-07 22:30:40 +0000
commitbf7f244c28c899b90789e77eda5f585edda5bcd9 (patch)
treeac0225f4d76fe078bc089c6a70efbaa3211eb51e /python/qpid/client.py
parent5c952db3bb1fad90ea4f5de985453d13dce45e7c (diff)
downloadqpid-python-bf7f244c28c899b90789e77eda5f585edda5bcd9.tar.gz
python API updates
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592927 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/client.py')
-rw-r--r--python/qpid/client.py56
1 files changed, 47 insertions, 9 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 44fd6c053f..13c3f13fe5 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -23,7 +23,7 @@ interacting with the server.
"""
import os, threading
-from peer import Peer, Closed
+from peer import Peer, Channel, Closed
from delegate import Delegate
from connection import Connection, Frame, connect
from spec import load
@@ -45,6 +45,7 @@ class Client:
raise EnvironmentError("environment variable AMQP_SPEC must be set")
self.spec = load(name)
self.structs = StructFactory(self.spec)
+ self.sessions = {}
self.mechanism = None
self.response = None
@@ -86,7 +87,7 @@ class Client:
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session)
self.conn.init()
self.peer.start()
@@ -94,10 +95,29 @@ class Client:
self.channel(0).connection_open(self.vhost)
def channel(self, id):
- return self.peer.channel(id)
+ self.lock.acquire()
+ try:
+ ssn = self.peer.channel(id)
+ ssn.client = self
+ self.sessions[id] = ssn
+ finally:
+ self.lock.release()
+ return ssn
- def opened(self, ch):
- ch.references = References()
+ def session(self):
+ self.lock.acquire()
+ try:
+ id = None
+ for i in xrange(1, 64*1024):
+ if not self.sessions.has_key(id):
+ id = i
+ break
+ finally:
+ self.lock.release()
+ if id == None:
+ raise RuntimeError("out of channels")
+ else:
+ return self.channel(id)
def close(self):
self.socket.close()
@@ -144,16 +164,16 @@ class ClientDelegate(Delegate):
msg.ok()
def channel_close(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def session_ack(self, ch, msg):
pass
def session_closed(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def connection_close(self, ch, msg):
- self.client.peer.close(msg)
+ self.client.peer.closed(msg)
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
@@ -162,7 +182,7 @@ class ClientDelegate(Delegate):
future = ch.futures[msg.command_id]
future.put_response(ch, msg.data)
- def close(self, reason):
+ def closed(self, reason):
self.client.closed = True
self.client.reason = reason
self.client.started.set()
@@ -185,3 +205,21 @@ class StructFactory:
def struct(self, name, *args, **kwargs):
return self.spec.struct(name, *args, **kwargs)
+
+class Session(Channel):
+
+ def __init__(self, *args):
+ Channel.__init__(self, *args)
+ self.references = References()
+ self.client = None
+
+ def open(self):
+ self.session_open()
+
+ def close(self):
+ self.session_close()
+ self.client.lock.acquire()
+ try:
+ del self.client.sessions[self.id]
+ finally:
+ self.client.lock.release()