summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/messaging/endpoints.py')
-rw-r--r--qpid/python/qpid/messaging/endpoints.py1046
1 files changed, 1046 insertions, 0 deletions
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
new file mode 100644
index 0000000000..338ac70ecf
--- /dev/null
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -0,0 +1,1046 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+ - definition of the arguments for L{Session.sender} and L{Session.receiver}
+ - standard L{Message} properties
+ - L{Message} content encoding
+ - protocol negotiation/multiprotocol impl
+"""
+
+from logging import getLogger
+from math import ceil
+from qpid.codec010 import StringCodec
+from qpid.concurrency import synchronized, Waiter, Condition
+from qpid.datatypes import Serial, uuid4
+from qpid.messaging.constants import *
+from qpid.messaging.exceptions import *
+from qpid.messaging.message import *
+from qpid.ops import PRIMITIVE
+from qpid.util import default, URL
+from threading import Thread, RLock
+
log = getLogger("qpid.messaging")

# Alias so the decorator can be written as @static on methods below.
static = staticmethod
+
class Endpoint:

  """
  Mixin for closable endpoints (connections, sessions, links).
  Subclasses are expected to provide _ewait, a closed flag, and
  check_closed.
  """

  def _ecwait(self, predicate, timeout=None):
    # Like _ewait, but also returns when the endpoint closes; if it was
    # closed while waiting, check_closed() raises the appropriate error.
    result = self._ewait(lambda: self.closed or predicate(), timeout)
    self.check_closed()
    return result
+
+class Connection(Endpoint):
+
+ """
+ A Connection manages a group of L{Sessions<Session>} and connects
+ them with a remote endpoint.
+ """
+
+ @static
+ def establish(url=None, **options):
+ """
+ Constructs a L{Connection} with the supplied parameters and opens
+ it.
+ """
+ conn = Connection(url, **options)
+ conn.open()
+ return conn
+
  def __init__(self, url=None, **options):
    """
    Creates a connection. A newly created connection must be connected
    with the Connection.connect() method before it can be used.

    @type url: str
    @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
    @type host: str
    @param host: the name or ip address of the remote host (overridden by url)
    @type port: int
    @param port: the port number of the remote host (overridden by url)
    @type transport: str
    @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
    @type heartbeat: int
    @param heartbeat: heartbeat interval in seconds

    @type username: str
    @param username: the username for authentication (overridden by url)
    @type password: str
    @param password: the password for authentication (overridden by url)

    @type sasl_mechanisms: str
    @param sasl_mechanisms: space separated list of permitted sasl mechanisms
    @type sasl_service: str
    @param sasl_service: ???
    @type sasl_min_ssf: ???
    @param sasl_min_ssf: ???
    @type sasl_max_ssf: ???
    @param sasl_max_ssf: ???

    @type reconnect: bool
    @param reconnect: enable/disable automatic reconnect
    @type reconnect_timeout: float
    @param reconnect_timeout: total time to attempt reconnect
    @type reconnect_interval_min: float
    @param reconnect_interval_min: minimum interval between reconnect attempts
    @type reconnect_interval_max: float
    @param reconnect_interval_max: maximum interval between reconnect attempts
    @type reconnect_interval: float
    @param reconnect_interval: set both min and max reconnect intervals
    @type reconnect_limit: int
    @param reconnect_limit: limit the total number of reconnect attempts
    @type reconnect_urls: list[str]
    @param reconnect_urls: list of backup hosts specified as urls

    @type address_ttl: float
    @param address_ttl: time until cached address resolution expires

    @rtype: Connection
    @return: a disconnected Connection
    """
    # "host" may be supplied as an option instead of a url.
    if url is None:
      url = options.get("host")
    if isinstance(url, basestring):
      url = URL(url)
    self.host = url.host
    # An explicit transport option wins; otherwise infer from the url
    # scheme (amqps implies ssl).
    if options.has_key("transport"):
      self.transport = options.get("transport")
    elif url.scheme == url.AMQP:
      self.transport = "tcp"
    elif url.scheme == url.AMQPS:
      self.transport = "ssl"
    else:
      self.transport = "tcp"
    # The default port depends on whether the transport is secured.
    if self.transport in ("ssl", "tcp+tls"):
      self.port = default(url.port, options.get("port", AMQPS_PORT))
    else:
      self.port = default(url.port, options.get("port", AMQP_PORT))
    self.heartbeat = options.get("heartbeat")
    self.username = default(url.user, options.get("username", None))
    self.password = default(url.password, options.get("password", None))
    self.auth_username = None

    self.sasl_mechanisms = options.get("sasl_mechanisms")
    self.sasl_service = options.get("sasl_service", "qpidd")
    self.sasl_min_ssf = options.get("sasl_min_ssf")
    self.sasl_max_ssf = options.get("sasl_max_ssf")

    self.reconnect = options.get("reconnect", False)
    self.reconnect_timeout = options.get("reconnect_timeout")
    # "reconnect_interval" supplies a default for both bounds unless the
    # min/max options override it individually.
    reconnect_interval = options.get("reconnect_interval")
    self.reconnect_interval_min = options.get("reconnect_interval_min",
                                              default(reconnect_interval, 1))
    self.reconnect_interval_max = options.get("reconnect_interval_max",
                                              default(reconnect_interval, 2*60))
    self.reconnect_limit = options.get("reconnect_limit")
    self.reconnect_urls = options.get("reconnect_urls", [])
    self.reconnect_log = options.get("reconnect_log", True)

    self.address_ttl = options.get("address_ttl", 60)
    self.tcp_nodelay = options.get("tcp_nodelay", False)

    self.options = options


    self.id = str(uuid4())
    self.session_counter = 0
    self.sessions = {}
    self._open = False
    self._connected = False
    self._transport_connected = False
    # All sessions and links share this lock/condition for @synchronized.
    self._lock = RLock()
    self._condition = Condition(self._lock)
    self._waiter = Waiter(self._condition)
    # Bumped by _wakeup() to signal state changes to the driver.
    self._modcount = Serial(0)
    self.error = None
    # Deferred import — presumably avoids a circular dependency with
    # qpid.messaging.driver (TODO confirm).
    from driver import Driver
    self._driver = Driver(self)
+
  def _wait(self, predicate, timeout=None):
    # Block on the shared waiter until predicate holds or timeout expires.
    return self._waiter.wait(predicate, timeout=timeout)

  def _wakeup(self):
    # Record a modification and nudge the driver thread.
    self._modcount += 1
    self._driver.wakeup()

  def check_error(self):
    if self.error:
      # gc() on the condition — presumably releases stale waiters before
      # the failure propagates (see qpid.concurrency; confirm).
      self._condition.gc()
      raise self.error

  def get_error(self):
    return self.error

  def _ewait(self, predicate, timeout=None):
    # Wait until predicate holds or an error is recorded; re-raise errors.
    result = self._wait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    if not self._connected:
      self._condition.gc()
      raise ConnectionClosed()
+
  @synchronized
  def session(self, name=None, transactional=False):
    """
    Creates or retrieves the named session. If the name is omitted or
    None, then a unique name is chosen based on a randomly generated
    uuid.

    @type name: str
    @param name: the session name
    @type transactional: bool
    @param transactional: if True the session supports commit/rollback
    @rtype: Session
    @return: the named Session
    """

    if name is None:
      # Auto-generate a unique name from the connection id and a counter.
      name = "%s:%s" % (self.id, self.session_counter)
      self.session_counter += 1
    else:
      name = "%s:%s" % (self.id, name)

    # Existing sessions are returned as-is; new ones wake the driver.
    if self.sessions.has_key(name):
      return self.sessions[name]
    else:
      ssn = Session(self, name, transactional)
      self.sessions[name] = ssn
      self._wakeup()
      return ssn
+
  @synchronized
  def _remove_session(self, ssn):
    # Forget the session; a no-op if it was already removed.
    self.sessions.pop(ssn.name, 0)

  @synchronized
  def open(self):
    """
    Opens a connection.

    @raise ConnectionError: if the connection is already open
    """
    if self._open:
      raise ConnectionError("already open")
    self._open = True
    self.attach()

  @synchronized
  def opened(self):
    """
    Return true if the connection is open, false otherwise.
    """
    return self._open
+
  @synchronized
  def attach(self):
    """
    Attach to the remote endpoint.
    """
    if not self._connected:
      self._connected = True
      self._driver.start()
      self._wakeup()
    # Wait until the transport is up and every sender/receiver on every
    # live session has finished establishing its link.
    self._ewait(lambda: self._transport_connected and not self._unlinked())

  def _unlinked(self):
    # Links on non-errored, non-closed sessions that are neither
    # established nor failed/closed yet.
    return [l
            for ssn in self.sessions.values()
            if not (ssn.error or ssn.closed)
            for l in ssn.senders + ssn.receivers
            if not (l.linked or l.error or l.closed)]
+
  @synchronized
  def detach(self, timeout=None):
    """
    Detach from the remote endpoint.

    @raise Timeout: if the detach does not complete within the timeout
    """
    if self._connected:
      self._connected = False
      self._wakeup()
      cleanup = True
    else:
      # Someone else already initiated the detach; just wait for it.
      cleanup = False
    try:
      if not self._wait(lambda: not self._transport_connected, timeout=timeout):
        raise Timeout("detach timed out")
    finally:
      if cleanup:
        # Only the call that initiated the detach stops the driver.
        self._driver.stop()
        self._condition.gc()

  @synchronized
  def attached(self):
    """
    Return true if the connection is attached, false otherwise.
    """
    return self._connected
+
  @synchronized
  def close(self, timeout=None):
    """
    Close the connection and all sessions.
    """
    try:
      for ssn in self.sessions.values():
        ssn.close(timeout=timeout)
    finally:
      # Detach even if a session close failed, then allow reopening.
      self.detach(timeout=timeout)
      self._open = False
+
+class Session(Endpoint):
+
+ """
+ Sessions provide a linear context for sending and receiving
+ L{Messages<Message>}. L{Messages<Message>} are sent and received
+ using the L{Sender.send} and L{Receiver.fetch} methods of the
+ L{Sender} and L{Receiver} objects associated with a Session.
+
+ Each L{Sender} and L{Receiver} is created by supplying either a
+ target or source address to the L{sender} and L{receiver} methods of
+ the Session. The address is supplied via a string syntax documented
+ below.
+
+ Addresses
+ =========
+
+ An address identifies a source or target for messages. In its
+ simplest form this is just a name. In general a target address may
+ also be used as a source address, however not all source addresses
+ may be used as a target, e.g. a source might additionally have some
+ filtering criteria that would not be present in a target.
+
+ A subject may optionally be specified along with the name. When an
+ address is used as a target, any subject specified in the address is
+ used as the default subject of outgoing messages for that target.
+ When an address is used as a source, any subject specified in the
+ address is pattern matched against the subject of available messages
+ as a filter for incoming messages from that source.
+
+ The options map contains additional information about the address
+ including:
+
+ - policies for automatically creating, and deleting the node to
+ which an address refers
+
+ - policies for asserting facts about the node to which an address
+ refers
+
+ - extension points that can be used for sender/receiver
+ configuration
+
+ Mapping to AMQP 0-10
+ --------------------
+ The name is resolved to either an exchange or a queue by querying
+ the broker.
+
+ The subject is set as a property on the message. Additionally, if
+ the name refers to an exchange, the routing key is set to the
+ subject.
+
+ Syntax
+ ------
+ The following regular expressions define the tokens used to parse
+ addresses::
+ LBRACE: \\{
+ RBRACE: \\}
+ LBRACK: \\[
+ RBRACK: \\]
+ COLON: :
+ SEMI: ;
+ SLASH: /
+ COMMA: ,
+ NUMBER: [+-]?[0-9]*\\.?[0-9]+
+ ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
+ STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
+ ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
+ SYM: [.#*%@$^!+-]
+ WSPACE: [ \\n\\r\\t]+
+
+ The formal grammar for addresses is given below::
+ address = name [ "/" subject ] [ ";" options ]
+ name = ( part | quoted )+
+ subject = ( part | quoted | "/" )*
+ quoted = STRING / ESC
+ part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
+ options = map
+ map = "{" ( keyval ( "," keyval )* )? "}"
+ keyval = ID ":" value
+ value = NUMBER / STRING / ID / map / list
+ list = "[" ( value ( "," value )* )? "]"
+
+  This grammar results in the following informal syntax::
+
+ <name> [ / <subject> ] [ ; <options> ]
+
+ Where options is::
+
+ { <key> : <value>, ... }
+
+ And values may be:
+ - numbers
+ - single, double, or non quoted strings
+ - maps (dictionaries)
+ - lists
+
+ Options
+ -------
+ The options map permits the following parameters::
+
+ <name> [ / <subject> ] ; {
+ create: always | sender | receiver | never,
+ delete: always | sender | receiver | never,
+ assert: always | sender | receiver | never,
+ mode: browse | consume,
+ node: {
+ type: queue | topic,
+ durable: True | False,
+ x-declare: { ... <declare-overrides> ... },
+ x-bindings: [<binding_1>, ... <binding_n>]
+ },
+ link: {
+ name: <link-name>,
+ durable: True | False,
+ reliability: unreliable | at-most-once | at-least-once | exactly-once,
+ x-declare: { ... <declare-overrides> ... },
+ x-bindings: [<binding_1>, ... <binding_n>],
+ x-subscribe: { ... <subscribe-overrides> ... }
+ }
+ }
+
+ Bindings are specified as a map with the following options::
+
+ {
+ exchange: <exchange>,
+ queue: <queue>,
+ key: <key>,
+ arguments: <arguments>
+ }
+
+  The create, delete, and assert policies specify who should perform
+ the associated action:
+
+ - I{always}: the action will always be performed
+ - I{sender}: the action will only be performed by the sender
+ - I{receiver}: the action will only be performed by the receiver
+ - I{never}: the action will never be performed (this is the default)
+
+ The node-type is one of:
+
+ - I{topic}: a topic node will default to the topic exchange,
+ x-declare may be used to specify other exchange types
+ - I{queue}: this is the default node-type
+
+ The x-declare map permits protocol specific keys and values to be
+ specified when exchanges or queues are declared. These keys and
+ values are passed through when creating a node or asserting facts
+ about an existing node.
+
+ Examples
+ --------
+ A simple name resolves to any named node, usually a queue or a
+ topic::
+
+ my-queue-or-topic
+
+ A simple name with a subject will also resolve to a node, but the
+ presence of the subject will cause a sender using this address to
+ set the subject on outgoing messages, and receivers to filter based
+ on the subject::
+
+ my-queue-or-topic/my-subject
+
+ A subject pattern can be used and will cause filtering if used by
+ the receiver. If used for a sender, the literal value gets set as
+ the subject::
+
+ my-queue-or-topic/my-*
+
+ In all the above cases, the address is resolved to an existing node.
+ If you want the node to be auto-created, then you can do the
+ following. By default nonexistent nodes are assumed to be queues::
+
+ my-queue; {create: always}
+
+ You can customize the properties of the queue::
+
+ my-queue; {create: always, node: {durable: True}}
+
+ You can create a topic instead if you want::
+
+ my-queue; {create: always, node: {type: topic}}
+
+ You can assert that the address resolves to a node with particular
+ properties::
+
+ my-transient-topic; {
+ assert: always,
+ node: {
+ type: topic,
+ durable: False
+ }
+ }
+ """
+
  def __init__(self, connection, name, transactional):
    self.connection = connection
    self.name = name
    # Short hex id used to tag log records for this session.
    self.log_id = "%x" % id(self)

    self.transactional = transactional

    # Transaction state flags driven by commit()/rollback().
    self.committing = False
    self.committed = True
    self.aborting = False
    self.aborted = False

    self.next_sender_id = 0
    self.senders = []
    self.next_receiver_id = 0
    self.receivers = []
    self.outgoing = []  # messages queued by Sender.send, not yet sent
    self.incoming = []  # arrived messages not yet retrieved by a fetch
    self.unacked = []   # retrieved messages awaiting acknowledge()
    self.acked = []     # acknowledged messages awaiting confirmation
    # XXX: I hate this name.
    self.ack_capacity = UNLIMITED

    self.error = None
    self.closing = False
    self.closed = False

    # Share the connection's lock so @synchronized methods on the
    # session and connection coordinate with each other.
    self._lock = connection._lock

  def __repr__(self):
    return "<Session %s>" % self.name
+
  def _wait(self, predicate, timeout=None):
    # Delegate waiting to the connection's shared waiter.
    return self.connection._wait(predicate, timeout=timeout)

  def _wakeup(self):
    # Nudge the driver thread via the connection.
    self.connection._wakeup()

  def check_error(self):
    # Connection-level errors take precedence over session errors.
    self.connection.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    # Connection error if any, otherwise this session's error.
    err = self.connection.get_error()
    if err:
      return err
    else:
      return self.error

  def _ewait(self, predicate, timeout=None):
    # Wait until predicate holds or an error surfaces; re-raise errors.
    result = self.connection._ewait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    if self.closed:
      raise SessionClosed()
+
  @synchronized
  def sender(self, target, **options):
    """
    Creates a L{Sender} that may be used to send L{Messages<Message>}
    to the specified target.

    @type target: str
    @param target: the target to which messages will be sent
    @rtype: Sender
    @return: a new Sender for the specified target
    @raise LinkError: if the link could not be established
    """
    target = _mangle(target)
    sender = Sender(self, self.next_sender_id, target, options)
    self.next_sender_id += 1
    self.senders.append(sender)
    if not self.closed and self.connection._connected:
      self._wakeup()
      try:
        # Block until the driver reports the link established (or fails).
        sender._ewait(lambda: sender.linked)
      except LinkError, e:
        sender.close()
        raise e
    return sender
+
  @synchronized
  def receiver(self, source, **options):
    """
    Creates a receiver that may be used to fetch L{Messages<Message>}
    from the specified source.

    @type source: str
    @param source: the source of L{Messages<Message>}
    @rtype: Receiver
    @return: a new Receiver for the specified source
    @raise LinkError: if the link could not be established
    """
    source = _mangle(source)
    receiver = Receiver(self, self.next_receiver_id, source, options)
    self.next_receiver_id += 1
    self.receivers.append(receiver)
    if not self.closed and self.connection._connected:
      self._wakeup()
      try:
        # Block until the driver reports the link established (or fails).
        receiver._ewait(lambda: receiver.linked)
      except LinkError, e:
        receiver.close()
        raise e
    return receiver
+
+ @synchronized
+ def _count(self, predicate):
+ result = 0
+ for msg in self.incoming:
+ if predicate(msg):
+ result += 1
+ return result
+
+ def _peek(self, receiver):
+ for msg in self.incoming:
+ if msg._receiver == receiver:
+ return msg
+
+ def _pop(self, receiver):
+ i = 0
+ while i < len(self.incoming):
+ msg = self.incoming[i]
+ if msg._receiver == receiver:
+ del self.incoming[i]
+ return msg
+ else:
+ i += 1
+
  @synchronized
  def _get(self, receiver, timeout=None):
    # Wait for a message destined for this receiver, or for the session
    # to start closing, or for the receiver to be closed.
    if self._ewait(lambda: ((self._peek(receiver) is not None) or
                            self.closing or receiver.closed),
                   timeout):
      msg = self._pop(receiver)
      if msg is not None:
        # Track delivery and remember the message for acknowledge().
        msg._receiver.returned += 1
        self.unacked.append(msg)
        log.debug("RETR[%s]: %s", self.log_id, msg)
        return msg
    return None

  @synchronized
  def next_receiver(self, timeout=None):
    # The receiver owning the oldest queued incoming message.
    if self._ecwait(lambda: self.incoming, timeout):
      return self.incoming[0]._receiver
    else:
      raise Empty
+
  @synchronized
  def acknowledge(self, message=None, disposition=None, sync=True):
    """
    Acknowledge the given L{Message}. If message is None, then all
    unacknowledged messages on the session are acknowledged.

    @type message: Message
    @param message: the message to acknowledge or None
    @param disposition: optional disposition applied to the message(s)
    @type sync: boolean
    @param sync: if true then block until the message(s) are acknowledged
    """
    if message is None:
      messages = self.unacked[:]
    else:
      messages = [message]

    for m in messages:
      if self.ack_capacity is not UNLIMITED:
        if self.ack_capacity <= 0:
          # XXX: this is currently a SendError, maybe it should be a SessionError?
          raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
        # Wake the driver and wait for room in the acked queue.
        self._wakeup()
        self._ecwait(lambda: len(self.acked) < self.ack_capacity)
      m._disposition = disposition
      self.unacked.remove(m)
      self.acked.append(m)

    self._wakeup()
    if sync:
      # Block until none of these messages remain pending confirmation.
      self._ecwait(lambda: not [m for m in messages if m in self.acked])
+
  @synchronized
  def commit(self):
    """
    Commit outstanding transactional work. This consists of all
    message sends and receives since the prior commit or rollback.

    @raise NontransactionalSession: if the session is not transactional
    @raise TransactionAborted: if the transaction was aborted
    """
    if not self.transactional:
      raise NontransactionalSession()
    self.committing = True
    self._wakeup()
    # The committing flag is cleared elsewhere (by the driver) once the
    # commit has been processed.
    self._ecwait(lambda: not self.committing)
    if self.aborted:
      raise TransactionAborted()
    assert self.committed

  @synchronized
  def rollback(self):
    """
    Rollback outstanding transactional work. This consists of all
    message sends and receives since the prior commit or rollback.

    @raise NontransactionalSession: if the session is not transactional
    """
    if not self.transactional:
      raise NontransactionalSession()
    self.aborting = True
    self._wakeup()
    # The aborting flag is cleared elsewhere (by the driver) once the
    # rollback has been processed.
    self._ecwait(lambda: not self.aborting)
    assert self.aborted
+
  @synchronized
  def sync(self, timeout=None):
    """
    Sync the session: block until all outgoing messages and pending
    acknowledgments have been processed.

    @raise Timeout: if the sync does not complete within the timeout
    """
    # Each sender first flushes its own queue.
    for snd in self.senders:
      snd.sync(timeout=timeout)
    if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
      raise Timeout("session sync timed out")
+
  @synchronized
  def close(self, timeout=None):
    """
    Close the session.

    @raise Timeout: if the close does not complete within the timeout
    """
    # Flush outstanding work first so nothing is silently dropped.
    self.sync(timeout=timeout)

    for link in self.receivers + self.senders:
      link.close(timeout=timeout)

    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      if not self._ewait(lambda: self.closed, timeout=timeout):
        raise Timeout("session close timed out")
    finally:
      # Always detach from the connection, even on timeout.
      self.connection._remove_session(self)
+
+def _mangle(addr):
+ if addr and addr.startswith("#"):
+ return str(uuid4()) + addr
+ else:
+ return addr
+
+class Sender(Endpoint):
+
+ """
+ Sends outgoing messages.
+ """
+
  def __init__(self, session, id, target, options):
    self.session = session
    self.id = id
    self.target = target
    self.options = options
    self.capacity = options.get("capacity", UNLIMITED)
    # Fraction of capacity below which send() forces a sync point
    # (see send()).
    self.threshold = 0.5
    self.durable = options.get("durable")
    # Serial counters: messages queued locally, sync points requested,
    # and messages acknowledged by the peer.
    self.queued = Serial(0)
    self.synced = Serial(0)
    self.acked = Serial(0)
    self.error = None
    self.linked = False
    self.closing = False
    self.closed = False
    # Share the session (and hence connection) lock for @synchronized.
    self._lock = self.session._lock
+
  def _wakeup(self):
    # Nudge the driver thread via the session.
    self.session._wakeup()

  def check_error(self):
    # Session (and connection) errors take precedence over link errors.
    self.session.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    # Session/connection error if any, otherwise this link's error.
    err = self.session.get_error()
    if err:
      return err
    else:
      return self.error

  def _ewait(self, predicate, timeout=None):
    # Wait until predicate holds or an error surfaces; re-raise errors.
    result = self.session._ewait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    if self.closed:
      raise LinkClosed()
+
+ @synchronized
+ def unsettled(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
+
+ @synchronized
+ def available(self):
+ if self.capacity is UNLIMITED:
+ return UNLIMITED
+ else:
+ return self.capacity - self.unsettled()
+
  @synchronized
  def send(self, object, sync=True, timeout=None):
    """
    Send a message. If the object passed in is of type L{unicode},
    L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
    L{Message} and sent. If it is of type L{Message}, it will be sent
    directly. If the sender capacity is not L{UNLIMITED} then send
    will block until there is available capacity to send the message.
    If the timeout parameter is specified, then send will throw an
    L{InsufficientCapacity} exception if capacity does not become
    available within the specified time.

    @type object: unicode, str, list, dict, Message
    @param object: the message or content to send

    @type sync: boolean
    @param sync: if true then block until the message is sent

    @type timeout: float
    @param timeout: the time to wait for available capacity
    """

    if not self.session.connection._connected or self.session.closing:
      raise Detached()

    # Wait for the link to be established before sending.
    self._ecwait(lambda: self.linked)

    if isinstance(object, Message):
      message = object
    else:
      message = Message(object)

    # The sender's durable default applies unless the message set one.
    if message.durable is None:
      message.durable = self.durable

    if self.capacity is not UNLIMITED:
      if self.capacity <= 0:
        raise InsufficientCapacity("capacity = %s" % self.capacity)
      # Block until capacity frees up (or timeout expires).
      if not self._ecwait(self.available, timeout=timeout):
        raise InsufficientCapacity("capacity = %s" % self.capacity)

    # XXX: what if we send the same message to multiple senders?
    message._sender = self
    if self.capacity is not UNLIMITED:
      # Force a sync point once available capacity drops below the
      # threshold so acknowledgments keep flowing back.
      message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
    else:
      message._sync = sync
    self.session.outgoing.append(message)
    self.queued += 1

    if sync:
      self.sync()
      assert message not in self.session.outgoing
    else:
      self._wakeup()
+
  @synchronized
  def sync(self, timeout=None):
    # Request a sync point covering everything queued so far and wait
    # until the peer has acknowledged up to it.
    mno = self.queued
    if self.synced < mno:
      self.synced = mno
      self._wakeup()
    if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
      raise Timeout("sender sync timed out")

  @synchronized
  def close(self, timeout=None):
    """
    Close the Sender.
    """
    # avoid erroring out when closing a sender that was never
    # established
    if self.acked < self.queued:
      self.sync(timeout=timeout)

    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      if not self.session._ewait(lambda: self.closed, timeout=timeout):
        raise Timeout("sender close timed out")
    finally:
      # Always drop from the session; close may be called repeatedly.
      try:
        self.session.senders.remove(self)
      except ValueError:
        pass
+
+class Receiver(Endpoint, object):
+
+ """
+ Receives incoming messages from a remote source. Messages may be
+ fetched with L{fetch}.
+ """
+
  def __init__(self, session, id, source, options):
    self.session = session
    self.id = id
    self.source = source
    self.options = options

    # Serial counters tracking credit and delivery progress; granted is
    # credit extended, received/returned count arrivals and deliveries
    # to the application. impending appears to be driver-updated credit
    # in flight (see fetch) — confirm against the driver.
    self.granted = Serial(0)
    self.draining = False
    self.impending = Serial(0)
    self.received = Serial(0)
    self.returned = Serial(0)

    self.error = None
    self.linked = False
    self.closing = False
    self.closed = False
    # Share the session (and hence connection) lock for @synchronized.
    self._lock = self.session._lock
    self._capacity = 0
    # Apply the configured capacity without waking the driver yet.
    self._set_capacity(options.get("capacity", 0), False)
    # Fraction of capacity below which fetch() replenishes credit.
    self.threshold = 0.5
+
  @synchronized
  def _set_capacity(self, c, wakeup=True):
    # UNLIMITED is stored as its integer sentinel value so the credit
    # arithmetic in _grant()/fetch() works uniformly.
    if c is UNLIMITED:
      self._capacity = c.value
    else:
      self._capacity = c
    self._grant()
    if wakeup:
      self._wakeup()

  def _get_capacity(self):
    # Convert the stored sentinel value back to the UNLIMITED object.
    if self._capacity == UNLIMITED.value:
      return UNLIMITED
    else:
      return self._capacity

  # capacity reads and assigns like a plain attribute.
  capacity = property(_get_capacity, _set_capacity)
+
  def _wakeup(self):
    # Nudge the driver thread via the session.
    self.session._wakeup()

  def check_error(self):
    # Session (and connection) errors take precedence over link errors.
    self.session.check_error()
    if self.error:
      raise self.error

  def get_error(self):
    # Session/connection error if any, otherwise this link's error.
    err = self.session.get_error()
    if err:
      return err
    else:
      return self.error

  def _ewait(self, predicate, timeout=None):
    # Wait until predicate holds or an error surfaces; re-raise errors.
    result = self.session._ewait(lambda: self.error or predicate(), timeout)
    self.check_error()
    return result

  def check_closed(self):
    if self.closed:
      raise LinkClosed()
+
+ @synchronized
+ def unsettled(self):
+ """
+ Returns the number of acknowledged messages awaiting confirmation.
+ """
+ return len([m for m in self.acked if m._receiver is self])
+
+ @synchronized
+ def available(self):
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
+ return self.received - self.returned
+
  @synchronized
  def fetch(self, timeout=None):
    """
    Fetch and return a single message. A timeout of None will block
    forever waiting for a message to arrive, a timeout of zero will
    return immediately if no messages are available.

    @type timeout: float
    @param timeout: the time to wait for a message to be available
    @raise Empty: if no message becomes available within the timeout
    """

    # Wait for the link to be established before fetching.
    self._ecwait(lambda: self.linked)

    if self._capacity == 0:
      # No prefetch: grant credit for exactly one message on demand.
      self.granted = self.returned + 1
      self._wakeup()
    self._ecwait(lambda: self.impending >= self.granted)
    msg = self.session._get(self, timeout=timeout)
    if msg is None:
      # Nothing arrived in time; drain outstanding credit so anything
      # already in flight is flushed, then try once more.
      self.check_closed()
      self.draining = True
      self._wakeup()
      self._ecwait(lambda: not self.draining)
      msg = self.session._get(self, timeout=0)
      self._grant()
      self._wakeup()
      if msg is None:
        raise Empty()
    elif self._capacity not in (0, UNLIMITED.value):
      # Replenish credit once the backlog drops below the threshold.
      t = int(ceil(self.threshold * self._capacity))
      if self.received - self.returned <= t:
        self.granted = self.returned + self._capacity
        self._wakeup()
    return msg
+
+ def _grant(self):
+ if self._capacity == UNLIMITED.value:
+ self.granted = UNLIMITED
+ else:
+ self.granted = self.returned + self._capacity
+
  @synchronized
  def close(self, timeout=None):
    """
    Close the receiver.

    @raise Timeout: if the close does not complete within the timeout
    """
    if not self.closing:
      self.closing = True
      self._wakeup()

    try:
      if not self.session._ewait(lambda: self.closed, timeout=timeout):
        raise Timeout("receiver close timed out")
    finally:
      # Always drop from the session; close may be called repeatedly.
      try:
        self.session.receivers.remove(self)
      except ValueError:
        pass
+
# Public API: endpoint classes only; helpers such as _mangle are private.
__all__ = ["Connection", "Session", "Sender", "Receiver"]