summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/messaging.py
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-09 13:05:43 +0000
committerAidan Skinner <aidan@apache.org>2009-09-09 13:05:43 +0000
commitc1ebe66bfab328c5192a35c21ea290b5c45f40f5 (patch)
tree6e61e50d92442f29287a82c22b54de6beac43b2c /qpid/python/qpid/messaging.py
parent7b28732091473d93ce7546c70fa1d2dbd685161a (diff)
downloadqpid-python-c1ebe66bfab328c5192a35c21ea290b5c45f40f5.tar.gz
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@812936 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qpid/messaging.py')
-rw-r--r--qpid/python/qpid/messaging.py587
1 files changed, 259 insertions, 328 deletions
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index 9b3fecbf9b..d755aa5054 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,63 +30,18 @@ Areas that still need work:
- protocol negotiation/multiprotocol impl
"""
-import connection, time, socket, sys, traceback
from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from concurrency import synchronized, Waiter
+from datatypes import timestamp, uuid4, Serial
from logging import getLogger
from ops import PRIMITIVE
-from session import Client, INCOMPLETE
from threading import Thread, RLock, Condition
-from util import connect
+from util import default
log = getLogger("qpid.messaging")
static = staticmethod
-def synchronized(meth):
- def sync_wrapper(self, *args, **kwargs):
- self.lock()
- try:
- return meth(self, *args, **kwargs)
- finally:
- self.unlock()
- return sync_wrapper
-
-class Lockable(object):
-
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
- def wait(self, predicate, timeout=None):
- passed = 0
- start = time.time()
- while not predicate():
- if timeout is None:
- # using the timed wait prevents keyboard interrupts from being
- # blocked while waiting
- self._condition.wait(3)
- elif passed < timeout:
- self._condition.wait(timeout - passed)
- else:
- return False
- passed = time.time() - start
- return True
-
- def notify(self):
- self._condition.notify()
-
- def notifyAll(self):
- self._condition.notifyAll()
-
-def default(value, default):
- if value is None:
- return default
- else:
- return value
-
AMQP_PORT = 5672
AMQPS_PORT = 5671
@@ -101,10 +56,20 @@ class Constant:
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
-class ConnectError(Exception):
+class ConnectionError(Exception):
+ """
+ The base class for all connection related exceptions.
+ """
pass
-class Connection(Lockable):
+class ConnectError(ConnectionError):
+ """
+ Exception raised when there is an error connecting to the remote
+ peer.
+ """
+ pass
+
+class Connection:
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -142,12 +107,35 @@ class Connection(Lockable):
self.host = host
self.port = default(port, AMQP_PORT)
self.started = False
- self._conn = None
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
+ self.reconnect = False
+ self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
+ self._modcount = Serial(0)
+ self.error = None
+ from driver import Driver
+ self._driver = Driver(self)
+ self._driver.start()
+
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
+ def _wakeup(self):
+ self._modcount += 1
+ self._driver.wakeup()
+
+ def _check_error(self, exc=ConnectionError):
+ if self.error:
+ raise exc(*self.error)
+
+ def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+ result = self._wait(lambda: self.error or predicate(), timeout)
+ self._check_error(exc)
+ return result
@synchronized
def session(self, name=None, transactional=False):
@@ -173,8 +161,7 @@ class Connection(Lockable):
else:
ssn = Session(self, name, self.started, transactional=transactional)
self.sessions[name] = ssn
- if self._conn is not None:
- ssn._attach()
+ self._wakeup()
return ssn
@synchronized
@@ -186,38 +173,25 @@ class Connection(Lockable):
"""
Connect to the remote endpoint.
"""
- if self._conn is not None:
- return
- try:
- self._socket = connect(self.host, self.port)
- except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start()
- except connection.VersionError, e:
- raise ConnectError(e)
-
- for ssn in self.sessions.values():
- ssn._attach()
+ self._connected = True
+ self._wakeup()
+ self._ewait(lambda: self._driver._connected, exc=ConnectError)
@synchronized
def disconnect(self):
"""
Disconnect from the remote endpoint.
"""
- if self._conn is not None:
- self._conn.close()
- self._conn = None
- for ssn in self.sessions.values():
- ssn._disconnected()
+ self._connected = False
+ self._wakeup()
+ self._ewait(lambda: not self._driver._connected)
@synchronized
def connected(self):
"""
Return true if the connection is connected, false otherwise.
"""
- return self._conn is not None
+ return self._connected
@synchronized
def start(self):
@@ -255,22 +229,32 @@ class Pattern:
def __init__(self, value):
self.value = value
+ # XXX: this should become part of the driver
def _bind(self, ssn, exchange, queue):
ssn.exchange_bind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#"))
-FILTER_DEFAULTS = {
- "topic": Pattern("*")
- }
+class SessionError(Exception):
+ pass
+
+class Disconnected(SessionError):
+ """
+ Exception raised when an operation is attempted that is illegal when
+ disconnected.
+ """
+ pass
-def delegate(session):
- class Delegate(Client):
+class NontransactionalSession(SessionError):
+ """
+ Exception raised when commit or rollback is attempted on a non
+ transactional session.
+ """
+ pass
- def message_transfer(self, cmd):
- session._message_transfer(cmd)
- return Delegate
+class TransactionAborted(SessionError):
+ pass
-class Session(Lockable):
+class Session:
"""
Sessions provide a linear context for sending and receiving
@@ -281,18 +265,28 @@ class Session(Lockable):
self.connection = connection
self.name = name
self.started = started
+
self.transactional = transactional
- self._ssn = None
+
+ self.committing = False
+ self.committed = True
+ self.aborting = False
+ self.aborted = False
+
self.senders = []
self.receivers = []
- self.closing = False
+ self.outgoing = []
self.incoming = []
- self.closed = False
self.unacked = []
- if self.transactional:
- self.acked = []
- self._lock = RLock()
- self._condition = Condition(self._lock)
+ self.acked = []
+ # XXX: I hate this name.
+ self.ack_capacity = UNLIMITED
+
+ self.closing = False
+ self.closed = False
+
+ self._lock = connection._lock
+ self.running = True
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -300,60 +294,17 @@ class Session(Lockable):
def __repr__(self):
return "<Session %s>" % self.name
- def _attach(self):
- self._ssn = self.connection._conn.session(self.name, delegate=delegate(self))
- self._ssn.auto_sync = False
- self._ssn.invoke_lock = self._lock
- self._ssn.lock = self._lock
- self._ssn.condition = self._condition
- if self.transactional:
- self._ssn.tx_select()
- for link in self.senders + self.receivers:
- link._link()
-
- def _disconnected(self):
- self._ssn = None
- for link in self.senders + self.receivers:
- link._disconnected()
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
- @synchronized
- def _message_transfer(self, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = self.receivers[int(cmd.destination)]
- msg._receiver = rcv
- log.debug("RECV [%s] %s", self, msg)
- self.incoming.append(msg)
- self.notifyAll()
- return INCOMPLETE
-
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
- ap = mp.application_headers
- enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
- msg = Message(content)
- msg.id = mp.message_id
- if ap is not None:
- msg.to = ap.get("to")
- msg.subject = ap.get("subject")
- msg.user_id = mp.user_id
- if mp.reply_to is not None:
- msg.reply_to = reply_to2addr(mp.reply_to)
- msg.correlation_id = mp.correlation_id
- msg.properties = mp.application_headers
- msg.content_type = mp.content_type
- msg._transfer_id = message.id
- return msg
+ def _wakeup(self):
+ self.connection._wakeup()
- def _exchange_query(self, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = self._ssn.exchange_query(name=address, sync=True)
- self._ssn.sync()
- return result.get()
+ def _check_error(self, exc=SessionError):
+ self.connection._check_error(exc)
+
+ def _ewait(self, predicate, timeout=None, exc=SessionError):
+ return self.connection._ewait(predicate, timeout, exc)
@synchronized
def sender(self, target):
@@ -368,8 +319,11 @@ class Session(Lockable):
"""
sender = Sender(self, len(self.senders), target)
self.senders.append(sender)
- if self._ssn is not None:
- sender._link()
+ self._wakeup()
+ # XXX: because of the lack of waiting here we can end up getting
+ # into the driver loop with messages sent for senders that haven't
+ # been linked yet, something similar can probably happen for
+ # receivers
return sender
@synchronized
@@ -386,8 +340,7 @@ class Session(Lockable):
receiver = Receiver(self, len(self.receivers), source, filter,
self.started)
self.receivers.append(receiver)
- if self._ssn is not None:
- receiver._link()
+ self._wakeup()
return receiver
@synchronized
@@ -415,43 +368,45 @@ class Session(Lockable):
@synchronized
def _get(self, predicate, timeout=None):
- if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
+ msg._receiver.returned += 1
self.unacked.append(msg)
log.debug("RETR [%s] %s", self, msg)
return msg
return None
@synchronized
- def acknowledge(self, message=None):
+ def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@type message: Message
@param message: the message to acknowledge or None
+ @type sync: boolean
+ @param sync: if true then block until the message(s) are acknowledged
"""
if message is None:
messages = self.unacked[:]
else:
messages = [message]
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- self._ssn.receiver._completed.add_range(range)
- self._ssn.channel.session_completed(self._ssn.receiver._completed)
- self._ssn.message_accept(ids, sync=True)
- self._ssn.sync()
-
for m in messages:
- try:
- self.unacked.remove(m)
- except ValueError:
- pass
- if self.transactional:
- self.acked.append(m)
+ if self.ack_capacity is not UNLIMITED:
+ if self.ack_capacity <= 0:
+ # XXX: this is currently a SendError, maybe it should be a SessionError?
+ raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+ self._wakeup()
+ self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ self.unacked.remove(m)
+ self.acked.append(m)
+
+ self._wakeup()
+ if sync:
+ self._ewait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -461,11 +416,12 @@ class Session(Lockable):
"""
if not self.transactional:
raise NontransactionalSession()
- if self._ssn is None:
- raise Disconnected()
- self._ssn.tx_commit(sync=True)
- del self.acked[:]
- self._ssn.sync()
+ self.committing = True
+ self._wakeup()
+ self._ewait(lambda: not self.committing)
+ if self.aborted:
+ raise TransactionAborted()
+ assert self.committed
@synchronized
def rollback(self):
@@ -475,21 +431,10 @@ class Session(Lockable):
"""
if not self.transactional:
raise NontransactionalSession()
- if self._ssn is None:
- raise Disconnected()
-
- ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming])
- for range in ids:
- self._ssn.receiver._completed.add_range(range)
- self._ssn.channel.session_completed(self._ssn.receiver._completed)
- self._ssn.message_release(ids)
- self._ssn.tx_rollback(sync=True)
-
- del self.incoming[:]
- del self.unacked[:]
- del self.acked[:]
-
- self._ssn.sync()
+ self.aborting = True
+ self._wakeup()
+ self._ewait(lambda: not self.aborting)
+ assert self.aborted
@synchronized
def start(self):
@@ -508,7 +453,7 @@ class Session(Lockable):
for rcv in self.receivers:
rcv.stop()
# TODO: think about stopping individual receivers in listen mode
- self.wait(lambda: self._peek(self._pred) is None)
+ self._wait(lambda: self._peek(self._pred) is None)
self.started = False
def _pred(self, m):
@@ -516,6 +461,7 @@ class Session(Lockable):
@synchronized
def run(self):
+ self.running = True
try:
while True:
msg = self._get(self._pred)
@@ -524,10 +470,10 @@ class Session(Lockable):
else:
msg._receiver.listener(msg)
if self._peek(self._pred) is None:
- self.notifyAll()
+ self.connection._waiter.notifyAll()
finally:
- self.closed = True
- self.notifyAll()
+ self.running = False
+ self.connection._waiter.notifyAll()
@synchronized
def close(self):
@@ -538,45 +484,22 @@ class Session(Lockable):
link.close()
self.closing = True
- self.notifyAll()
- self.wait(lambda: self.closed)
+ self._wakeup()
+ self._ewait(lambda: self.closed and not self.running)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
- self._ssn.close()
- self._ssn = None
+ # XXX: should be able to express this condition through API calls
+ self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
-
-def reply_to2addr(reply_to):
- if reply_to.routing_key is None:
- return reply_to.exchange
- elif reply_to.exchange in (None, ""):
- return reply_to.routing_key
- else:
- return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-
-class Disconnected(Exception):
- """
- Exception raised when an operation is attempted that is illegal when
- disconnected.
- """
+class SendError(SessionError):
pass
-class NontransactionalSession(Exception):
- """
- Exception raised when commit or rollback is attempted on a non
- transactional session.
- """
+class InsufficientCapacity(SendError):
pass
-class Sender(Lockable):
+class Sender:
"""
Sends outgoing messages.
@@ -586,100 +509,99 @@ class Sender(Lockable):
self.session = session
self.index = index
self.target = target
+ self.capacity = UNLIMITED
+ self.queued = Serial(0)
+ self.acked = Serial(0)
self.closed = False
- self._ssn = None
- self._exchange = None
- self._routing_key = None
- self._subject = None
self._lock = self.session._lock
- self._condition = self.session._condition
-
- def _link(self):
- self._ssn = self.session._ssn
- node, self._subject = parse_addr(self.target)
- result = self.session._exchange_query(node)
- if result.not_found:
- # XXX: should check 'create' option
- self._ssn.queue_declare(queue=node, durable=False, sync=True)
- self._ssn.sync()
- self._exchange = ""
- self._routing_key = node
- else:
- self._exchange = node
- self._routing_key = self._subject
- def _disconnected(self):
- self._ssn = None
+ def _wakeup(self):
+ self.session._wakeup()
+
+ def _check_error(self, exc=SendError):
+ self.session._check_error(exc)
+
+ def _ewait(self, predicate, timeout=None, exc=SendError):
+ return self.session._ewait(predicate, timeout, exc)
@synchronized
- def send(self, object):
+ def pending(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
+
+ @synchronized
+ def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
L{Message} and sent. If it is of type L{Message}, it will be sent
- directly.
+ directly. If the sender capacity is not L{UNLIMITED} then send
+ will block until there is available capacity to send the message.
+ If the timeout parameter is specified, then send will throw an
+ L{InsufficientCapacity} exception if capacity does not become
+ available within the specified time.
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
+
+ @type sync: boolean
+ @param sync: if true then block until the message is sent
+
+ @type timeout: float
+ @param timeout: the time to wait for available capacity
"""
- if self._ssn is None:
+ if not self.session.connection._connected or self.session.closing:
raise Disconnected()
if isinstance(object, Message):
message = object
else:
message = Message(object)
- # XXX: what if subject is specified for a normal queue?
- if self._routing_key is None:
- rk = message.subject
- else:
- rk = self._routing_key
- # XXX: do we need to query to figure out how to create the reply-to interoperably?
- if message.reply_to:
- rt = self._ssn.reply_to(*parse_addr(message.reply_to))
- else:
- rt = None
- dp = self._ssn.delivery_properties(routing_key=rk)
- mp = self._ssn.message_properties(message_id=message.id,
- user_id=message.user_id,
- reply_to=rt,
- correlation_id=message.correlation_id,
- content_type=message.content_type,
- application_headers=message.properties)
- if message.subject is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["subject"] = message.subject
- if message.to is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["to"] = message.to
- enc, dec = get_codec(message.content_type)
- body = enc(message.content)
- self._ssn.message_transfer(destination=self._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", self.session, message)
- self._ssn.sync()
+
+ if self.capacity is not UNLIMITED:
+ if self.capacity <= 0:
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+ if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+
+ # XXX: what if we send the same message to multiple senders?
+ message._sender = self
+ self.session.outgoing.append(message)
+ self.queued += 1
+ mno = self.queued
+
+ self._wakeup()
+
+ if sync:
+ self._ewait(lambda: self.acked >= mno)
+ assert message not in self.session.outgoing
@synchronized
def close(self):
"""
Close the Sender.
"""
+ # XXX: should make driver do something here
if not self.closed:
self.session.senders.remove(self)
self.closed = True
-class Empty(Exception):
+class ReceiveError(SessionError):
+ pass
+
+class Empty(ReceiveError):
"""
Exception raised by L{Receiver.fetch} when there is no message
available within the alloted time.
"""
pass
-class Receiver(Lockable):
+class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
@@ -693,43 +615,39 @@ class Receiver(Lockable):
self.destination = str(self.index)
self.source = source
self.filter = filter
+
self.started = started
self.capacity = UNLIMITED
+ self.granted = Serial(0)
+ self.drain = False
+ self.impending = Serial(0)
+ self.received = Serial(0)
+ self.returned = Serial(0)
+
+ self.closing = False
self.closed = False
self.listener = None
- self._ssn = None
- self._queue = None
self._lock = self.session._lock
- self._condition = self.session._condition
-
- def _link(self):
- self._ssn = self.session._ssn
- result = self.session._exchange_query(self.source)
- if result.not_found:
- self._queue = self.source
- # XXX: should check 'create' option
- self._ssn.queue_declare(queue=self._queue, durable=False)
- else:
- self._queue = "%s.%s" % (self.session.name, self.destination)
- self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True)
- if self.filter is None:
- f = FILTER_DEFAULTS[result.type]
- else:
- f = self.filter
- f._bind(self._ssn, self.source, self._queue)
- self._ssn.message_subscribe(queue=self._queue, destination=self.destination,
- sync=True)
- self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit)
- self._ssn.sync()
- if self.started:
- self._start()
- def _disconnected(self):
- self._ssn = None
+ def _wakeup(self):
+ self.session._wakeup()
+
+ def _check_error(self, exc=ReceiveError):
+ self.session._check_error(exc)
+
+ def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+ return self.session._ewait(predicate, timeout, exc)
@synchronized
def pending(self):
- return self.session._count(self._pred)
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
+ return self.received - self.returned
def _capacity(self):
if not self.started:
@@ -762,23 +680,36 @@ class Receiver(Lockable):
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
- if self.capacity is not UNLIMITED or not self.started:
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- UNLIMITED.value)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+ if self._capacity() == 0:
+ self.granted = self.returned + 1
+ self._wakeup()
+ self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self._ssn.message_flush(self.destination)
- self._start()
- self._ssn.sync()
+ self.drain = True
+ self.granted = self.received
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
+ self.drain = False
+ self._grant()
+ self._wakeup()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
+ elif self._capacity() not in (0, UNLIMITED.value):
+ self.granted += 1
+ self._wakeup()
return msg
- def _start(self):
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity())
+ def _grant(self):
+ if self.started:
+ if self.capacity is UNLIMITED:
+ self.granted = UNLIMITED
+ else:
+ self.granted = self.received + self._capacity()
+ else:
+ self.granted = self.received
+
@synchronized
def start(self):
@@ -786,34 +717,31 @@ class Receiver(Lockable):
Start incoming message delivery for this receiver.
"""
self.started = True
- if self._ssn is not None:
- self._start()
-
- def _stop(self):
- self._ssn.message_stop(self.destination)
+ self._grant()
+ self._wakeup()
@synchronized
def stop(self):
"""
Stop incoming message delivery for this receiver.
"""
- if self._ssn is not None:
- self._stop()
self.started = False
+ self._grant()
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
@synchronized
def close(self):
"""
Close the receiver.
"""
- if not self.closed:
- self.closed = True
- self._ssn.message_cancel(self.destination, sync=True)
- self._ssn.sync()
+ self.closing = True
+ self._wakeup()
+ try:
+ self._ewait(lambda: self.closed)
+ finally:
self.session.receivers.remove(self)
-
-
def codec(name):
type = PRIMITIVE[name]
@@ -889,6 +817,7 @@ class Message:
self.to = None
self.reply_to = None
self.correlation_id = None
+ self.durable = False
self.properties = {}
self.content_type = get_type(content)
self.content = content
@@ -896,5 +825,7 @@ class Message:
def __repr__(self):
return "Message(%r)" % self.content
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
- "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
+ "ConnectionError", "ConnectError", "SessionError", "Disconnected",
+ "SendError", "InsufficientCapacity", "ReceiveError", "Empty",
+ "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]