diff options
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 822 |
1 files changed, 822 insertions, 0 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py new file mode 100644 index 0000000000..4f2c190ce2 --- /dev/null +++ b/python/qpid/messaging.py @@ -0,0 +1,822 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - asynchronous send + - asynchronous error notification + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from codec010 import StringCodec +from concurrency import synchronized, Waiter, Condition +from datatypes import timestamp, uuid4, Serial +from logging import getLogger +from ops import PRIMITIVE +from threading import Thread, RLock +from util import default + +log = getLogger("qpid.messaging") + +static = staticmethod + +AMQP_PORT = 5672 +AMQPS_PORT = 5671 + +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ + pass + +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class Connection: + + """ + A Connection manages a group of L{Sessions<Session>} and connects + them with a remote endpoint. + """ + + @static + def open(host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): + """ + Creates an AMQP connection and connects it to the given host and port. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a connected Connection + """ + conn = Connection(host, port, username, password, mechanism, heartbeat, **options) + conn.connect() + return conn + + def __init__(self, host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): + """ + Creates a connection. A newly created connection must be connected + with the Connection.connect() method before it can be used. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a disconnected Connection + """ + self.host = host + self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanism = mechanism + self.heartbeat = heartbeat + + self.id = str(uuid4()) + self.session_counter = 0 + self.sessions = {} + self.reconnect = options.get("reconnect", False) + self._connected = False + self._lock = RLock() + self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + self._driver.start() + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def _check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self._wait(lambda: self.error or predicate(), timeout) + self._check_error(exc) + return result + + @synchronized + def session(self, name=None, transactional=False): + """ + Creates or retrieves the named session. If the name is omitted or + None, then a unique name is chosen based on a randomly generated + uuid. + + @type name: str + @param name: the session name + @rtype: Session + @return: the named Session + """ + + if name is None: + name = "%s:%s" % (self.id, self.session_counter) + self.session_counter += 1 + else: + name = "%s:%s" % (self.id, name) + + if self.sessions.has_key(name): + return self.sessions[name] + else: + ssn = Session(self, name, transactional) + self.sessions[name] = ssn + self._wakeup() + return ssn + + @synchronized + def _remove_session(self, ssn): + del self.sessions[ssn.name] + + @synchronized + def connect(self): + """ + Connect to the remote endpoint. + """ + self._connected = True + self._wakeup() + self._ewait(lambda: self._driver._connected, exc=ConnectError) + + @synchronized + def disconnect(self): + """ + Disconnect from the remote endpoint. + """ + self._connected = False + self._wakeup() + self._ewait(lambda: not self._driver._connected) + + @synchronized + def connected(self): + """ + Return true if the connection is connected, false otherwise. + """ + return self._connected + + @synchronized + def close(self): + """ + Close the connection and all sessions. + """ + for ssn in self.sessions.values(): + ssn.close() + self.disconnect() + +class Pattern: + """ + The pattern filter matches the supplied wildcard pattern against a + message subject. + """ + + def __init__(self, value): + self.value = value + + # XXX: this should become part of the driver + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) + +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + +class TransactionAborted(SessionError): + pass + +class Session: + + """ + Sessions provide a linear context for sending and receiving + messages, and manage various Senders and Receivers. + """ + + def __init__(self, connection, name, transactional): + self.connection = connection + self.name = name + + self.transactional = transactional + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + + self.senders = [] + self.receivers = [] + self.outgoing = [] + self.incoming = [] + self.unacked = [] + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.error = None + self.closing = False + self.closed = False + + self._lock = connection._lock + + def __repr__(self): + return "<Session %s>" % self.name + + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + + def _wakeup(self): + self.connection._wakeup() + + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SessionError): + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def sender(self, target, **options): + """ + Creates a L{Sender} that may be used to send L{Messages<Message>} + to the specified target. + + @type target: str + @param target: the target to which messages will be sent + @rtype: Sender + @return: a new Sender for the specified target + """ + sender = Sender(self, len(self.senders), target, options) + self.senders.append(sender) + self._wakeup() + # XXX: because of the lack of waiting here we can end up getting + # into the driver loop with messages sent for senders that haven't + # been linked yet, something similar can probably happen for + # receivers + return sender + + @synchronized + def receiver(self, source, **options): + """ + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. + + @type source: str + @param source: the source of L{Messages<Message>} + @rtype: Receiver + @return: a new Receiver for the specified source + """ + receiver = Receiver(self, len(self.receivers), source, options) + self.receivers.append(receiver) + self._wakeup() + return receiver + + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + + def _peek(self, predicate): + for msg in self.incoming: + if predicate(msg): + return msg + + def _pop(self, predicate): + i = 0 + while i < len(self.incoming): + msg = self.incoming[i] + if predicate(msg): + del self.incoming[i] + return msg + else: + i += 1 + + @synchronized + def _get(self, predicate, timeout=None): + if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): + msg = self._pop(predicate) + if msg is not None: + msg._receiver.returned += 1 + self.unacked.append(msg) + log.debug("RETR [%s] %s", self, msg) + return msg + return None + + @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized + def acknowledge(self, message=None, sync=True): + """ + Acknowledge the given L{Message}. If message is None, then all + unacknowledged messages on the session are acknowledged. + + @type message: Message + @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged + """ + if message is None: + messages = self.unacked[:] + else: + messages = [message] + + for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ewait(lambda: not [m for m in messages if m in self.acked]) + + @synchronized + def commit(self): + """ + Commit outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.committing = True + self._wakeup() + self._ewait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed + + @synchronized + def rollback(self): + """ + Rollback outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.aborting = True + self._wakeup() + self._ewait(lambda: not self.aborting) + assert self.aborted + + @synchronized + def close(self): + """ + Close the session. + """ + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) + + for link in self.receivers + self.senders: + link.close() + + self.closing = True + self._wakeup() + self._ewait(lambda: self.closed) + self.connection._remove_session(self) + +class SendError(SessionError): + pass + +class InsufficientCapacity(SendError): + pass + +class Sender: + + """ + Sends outgoing messages. + """ + + def __init__(self, session, index, target, options): + self.session = session + self.index = index + self.target = target + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.durable = options.get("durable") + self.queued = Serial(0) + self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=SendError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=SendError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True, timeout=None): + """ + Send a message. If the object passed in is of type L{unicode}, + L{str}, L{list}, or L{dict}, it will automatically be wrapped in a + L{Message} and sent. If it is of type L{Message}, it will be sent + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. + + @type object: unicode, str, list, dict, Message + @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity + """ + + if not self.session.connection._connected or self.session.closing: + raise Disconnected() + + self._ewait(lambda: self.linked) + + if isinstance(object, Message): + message = object + else: + message = Message(object) + + if message.durable is None: + message.durable = self.durable + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + self.queued += 1 + + self._wakeup() + + if sync: + self.sync() + assert message not in self.session.outgoing + + @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized + def close(self): + """ + Close the Sender. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.senders.remove(self) + +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): + """ + Exception raised by L{Receiver.fetch} when there is no message + available within the alloted time. + """ + pass + +class Receiver(object): + + """ + Receives incoming messages from a remote source. Messages may be + fetched with L{fetch}. + """ + + def __init__(self, session, index, source, options): + self.session = session + self.index = index + self.destination = str(self.index) + self.source = source + self.options = options + + self.granted = Serial(0) + self.draining = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) + + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result + + @synchronized + def pending(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned + + def _pred(self, msg): + return msg._receiver == self + + @synchronized + def fetch(self, timeout=None): + """ + Fetch and return a single message. A timeout of None will block + forever waiting for a message to arrive, a timeout of zero will + return immediately if no messages are available. + + @type timeout: float + @param timeout: the time to wait for a message to be available + """ + + self._ewait(lambda: self.linked) + + if self._capacity == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) + msg = self.session._get(self._pred, timeout=timeout) + if msg is None: + self.draining = True + self._wakeup() + self._ewait(lambda: not self.draining) + self._grant() + self._wakeup() + msg = self.session._get(self._pred, timeout=0) + if msg is None: + raise Empty() + elif self._capacity not in (0, UNLIMITED.value): + self.granted += 1 + self._wakeup() + return msg + + def _grant(self): + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity + + @synchronized + def close(self): + """ + Close the receiver. + """ + self.closing = True + self._wakeup() + try: + self.session._ewait(lambda: self.closed) + finally: + self.session.receivers.remove(self) + +def codec(name): + type = PRIMITIVE[name] + + def encode(x): + sc = StringCodec() + sc.write_primitive(type, x) + return sc.encoded + + def decode(x): + sc = StringCodec(x) + return sc.read_primitive(type) + + return encode, decode + +# XXX: need to correctly parse the mime type and deal with +# content-encoding header + +TYPE_MAPPINGS={ + dict: "amqp/map", + list: "amqp/list", + unicode: "text/plain; charset=utf8", + unicode: "text/plain", + buffer: None, + str: None, + None.__class__: None + } + +TYPE_CODEC={ + "amqp/map": codec("map"), + "amqp/list": codec("list"), + "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "": (lambda x: x, lambda x: x), + None: (lambda x: x, lambda x: x) + } + +def get_type(content): + return TYPE_MAPPINGS[content.__class__] + +def get_codec(content_type): + return TYPE_CODEC[content_type] + +UNSPECIFIED = object() + +class Message: + + """ + A message consists of a standard set of fields, an application + defined set of properties, and some content. + + @type id: str + @ivar id: the message id + @type user_id: ??? + @ivar user_id: the user-id of the message producer + @type to: ??? + @ivar to: ??? + @type reply_to: ??? + @ivar reply_to: ??? + @type correlation_id: str + @ivar correlation_id: a correlation-id for the message + @type properties: dict + @ivar properties: application specific message properties + @type content_type: str + @ivar content_type: the content-type of the message + @type content: str, unicode, buffer, dict, list + @ivar content: the message content + """ + + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): + """ + Construct a new message with the supplied content. The + content-type of the message will be automatically inferred from + type of the content parameter. + + @type content: str, unicode, buffer, dict, list + @param content: the message content + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + self.redelivered = False + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type + self.content = content + + def __repr__(self): + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) + +__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", + "ConnectionError", "ConnectError", "SessionError", "Disconnected", + "SendError", "InsufficientCapacity", "ReceiveError", "Empty", + "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] |