diff options
Diffstat (limited to 'qpid/python/qpid/client.py')
-rw-r--r-- | qpid/python/qpid/client.py | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py new file mode 100644 index 0000000000..5fedaa2cb1 --- /dev/null +++ b/qpid/python/qpid/client.py @@ -0,0 +1,277 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +An AMQP client implementation that uses a custom delegate for +interacting with the server. +""" + +import os, threading +from peer import Peer, Channel, Closed +from delegate import Delegate +from util import get_client_properties_with_defaults +from connection08 import Connection, Frame, connect +from spec08 import load +from queue import Queue +from reference import ReferenceId, References +from saslmech.finder import get_sasl_mechanism +from saslmech.sasl import SaslException + + +class Client: + + def __init__(self, host, port, spec = None, vhost = None): + self.host = host + self.port = port + if spec: + self.spec = spec + else: + from specs_config import amqp_spec_0_9 + self.spec = load(amqp_spec_0_9) + self.structs = StructFactory(self.spec) + self.sessions = {} + + self.mechanism = None + self.response = None + self.locale = None + self.sasl = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = "/" + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.reason = None + self.started = threading.Event() + self.peer = None + + def wait(self): + self.started.wait() + if self.closed: + raise Closed(self.reason) + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response=None, mechanism=None, locale="en_US", tune_params=None, + username=None, password=None, + client_properties=None, connection_options=None, sasl_options = None, + channel_options=None): + self.mechanism = mechanism + self.response = response + self.username = username + self.password = password + self.locale = locale + self.tune_params = tune_params + self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties, version_property_key="version") + self.sasl_options = sasl_options + self.socket = connect(self.host, self.port, connection_options) + self.conn = Connection(self.socket, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options) + + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + self.lock.acquire() + try: + ssn = self.peer.channel(id) + ssn.client = self + self.sessions[id] = ssn + finally: + self.lock.release() + return ssn + + def session(self): + self.lock.acquire() + try: + id = None + for i in xrange(1, 64*1024): + if not self.sessions.has_key(i): + id = i + break + finally: + self.lock.release() + if id == None: + raise RuntimeError("out of channels") + else: + return self.channel(id) + + def close(self): + if self.peer: + try: + if not self.closed: + channel = self.channel(0); + if channel and not channel._closed: + try: + channel.connection_close(reply_code=200) + except: + pass + self.closed = True + finally: + self.peer.stop() + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + + if self.client.mechanism is None: + if self.client.response is not None: + # Supports users passing the response argument alon + self.client.mechanism = "AMQPLAIN" + else: + supportedMechs = msg.frame.args[3].split() + + self.client.sasl = get_sasl_mechanism(supportedMechs, self.client.username, self.client.password, sasl_options=self.client.sasl_options) + + if self.client.sasl == None: + raise SaslException("sasl negotiation failed: no mechanism agreed. Server supports: %s " % supportedMechs) + + self.client.mechanism = self.client.sasl.mechanismName() + + if self.client.response is None: + self.client.response = self.client.sasl.initialResponse() + + msg.start_ok(mechanism=self.client.mechanism, + response=self.client.response or "", + locale=self.client.locale, + client_properties=self.client.client_properties) + + def connection_secure(self, ch, msg): + msg.secure_ok(response=self.client.sasl.response(msg.challenge)) + + def connection_tune(self, ch, msg): + if self.client.tune_params: + #todo: just override the params, i.e. don't require them + # all to be included in tune_params + msg.tune_ok(**self.client.tune_params) + else: + msg.tune_ok(*msg.frame.args) + self.client.started.set() + + def message_transfer(self, ch, msg): + self.client.queue(msg.destination).put(msg) + + def message_open(self, ch, msg): + ch.references.open(msg.reference) + + def message_close(self, ch, msg): + ch.references.close(msg.reference) + + def message_append(self, ch, msg): + ch.references.get(msg.reference).append(msg.bytes) + + def message_acquired(self, ch, msg): + ch.control_queue.put(msg) + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_pong(self, ch, msg): + msg.ok() + + def channel_close(self, ch, msg): + ch.closed(msg) + + def channel_flow(self, ch, msg): + # On resuming we don't want to send a message before flow-ok has been sent. + # Therefore, we send flow-ok before we set the flow_control flag. + if msg.active: + msg.flow_ok() + ch.set_flow_control(not msg.active) + # On suspending we don't want to send a message after flow-ok has been sent. + # Therefore, we send flow-ok after we set the flow_control flag. + if not msg.active: + msg.flow_ok() + + def session_ack(self, ch, msg): + pass + + def session_closed(self, ch, msg): + ch.closed(msg) + + def connection_close(self, ch, msg): + self.client.peer.closed(msg) + + def execution_complete(self, ch, msg): + ch.completion.complete(msg.cumulative_execution_mark) + + def execution_result(self, ch, msg): + future = ch.futures[msg.command_id] + future.put_response(ch, msg.data) + + def closed(self, reason): + self.client.closed = True + self.client.reason = reason + self.client.started.set() + +class StructFactory: + + def __init__(self, spec): + self.spec = spec + self.factories = {} + + def __getattr__(self, name): + if self.factories.has_key(name): + return self.factories[name] + elif self.spec.domains.byname.has_key(name): + f = lambda *args, **kwargs: self.struct(name, *args, **kwargs) + self.factories[name] = f + return f + else: + raise AttributeError(name) + + def struct(self, name, *args, **kwargs): + return self.spec.struct(name, *args, **kwargs) + +class Session(Channel): + + def __init__(self, *args): + Channel.__init__(self, *args) + self.references = References() + self.client = None + + def open(self): + self.session_open() + + def close(self): + self.session_close() + self.client.lock.acquire() + try: + del self.client.sessions[self.id] + finally: + self.client.lock.release() |