diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /python/qpid/client.py | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/client.py')
-rw-r--r-- | python/qpid/client.py | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py new file mode 100644 index 0000000000..cef10622ac --- /dev/null +++ b/python/qpid/client.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +An AQMP client implementation that uses a custom delegate for +interacting with the server. +""" + +import threading +from peer import Peer, Closed +from delegate import Delegate +from connection import Connection, Frame +from spec import load +from queue import Queue + + +class Client: + + def __init__(self, host, port, spec, vhost = None): + self.host = host + self.port = port + self.spec = spec + + self.mechanism = None + self.response = None + self.locale = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = self.host + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.started = threading.Event() + + self.conn = Connection(self.host, self.port, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self)) + + def wait(self): + self.started.wait() + if self.closed: + raise EOFError() + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response, mechanism="AMQPLAIN", locale="en_US"): + self.mechanism = mechanism + self.response = response + self.locale = locale + + self.conn.connect() + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + return self.peer.channel(id) + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + ch.connection_start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) + + def connection_tune(self, ch, msg): + ch.connection_tune_ok(*msg.fields) + self.client.started.set() + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_close(self, ch, msg): + ch.close(msg) + + def connection_close(self, ch, msg): + self.client.peer.close(msg) + + def close(self, reason): + self.client.closed = True + self.client.started.set() |