diff options
Diffstat (limited to 'qpid/python/qpid/client.py')
-rw-r--r-- | qpid/python/qpid/client.py | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py new file mode 100644 index 0000000000..45ce8498e8 --- /dev/null +++ b/qpid/python/qpid/client.py @@ -0,0 +1,222 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +An AQMP client implementation that uses a custom delegate for +interacting with the server. +""" + +import os, threading +from peer import Peer, Channel, Closed +from delegate import Delegate +from connection08 import Connection, Frame, connect +from spec08 import load +from queue import Queue +from reference import ReferenceId, References + + +class Client: + + def __init__(self, host, port, spec = None, vhost = None): + self.host = host + self.port = port + if spec: + self.spec = spec + else: + from specs_config import amqp_spec_0_9 + self.spec = load(amqp_spec_0_9) + self.structs = StructFactory(self.spec) + self.sessions = {} + + self.mechanism = None + self.response = None + self.locale = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = "/" + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.reason = None + self.started = threading.Event() + + def wait(self): + self.started.wait() + if self.closed: + raise Closed(self.reason) + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None): + self.mechanism = mechanism + self.response = response + self.locale = locale + self.tune_params = tune_params + + self.socket = connect(self.host, self.port) + self.conn = Connection(self.socket, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self), Session) + + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + self.lock.acquire() + try: + ssn = self.peer.channel(id) + ssn.client = self + self.sessions[id] = ssn + finally: + self.lock.release() + return ssn + + def session(self): + self.lock.acquire() + try: + id = None + for i in xrange(1, 64*1024): + if not self.sessions.has_key(id): + id = i + break + finally: + self.lock.release() + if id == None: + raise RuntimeError("out of channels") + else: + return self.channel(id) + + def close(self): + self.socket.close() + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + msg.start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) + + def connection_tune(self, ch, msg): + if self.client.tune_params: + #todo: just override the params, i.e. don't require them + # all to be included in tune_params + msg.tune_ok(**self.client.tune_params) + else: + msg.tune_ok(*msg.frame.args) + self.client.started.set() + + def message_transfer(self, ch, msg): + self.client.queue(msg.destination).put(msg) + + def message_open(self, ch, msg): + ch.references.open(msg.reference) + + def message_close(self, ch, msg): + ch.references.close(msg.reference) + + def message_append(self, ch, msg): + ch.references.get(msg.reference).append(msg.bytes) + + def message_acquired(self, ch, msg): + ch.control_queue.put(msg) + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_pong(self, ch, msg): + msg.ok() + + def channel_close(self, ch, msg): + ch.closed(msg) + + def session_ack(self, ch, msg): + pass + + def session_closed(self, ch, msg): + ch.closed(msg) + + def connection_close(self, ch, msg): + self.client.peer.closed(msg) + + def execution_complete(self, ch, msg): + ch.completion.complete(msg.cumulative_execution_mark) + + def execution_result(self, ch, msg): + future = ch.futures[msg.command_id] + future.put_response(ch, msg.data) + + def closed(self, reason): + self.client.closed = True + self.client.reason = reason + self.client.started.set() + +class StructFactory: + + def __init__(self, spec): + self.spec = spec + self.factories = {} + + def __getattr__(self, name): + if self.factories.has_key(name): + return self.factories[name] + elif self.spec.domains.byname.has_key(name): + f = lambda *args, **kwargs: self.struct(name, *args, **kwargs) + self.factories[name] = f + return f + else: + raise AttributeError(name) + + def struct(self, name, *args, **kwargs): + return self.spec.struct(name, *args, **kwargs) + +class Session(Channel): + + def __init__(self, *args): + Channel.__init__(self, *args) + self.references = References() + self.client = None + + def open(self): + self.session_open() + + def close(self): + self.session_close() + self.client.lock.acquire() + try: + del self.client.sessions[self.id] + finally: + self.client.lock.release() |