diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-11-07 22:30:40 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-11-07 22:30:40 +0000 |
commit | bf7f244c28c899b90789e77eda5f585edda5bcd9 (patch) | |
tree | ac0225f4d76fe078bc089c6a70efbaa3211eb51e | |
parent | 5c952db3bb1fad90ea4f5de985453d13dce45e7c (diff) | |
download | qpid-python-bf7f244c28c899b90789e77eda5f585edda5bcd9.tar.gz |
python API updates
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592927 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | python/hello-world | 19 | ||||
-rw-r--r-- | python/qpid/client.py | 56 | ||||
-rw-r--r-- | python/qpid/delegate.py | 2 | ||||
-rw-r--r-- | python/qpid/peer.py | 53 |
4 files changed, 85 insertions, 45 deletions
diff --git a/python/hello-world b/python/hello-world index d419fd988b..518992409e 100755 --- a/python/hello-world +++ b/python/hello-world @@ -5,20 +5,21 @@ from qpid.content import Content client = Client("127.0.0.1", 5672) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) -ch = client.channel(1) -ch.session_open() -ch.queue_declare(queue="test") -ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test") -#print ch.queue_query(queue="test") -ch.message_subscribe(queue="test", destination="amq.direct") -ch.message_flow("amq.direct", 0, 0xFFFFFFFF) -ch.message_flow("amq.direct", 1, 0xFFFFFFFF) +ssn = client.session() +ssn.open() +ssn.queue_declare(queue="test") +ssn.queue_bind(exchange="amq.direct", queue="test", routing_key="test") +#print ssn.queue_query(queue="test") +ssn.message_subscribe(queue="test", destination="amq.direct") +ssn.message_flow("amq.direct", 0, 0xFFFFFFFF) +ssn.message_flow("amq.direct", 1, 0xFFFFFFFF) msg = Content("hello world") msg["content_type"] = "text/plain" msg["routing_key"] = "test" msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"} -ch.message_transfer(destination="amq.direct", content=msg) +ssn.message_transfer(destination="amq.direct", content=msg) queue = client.queue("amq.direct") msg = queue.get(timeout=10) print msg +ssn.close() diff --git a/python/qpid/client.py b/python/qpid/client.py index 44fd6c053f..13c3f13fe5 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -23,7 +23,7 @@ interacting with the server. """ import os, threading -from peer import Peer, Closed +from peer import Peer, Channel, Closed from delegate import Delegate from connection import Connection, Frame, connect from spec import load @@ -45,6 +45,7 @@ class Client: raise EnvironmentError("environment variable AMQP_SPEC must be set") self.spec = load(name) self.structs = StructFactory(self.spec) + self.sessions = {} self.mechanism = None self.response = None @@ -86,7 +87,7 @@ class Client: self.socket = connect(self.host, self.port) self.conn = Connection(self.socket, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self), self.opened) + self.peer = Peer(self.conn, ClientDelegate(self), Session) self.conn.init() self.peer.start() @@ -94,10 +95,29 @@ class Client: self.channel(0).connection_open(self.vhost) def channel(self, id): - return self.peer.channel(id) + self.lock.acquire() + try: + ssn = self.peer.channel(id) + ssn.client = self + self.sessions[id] = ssn + finally: + self.lock.release() + return ssn - def opened(self, ch): - ch.references = References() + def session(self): + self.lock.acquire() + try: + id = None + for i in xrange(1, 64*1024): + if not self.sessions.has_key(id): + id = i + break + finally: + self.lock.release() + if id == None: + raise RuntimeError("out of channels") + else: + return self.channel(id) def close(self): self.socket.close() @@ -144,16 +164,16 @@ class ClientDelegate(Delegate): msg.ok() def channel_close(self, ch, msg): - ch.close(msg) + ch.closed(msg) def session_ack(self, ch, msg): pass def session_closed(self, ch, msg): - ch.close(msg) + ch.closed(msg) def connection_close(self, ch, msg): - self.client.peer.close(msg) + self.client.peer.closed(msg) def execution_complete(self, ch, msg): ch.completion.complete(msg.cumulative_execution_mark) @@ -162,7 +182,7 @@ class ClientDelegate(Delegate): future = ch.futures[msg.command_id] future.put_response(ch, msg.data) - def close(self, reason): + def closed(self, reason): self.client.closed = True self.client.reason = reason self.client.started.set() @@ -185,3 +205,21 @@ class StructFactory: def struct(self, name, *args, **kwargs): return self.spec.struct(name, *args, **kwargs) + +class Session(Channel): + + def __init__(self, *args): + Channel.__init__(self, *args) + self.references = References() + self.client = None + + def open(self): + self.session_open() + + def close(self): + self.session_close() + self.client.lock.acquire() + try: + del self.client.sessions[self.id] + finally: + self.client.lock.release() diff --git a/python/qpid/delegate.py b/python/qpid/delegate.py index 8f5033e485..c4c47592b4 100644 --- a/python/qpid/delegate.py +++ b/python/qpid/delegate.py @@ -49,5 +49,5 @@ class Delegate: print >> sys.stderr, "Error in handler: %s\n\n%s" % \ (_handler_name(method), traceback.format_exc()) - def close(self, reason): + def closed(self, reason): print "Connection closed: %s" % reason diff --git a/python/qpid/peer.py b/python/qpid/peer.py index b734031798..376d81e184 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -51,14 +51,17 @@ class Sequence: class Peer: - def __init__(self, conn, delegate, channel_callback=None): + def __init__(self, conn, delegate, channel_factory=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} self.lock = thread.allocate_lock() - self.channel_callback = channel_callback #notified when channels are created + if channel_factory: + self.channel_factory = channel_factory + else: + self.channel_factory = Channel def channel(self, id): self.lock.acquire() @@ -66,10 +69,8 @@ class Peer: try: ch = self.channels[id] except KeyError: - ch = Channel(id, self.outgoing, self.conn.spec) + ch = self.channel_factory(id, self.outgoing, self.conn.spec) self.channels[id] = ch - if self.channel_callback: - self.channel_callback(ch) finally: self.lock.release() return ch @@ -82,7 +83,7 @@ class Peer: def fatal(self, message=None): """Call when an unexpected exception occurs that will kill a thread.""" if message: print >> sys.stderr, message - self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) + self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) def reader(self): try: @@ -97,13 +98,13 @@ class Peer: except: self.fatal() - def close(self, reason): + def closed(self, reason): # We must close the delegate first because closing channels # may wake up waiting threads and we don't want them to see # the delegate as open. - self.delegate.close(reason) + self.delegate.closed(reason) for ch in self.channels.values(): - ch.close(reason) + ch.closed(reason) def writer(self): try: @@ -112,7 +113,7 @@ class Peer: message = self.outgoing.get() self.conn.write(message) except socket.error, e: - self.close(e) + self.closed(e) break self.conn.flush() except: @@ -131,7 +132,7 @@ class Peer: self.delegate(channel, Message(channel, frame, content)) except QueueClosed: - self.close("worker closed") + self.closed("worker closed") except: self.fatal() @@ -181,7 +182,7 @@ class Channel: self.incoming = Queue(0) self.responses = Queue(0) self.queue = None - self.closed = False + self._closed = False self.reason = None self.requester = Requester(self.write) @@ -200,10 +201,10 @@ class Channel: self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True - def close(self, reason): - if self.closed: + def closed(self, reason): + if self._closed: return - self.closed = True + self._closed = True self.reason = reason self.incoming.close() self.responses.close() @@ -213,7 +214,7 @@ class Channel: f.put_response(self, reason) def write(self, frame, content = None): - if self.closed: + if self._closed: raise Closed(self.reason) frame.channel = self.id self.outgoing.put(frame) @@ -283,7 +284,7 @@ class Channel: if self.use_execution_layer and frame.method_type.is_l4_command(): self.execution_sync() self.completion.wait() - if self.closed: + if self._closed: raise Closed(self.reason) return None try: @@ -293,7 +294,7 @@ class Channel: else: return Message(self, resp) except QueueClosed, e: - if self.closed: + if self._closed: raise Closed(self.reason) else: raise e @@ -328,7 +329,7 @@ class Channel: elif frame.method.result: if self.synchronous: fr = future.get_response(timeout=10) - if self.closed: + if self._closed: raise Closed(self.reason) return fr else: @@ -337,12 +338,12 @@ class Channel: and self.use_execution_layer and frame.method.is_l4_command(): self.execution_sync() completed = self.completion.wait(timeout=10) - if self.closed: + if self._closed: raise Closed(self.reason) if not completed: - self.close("Timed-out waiting for completion of %s" % frame) + self.closed("Timed-out waiting for completion of %s" % frame) except QueueClosed, e: - if self.closed: + if self._closed: raise Closed(self.reason) else: raise e @@ -399,7 +400,7 @@ class OutgoingCompletion: self.sequence = Sequence(0) #issues ids for outgoing commands self.command_id = -1 #last issued id self.mark = -1 #commands up to this mark are known to be complete - self.closed = False + self._closed = False def next_command(self, method): #the following test is a hack until the track/sub-channel is available @@ -413,7 +414,7 @@ class OutgoingCompletion: self.reset() self.condition.acquire() try: - self.closed = True + self._closed = True self.condition.notifyAll() finally: self.condition.release() @@ -433,10 +434,10 @@ class OutgoingCompletion: remaining = timeout self.condition.acquire() try: - while not self.closed and point_of_interest > self.mark: + while not self._closed and point_of_interest > self.mark: #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self) self.condition.wait(remaining) - if not self.closed and point_of_interest > self.mark and timeout: + if not self._closed and point_of_interest > self.mark and timeout: if (start_time + timeout) < time(): break else: remaining = timeout - (time() - start_time) finally: |