diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-11-07 22:30:40 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-11-07 22:30:40 +0000 |
commit | bf7f244c28c899b90789e77eda5f585edda5bcd9 (patch) | |
tree | ac0225f4d76fe078bc089c6a70efbaa3211eb51e /python/qpid/client.py | |
parent | 5c952db3bb1fad90ea4f5de985453d13dce45e7c (diff) | |
download | qpid-python-bf7f244c28c899b90789e77eda5f585edda5bcd9.tar.gz |
python API updates
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592927 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/client.py')
-rw-r--r-- | python/qpid/client.py | 56 |
1 files changed, 47 insertions, 9 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index 44fd6c053f..13c3f13fe5 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -23,7 +23,7 @@ interacting with the server. """ import os, threading -from peer import Peer, Closed +from peer import Peer, Channel, Closed from delegate import Delegate from connection import Connection, Frame, connect from spec import load @@ -45,6 +45,7 @@ class Client: raise EnvironmentError("environment variable AMQP_SPEC must be set") self.spec = load(name) self.structs = StructFactory(self.spec) + self.sessions = {} self.mechanism = None self.response = None @@ -86,7 +87,7 @@ class Client: self.socket = connect(self.host, self.port) self.conn = Connection(self.socket, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self), self.opened) + self.peer = Peer(self.conn, ClientDelegate(self), Session) self.conn.init() self.peer.start() @@ -94,10 +95,29 @@ class Client: self.channel(0).connection_open(self.vhost) def channel(self, id): - return self.peer.channel(id) + self.lock.acquire() + try: + ssn = self.peer.channel(id) + ssn.client = self + self.sessions[id] = ssn + finally: + self.lock.release() + return ssn - def opened(self, ch): - ch.references = References() + def session(self): + self.lock.acquire() + try: + id = None + for i in xrange(1, 64*1024): + if not self.sessions.has_key(id): + id = i + break + finally: + self.lock.release() + if id == None: + raise RuntimeError("out of channels") + else: + return self.channel(id) def close(self): self.socket.close() @@ -144,16 +164,16 @@ class ClientDelegate(Delegate): msg.ok() def channel_close(self, ch, msg): - ch.close(msg) + ch.closed(msg) def session_ack(self, ch, msg): pass def session_closed(self, ch, msg): - ch.close(msg) + ch.closed(msg) def connection_close(self, ch, msg): - self.client.peer.close(msg) + self.client.peer.closed(msg) def execution_complete(self, ch, msg): ch.completion.complete(msg.cumulative_execution_mark) @@ -162,7 +182,7 @@ class ClientDelegate(Delegate): future = ch.futures[msg.command_id] future.put_response(ch, msg.data) - def close(self, reason): + def closed(self, reason): self.client.closed = True self.client.reason = reason self.client.started.set() @@ -185,3 +205,21 @@ class StructFactory: def struct(self, name, *args, **kwargs): return self.spec.struct(name, *args, **kwargs) + +class Session(Channel): + + def __init__(self, *args): + Channel.__init__(self, *args) + self.references = References() + self.client = None + + def open(self): + self.session_open() + + def close(self): + self.session_close() + self.client.lock.acquire() + try: + del self.client.sessions[self.id] + finally: + self.client.lock.release() |