diff options
Diffstat (limited to 'qpid/python/qpid/messaging/endpoints.py')
-rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 1046 |
1 files changed, 1046 insertions, 0 deletions
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py new file mode 100644 index 0000000000..338ac70ecf --- /dev/null +++ b/qpid/python/qpid/messaging/endpoints.py @@ -0,0 +1,1046 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from logging import getLogger +from math import ceil +from qpid.codec010 import StringCodec +from qpid.concurrency import synchronized, Waiter, Condition +from qpid.datatypes import Serial, uuid4 +from qpid.messaging.constants import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * +from qpid.ops import PRIMITIVE +from qpid.util import default, URL +from threading import Thread, RLock + +log = getLogger("qpid.messaging") + +static = staticmethod + +class Endpoint: + + def _ecwait(self, predicate, timeout=None): + result = self._ewait(lambda: self.closed or predicate(), timeout) + self.check_closed() + return result + +class Connection(Endpoint): + + """ + A Connection manages a group of L{Sessions<Session>} and connects + them with a remote endpoint. + """ + + @static + def establish(url=None, **options): + """ + Constructs a L{Connection} with the supplied parameters and opens + it. + """ + conn = Connection(url, **options) + conn.open() + return conn + + def __init__(self, url=None, **options): + """ + Creates a connection. A newly created connection must be connected + with the Connection.connect() method before it can be used. + + @type url: str + @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] + @type host: str + @param host: the name or ip address of the remote host (overriden by url) + @type port: int + @param port: the port number of the remote host (overriden by url) + @type transport: str + @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) + @type heartbeat: int + @param heartbeat: heartbeat interval in seconds + + @type username: str + @param username: the username for authentication (overriden by url) + @type password: str + @param password: the password for authentication (overriden by url) + + @type sasl_mechanisms: str + @param sasl_mechanisms: space separated list of permitted sasl mechanisms + @type sasl_service: str + @param sasl_service: ??? + @type sasl_min_ssf: ??? + @param sasl_min_ssf: ??? + @type sasl_max_ssf: ??? + @param sasl_max_ssf: ??? + + @type reconnect: bool + @param reconnect: enable/disable automatic reconnect + @type reconnect_timeout: float + @param reconnect_timeout: total time to attempt reconnect + @type reconnect_internal_min: float + @param reconnect_internal_min: minimum interval between reconnect attempts + @type reconnect_internal_max: float + @param reconnect_internal_max: maximum interval between reconnect attempts + @type reconnect_internal: float + @param reconnect_interval: set both min and max reconnect intervals + @type reconnect_limit: int + @param reconnect_limit: limit the total number of reconnect attempts + @type reconnect_urls: list[str] + @param reconnect_urls: list of backup hosts specified as urls + + @type address_ttl: float + @param address_ttl: time until cached address resolution expires + + @rtype: Connection + @return: a disconnected Connection + """ + if url is None: + url = options.get("host") + if isinstance(url, basestring): + url = URL(url) + self.host = url.host + if options.has_key("transport"): + self.transport = options.get("transport") + elif url.scheme == url.AMQP: + self.transport = "tcp" + elif url.scheme == url.AMQPS: + self.transport = "ssl" + else: + self.transport = "tcp" + if self.transport in ("ssl", "tcp+tls"): + self.port = default(url.port, options.get("port", AMQPS_PORT)) + else: + self.port = default(url.port, options.get("port", AMQP_PORT)) + self.heartbeat = options.get("heartbeat") + self.username = default(url.user, options.get("username", None)) + self.password = default(url.password, options.get("password", None)) + self.auth_username = None + + self.sasl_mechanisms = options.get("sasl_mechanisms") + self.sasl_service = options.get("sasl_service", "qpidd") + self.sasl_min_ssf = options.get("sasl_min_ssf") + self.sasl_max_ssf = options.get("sasl_max_ssf") + + self.reconnect = options.get("reconnect", False) + self.reconnect_timeout = options.get("reconnect_timeout") + reconnect_interval = options.get("reconnect_interval") + self.reconnect_interval_min = options.get("reconnect_interval_min", + default(reconnect_interval, 1)) + self.reconnect_interval_max = options.get("reconnect_interval_max", + default(reconnect_interval, 2*60)) + self.reconnect_limit = options.get("reconnect_limit") + self.reconnect_urls = options.get("reconnect_urls", []) + self.reconnect_log = options.get("reconnect_log", True) + + self.address_ttl = options.get("address_ttl", 60) + self.tcp_nodelay = options.get("tcp_nodelay", False) + + self.options = options + + + self.id = str(uuid4()) + self.session_counter = 0 + self.sessions = {} + self._open = False + self._connected = False + self._transport_connected = False + self._lock = RLock() + self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def check_error(self): + if self.error: + self._condition.gc() + raise self.error + + def get_error(self): + return self.error + + def _ewait(self, predicate, timeout=None): + result = self._wait(lambda: self.error or predicate(), timeout) + self.check_error() + return result + + def check_closed(self): + if not self._connected: + self._condition.gc() + raise ConnectionClosed() + + @synchronized + def session(self, name=None, transactional=False): + """ + Creates or retrieves the named session. If the name is omitted or + None, then a unique name is chosen based on a randomly generated + uuid. + + @type name: str + @param name: the session name + @rtype: Session + @return: the named Session + """ + + if name is None: + name = "%s:%s" % (self.id, self.session_counter) + self.session_counter += 1 + else: + name = "%s:%s" % (self.id, name) + + if self.sessions.has_key(name): + return self.sessions[name] + else: + ssn = Session(self, name, transactional) + self.sessions[name] = ssn + self._wakeup() + return ssn + + @synchronized + def _remove_session(self, ssn): + self.sessions.pop(ssn.name, 0) + + @synchronized + def open(self): + """ + Opens a connection. + """ + if self._open: + raise ConnectionError("already open") + self._open = True + self.attach() + + @synchronized + def opened(self): + """ + Return true if the connection is open, false otherwise. + """ + return self._open + + @synchronized + def attach(self): + """ + Attach to the remote endpoint. + """ + if not self._connected: + self._connected = True + self._driver.start() + self._wakeup() + self._ewait(lambda: self._transport_connected and not self._unlinked()) + + def _unlinked(self): + return [l + for ssn in self.sessions.values() + if not (ssn.error or ssn.closed) + for l in ssn.senders + ssn.receivers + if not (l.linked or l.error or l.closed)] + + @synchronized + def detach(self, timeout=None): + """ + Detach from the remote endpoint. + """ + if self._connected: + self._connected = False + self._wakeup() + cleanup = True + else: + cleanup = False + try: + if not self._wait(lambda: not self._transport_connected, timeout=timeout): + raise Timeout("detach timed out") + finally: + if cleanup: + self._driver.stop() + self._condition.gc() + + @synchronized + def attached(self): + """ + Return true if the connection is attached, false otherwise. + """ + return self._connected + + @synchronized + def close(self, timeout=None): + """ + Close the connection and all sessions. + """ + try: + for ssn in self.sessions.values(): + ssn.close(timeout=timeout) + finally: + self.detach(timeout=timeout) + self._open = False + +class Session(Endpoint): + + """ + Sessions provide a linear context for sending and receiving + L{Messages<Message>}. L{Messages<Message>} are sent and received + using the L{Sender.send} and L{Receiver.fetch} methods of the + L{Sender} and L{Receiver} objects associated with a Session. + + Each L{Sender} and L{Receiver} is created by supplying either a + target or source address to the L{sender} and L{receiver} methods of + the Session. The address is supplied via a string syntax documented + below. + + Addresses + ========= + + An address identifies a source or target for messages. In its + simplest form this is just a name. In general a target address may + also be used as a source address, however not all source addresses + may be used as a target, e.g. a source might additionally have some + filtering criteria that would not be present in a target. + + A subject may optionally be specified along with the name. When an + address is used as a target, any subject specified in the address is + used as the default subject of outgoing messages for that target. + When an address is used as a source, any subject specified in the + address is pattern matched against the subject of available messages + as a filter for incoming messages from that source. + + The options map contains additional information about the address + including: + + - policies for automatically creating, and deleting the node to + which an address refers + + - policies for asserting facts about the node to which an address + refers + + - extension points that can be used for sender/receiver + configuration + + Mapping to AMQP 0-10 + -------------------- + The name is resolved to either an exchange or a queue by querying + the broker. + + The subject is set as a property on the message. Additionally, if + the name refers to an exchange, the routing key is set to the + subject. + + Syntax + ------ + The following regular expressions define the tokens used to parse + addresses:: + LBRACE: \\{ + RBRACE: \\} + LBRACK: \\[ + RBRACK: \\] + COLON: : + SEMI: ; + SLASH: / + COMMA: , + NUMBER: [+-]?[0-9]*\\.?[0-9]+ + ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? + STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' + ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] + SYM: [.#*%@$^!+-] + WSPACE: [ \\n\\r\\t]+ + + The formal grammar for addresses is given below:: + address = name [ "/" subject ] [ ";" options ] + name = ( part | quoted )+ + subject = ( part | quoted | "/" )* + quoted = STRING / ESC + part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM + options = map + map = "{" ( keyval ( "," keyval )* )? "}" + keyval = ID ":" value + value = NUMBER / STRING / ID / map / list + list = "[" ( value ( "," value )* )? "]" + + This grammar resuls in the following informal syntax:: + + <name> [ / <subject> ] [ ; <options> ] + + Where options is:: + + { <key> : <value>, ... } + + And values may be: + - numbers + - single, double, or non quoted strings + - maps (dictionaries) + - lists + + Options + ------- + The options map permits the following parameters:: + + <name> [ / <subject> ] ; { + create: always | sender | receiver | never, + delete: always | sender | receiver | never, + assert: always | sender | receiver | never, + mode: browse | consume, + node: { + type: queue | topic, + durable: True | False, + x-declare: { ... <declare-overrides> ... }, + x-bindings: [<binding_1>, ... <binding_n>] + }, + link: { + name: <link-name>, + durable: True | False, + reliability: unreliable | at-most-once | at-least-once | exactly-once, + x-declare: { ... <declare-overrides> ... }, + x-bindings: [<binding_1>, ... <binding_n>], + x-subscribe: { ... <subscribe-overrides> ... } + } + } + + Bindings are specified as a map with the following options:: + + { + exchange: <exchange>, + queue: <queue>, + key: <key>, + arguments: <arguments> + } + + The create, delete, and assert policies specify who should perfom + the associated action: + + - I{always}: the action will always be performed + - I{sender}: the action will only be performed by the sender + - I{receiver}: the action will only be performed by the receiver + - I{never}: the action will never be performed (this is the default) + + The node-type is one of: + + - I{topic}: a topic node will default to the topic exchange, + x-declare may be used to specify other exchange types + - I{queue}: this is the default node-type + + The x-declare map permits protocol specific keys and values to be + specified when exchanges or queues are declared. These keys and + values are passed through when creating a node or asserting facts + about an existing node. + + Examples + -------- + A simple name resolves to any named node, usually a queue or a + topic:: + + my-queue-or-topic + + A simple name with a subject will also resolve to a node, but the + presence of the subject will cause a sender using this address to + set the subject on outgoing messages, and receivers to filter based + on the subject:: + + my-queue-or-topic/my-subject + + A subject pattern can be used and will cause filtering if used by + the receiver. If used for a sender, the literal value gets set as + the subject:: + + my-queue-or-topic/my-* + + In all the above cases, the address is resolved to an existing node. + If you want the node to be auto-created, then you can do the + following. By default nonexistent nodes are assumed to be queues:: + + my-queue; {create: always} + + You can customize the properties of the queue:: + + my-queue; {create: always, node: {durable: True}} + + You can create a topic instead if you want:: + + my-queue; {create: always, node: {type: topic}} + + You can assert that the address resolves to a node with particular + properties:: + + my-transient-topic; { + assert: always, + node: { + type: topic, + durable: False + } + } + """ + + def __init__(self, connection, name, transactional): + self.connection = connection + self.name = name + self.log_id = "%x" % id(self) + + self.transactional = transactional + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + + self.next_sender_id = 0 + self.senders = [] + self.next_receiver_id = 0 + self.receivers = [] + self.outgoing = [] + self.incoming = [] + self.unacked = [] + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.error = None + self.closing = False + self.closed = False + + self._lock = connection._lock + + def __repr__(self): + return "<Session %s>" % self.name + + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + + def _wakeup(self): + self.connection._wakeup() + + def check_error(self): + self.connection.check_error() + if self.error: + raise self.error + + def get_error(self): + err = self.connection.get_error() + if err: + return err + else: + return self.error + + def _ewait(self, predicate, timeout=None): + result = self.connection._ewait(lambda: self.error or predicate(), timeout) + self.check_error() + return result + + def check_closed(self): + if self.closed: + raise SessionClosed() + + @synchronized + def sender(self, target, **options): + """ + Creates a L{Sender} that may be used to send L{Messages<Message>} + to the specified target. + + @type target: str + @param target: the target to which messages will be sent + @rtype: Sender + @return: a new Sender for the specified target + """ + target = _mangle(target) + sender = Sender(self, self.next_sender_id, target, options) + self.next_sender_id += 1 + self.senders.append(sender) + if not self.closed and self.connection._connected: + self._wakeup() + try: + sender._ewait(lambda: sender.linked) + except LinkError, e: + sender.close() + raise e + return sender + + @synchronized + def receiver(self, source, **options): + """ + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. + + @type source: str + @param source: the source of L{Messages<Message>} + @rtype: Receiver + @return: a new Receiver for the specified source + """ + source = _mangle(source) + receiver = Receiver(self, self.next_receiver_id, source, options) + self.next_receiver_id += 1 + self.receivers.append(receiver) + if not self.closed and self.connection._connected: + self._wakeup() + try: + receiver._ewait(lambda: receiver.linked) + except LinkError, e: + receiver.close() + raise e + return receiver + + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + + def _peek(self, receiver): + for msg in self.incoming: + if msg._receiver == receiver: + return msg + + def _pop(self, receiver): + i = 0 + while i < len(self.incoming): + msg = self.incoming[i] + if msg._receiver == receiver: + del self.incoming[i] + return msg + else: + i += 1 + + @synchronized + def _get(self, receiver, timeout=None): + if self._ewait(lambda: ((self._peek(receiver) is not None) or + self.closing or receiver.closed), + timeout): + msg = self._pop(receiver) + if msg is not None: + msg._receiver.returned += 1 + self.unacked.append(msg) + log.debug("RETR[%s]: %s", self.log_id, msg) + return msg + return None + + @synchronized + def next_receiver(self, timeout=None): + if self._ecwait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized + def acknowledge(self, message=None, disposition=None, sync=True): + """ + Acknowledge the given L{Message}. If message is None, then all + unacknowledged messages on the session are acknowledged. + + @type message: Message + @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged + """ + if message is None: + messages = self.unacked[:] + else: + messages = [message] + + for m in messages: + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ecwait(lambda: len(self.acked) < self.ack_capacity) + m._disposition = disposition + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ecwait(lambda: not [m for m in messages if m in self.acked]) + + @synchronized + def commit(self): + """ + Commit outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.committing = True + self._wakeup() + self._ecwait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed + + @synchronized + def rollback(self): + """ + Rollback outstanding transactional work. This consists of all + message sends and receives since the prior commit or rollback. + """ + if not self.transactional: + raise NontransactionalSession() + self.aborting = True + self._wakeup() + self._ecwait(lambda: not self.aborting) + assert self.aborted + + @synchronized + def sync(self, timeout=None): + """ + Sync the session. + """ + for snd in self.senders: + snd.sync(timeout=timeout) + if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): + raise Timeout("session sync timed out") + + @synchronized + def close(self, timeout=None): + """ + Close the session. + """ + self.sync(timeout=timeout) + + for link in self.receivers + self.senders: + link.close(timeout=timeout) + + if not self.closing: + self.closing = True + self._wakeup() + + try: + if not self._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("session close timed out") + finally: + self.connection._remove_session(self) + +def _mangle(addr): + if addr and addr.startswith("#"): + return str(uuid4()) + addr + else: + return addr + +class Sender(Endpoint): + + """ + Sends outgoing messages. + """ + + def __init__(self, session, id, target, options): + self.session = session + self.id = id + self.target = target + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.threshold = 0.5 + self.durable = options.get("durable") + self.queued = Serial(0) + self.synced = Serial(0) + self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + + def _wakeup(self): + self.session._wakeup() + + def check_error(self): + self.session.check_error() + if self.error: + raise self.error + + def get_error(self): + err = self.session.get_error() + if err: + return err + else: + return self.error + + def _ewait(self, predicate, timeout=None): + result = self.session._ewait(lambda: self.error or predicate(), timeout) + self.check_error() + return result + + def check_closed(self): + if self.closed: + raise LinkClosed() + + @synchronized + def unsettled(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def available(self): + if self.capacity is UNLIMITED: + return UNLIMITED + else: + return self.capacity - self.unsettled() + + @synchronized + def send(self, object, sync=True, timeout=None): + """ + Send a message. If the object passed in is of type L{unicode}, + L{str}, L{list}, or L{dict}, it will automatically be wrapped in a + L{Message} and sent. If it is of type L{Message}, it will be sent + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. + + @type object: unicode, str, list, dict, Message + @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity + """ + + if not self.session.connection._connected or self.session.closing: + raise Detached() + + self._ecwait(lambda: self.linked) + + if isinstance(object, Message): + message = object + else: + message = Message(object) + + if message.durable is None: + message.durable = self.durable + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ecwait(self.available, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + if self.capacity is not UNLIMITED: + message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) + else: + message._sync = sync + self.session.outgoing.append(message) + self.queued += 1 + + if sync: + self.sync() + assert message not in self.session.outgoing + else: + self._wakeup() + + @synchronized + def sync(self, timeout=None): + mno = self.queued + if self.synced < mno: + self.synced = mno + self._wakeup() + if not self._ewait(lambda: self.acked >= mno, timeout=timeout): + raise Timeout("sender sync timed out") + + @synchronized + def close(self, timeout=None): + """ + Close the Sender. + """ + # avoid erroring out when closing a sender that was never + # established + if self.acked < self.queued: + self.sync(timeout=timeout) + + if not self.closing: + self.closing = True + self._wakeup() + + try: + if not self.session._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("sender close timed out") + finally: + try: + self.session.senders.remove(self) + except ValueError: + pass + +class Receiver(Endpoint, object): + + """ + Receives incoming messages from a remote source. Messages may be + fetched with L{fetch}. + """ + + def __init__(self, session, id, source, options): + self.session = session + self.id = id + self.source = source + self.options = options + + self.granted = Serial(0) + self.draining = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.error = None + self.linked = False + self.closing = False + self.closed = False + self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + self.threshold = 0.5 + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) + + def _wakeup(self): + self.session._wakeup() + + def check_error(self): + self.session.check_error() + if self.error: + raise self.error + + def get_error(self): + err = self.session.get_error() + if err: + return err + else: + return self.error + + def _ewait(self, predicate, timeout=None): + result = self.session._ewait(lambda: self.error or predicate(), timeout) + self.check_error() + return result + + def check_closed(self): + if self.closed: + raise LinkClosed() + + @synchronized + def unsettled(self): + """ + Returns the number of acknowledged messages awaiting confirmation. + """ + return len([m for m in self.acked if m._receiver is self]) + + @synchronized + def available(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned + + @synchronized + def fetch(self, timeout=None): + """ + Fetch and return a single message. A timeout of None will block + forever waiting for a message to arrive, a timeout of zero will + return immediately if no messages are available. + + @type timeout: float + @param timeout: the time to wait for a message to be available + """ + + self._ecwait(lambda: self.linked) + + if self._capacity == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ecwait(lambda: self.impending >= self.granted) + msg = self.session._get(self, timeout=timeout) + if msg is None: + self.check_closed() + self.draining = True + self._wakeup() + self._ecwait(lambda: not self.draining) + msg = self.session._get(self, timeout=0) + self._grant() + self._wakeup() + if msg is None: + raise Empty() + elif self._capacity not in (0, UNLIMITED.value): + t = int(ceil(self.threshold * self._capacity)) + if self.received - self.returned <= t: + self.granted = self.returned + self._capacity + self._wakeup() + return msg + + def _grant(self): + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED + else: + self.granted = self.returned + self._capacity + + @synchronized + def close(self, timeout=None): + """ + Close the receiver. + """ + if not self.closing: + self.closing = True + self._wakeup() + + try: + if not self.session._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("receiver close timed out") + finally: + try: + self.session.receivers.remove(self) + except ValueError: + pass + +__all__ = ["Connection", "Session", "Sender", "Receiver"] |