summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/messaging.py')
-rw-r--r--qpid/python/qpid/messaging.py148
1 files changed, 107 insertions, 41 deletions
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index d755aa5054..3e3c8f36cb 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -77,7 +77,8 @@ class Connection:
"""
@static
- def open(host, port=None):
+ def open(host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates an AMQP connection and connects it to the given host and port.
@@ -88,11 +89,12 @@ class Connection:
@rtype: Connection
@return: a connected Connection
"""
- conn = Connection(host, port)
+ conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
conn.connect()
return conn
- def __init__(self, host, port=None):
+ def __init__(self, host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be started.
@@ -106,11 +108,16 @@ class Connection:
"""
self.host = host
self.port = default(port, AMQP_PORT)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
+ self.heartbeat = heartbeat
+
self.started = False
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
- self.reconnect = False
+ self.reconnect = options.get("reconnect", False)
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
@@ -230,9 +237,10 @@ class Pattern:
self.value = value
# XXX: this should become part of the driver
- def _bind(self, ssn, exchange, queue):
- ssn.exchange_bind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#"))
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
class SessionError(Exception):
pass
@@ -282,6 +290,7 @@ class Session:
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
+ self.error = None
self.closing = False
self.closed = False
@@ -302,12 +311,16 @@ class Session:
def _check_error(self, exc=SessionError):
self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection._ewait(predicate, timeout, exc)
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
- def sender(self, target):
+ def sender(self, target, **options):
"""
Creates a L{Sender} that may be used to send L{Messages<Message>}
to the specified target.
@@ -317,7 +330,7 @@ class Session:
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target)
+ sender = Sender(self, len(self.senders), target, options)
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -327,7 +340,7 @@ class Session:
return sender
@synchronized
- def receiver(self, source, filter=None):
+ def receiver(self, source, **options):
"""
Creates a receiver that may be used to actively fetch or to listen
for the arrival of L{Messages<Message>} from the specified source.
@@ -337,7 +350,7 @@ class Session:
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, filter,
+ receiver = Receiver(self, len(self.receivers), source, options,
self.started)
self.receivers.append(receiver)
self._wakeup()
@@ -368,8 +381,8 @@ class Session:
@synchronized
def _get(self, predicate, timeout=None):
- if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -505,13 +518,18 @@ class Sender:
Sends outgoing messages.
"""
- def __init__(self, session, index, target):
+ def __init__(self, session, index, target, options):
self.session = session
self.index = index
self.target = target
- self.capacity = UNLIMITED
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
+ self.durable = options.get("durable")
self.queued = Serial(0)
self.acked = Serial(0)
+ self.error = None
+ self.linked = False
+ self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -520,9 +538,13 @@ class Sender:
def _check_error(self, exc=SendError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SendError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -558,11 +580,16 @@ class Sender:
if not self.session.connection._connected or self.session.closing:
raise Disconnected()
+ self._ewait(lambda: self.linked)
+
if isinstance(object, Message):
message = object
else:
message = Message(object)
+ if message.durable is None:
+ message.durable = self.durable
+
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -573,15 +600,19 @@ class Sender:
message._sender = self
self.session.outgoing.append(message)
self.queued += 1
- mno = self.queued
self._wakeup()
if sync:
- self._ewait(lambda: self.acked >= mno)
+ self.sync()
assert message not in self.session.outgoing
@synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
def close(self):
"""
Close the Sender.
@@ -609,21 +640,23 @@ class Receiver:
L{listen}.
"""
- def __init__(self, session, index, source, filter, started):
+ def __init__(self, session, index, source, options, started):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
- self.filter = filter
+ self.options = options
self.started = started
- self.capacity = UNLIMITED
+ self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
- self.drain = False
+ self.draining = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self.listener = None
@@ -634,9 +667,13 @@ class Receiver:
def _check_error(self, exc=ReceiveError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -680,17 +717,18 @@ class Receiver:
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
+
+ self._ewait(lambda: self.linked)
+
if self._capacity() == 0:
self.granted = self.returned + 1
self._wakeup()
self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.drain = True
- self.granted = self.received
+ self.draining = True
self._wakeup()
- self._ewait(lambda: self.impending == self.received)
- self.drain = False
+ self._ewait(lambda: not self.draining)
self._grant()
self._wakeup()
msg = self.session._get(self._pred, timeout=0)
@@ -738,7 +776,7 @@ class Receiver:
self.closing = True
self._wakeup()
try:
- self._ewait(lambda: self.closed)
+ self.session._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -778,6 +816,8 @@ def get_type(content):
def get_codec(content_type):
return TYPE_CODEC[content_type]
+UNSPECIFIED = object()
+
class Message:
"""
@@ -802,7 +842,9 @@ class Message:
@ivar content: the message content
"""
- def __init__(self, content=None):
+ def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+ subject=None, to=None, user_id=None, reply_to=None,
+ correlation_id=None, durable=None, properties=None):
"""
Construct a new message with the supplied content. The
content-type of the message will be automatically inferred from
@@ -810,20 +852,44 @@ class Message:
@type content: str, unicode, buffer, dict, list
@param content: the message content
- """
- self.id = None
- self.subject = None
- self.user_id = None
- self.to = None
- self.reply_to = None
- self.correlation_id = None
- self.durable = False
- self.properties = {}
- self.content_type = get_type(content)
+
+ @type content_type: str
+ @param content_type: the content-type of the message
+ """
+ self.id = id
+ self.subject = subject
+ self.to = to
+ self.user_id = user_id
+ self.reply_to = reply_to
+ self.correlation_id = correlation_id
+ self.durable = durable
+ if properties is None:
+ self.properties = {}
+ else:
+ self.properties = properties
+ if content_type is UNSPECIFIED:
+ self.content_type = get_type(content)
+ else:
+ self.content_type = content_type
self.content = content
def __repr__(self):
- return "Message(%r)" % self.content
+ args = []
+ for name in ["id", "subject", "to", "user_id", "reply_to",
+ "correlation_id"]:
+ value = self.__dict__[name]
+ if value is not None: args.append("%s=%r" % (name, value))
+ for name in ["durable", "properties"]:
+ value = self.__dict__[name]
+ if value: args.append("%s=%r" % (name, value))
+ if self.content_type != get_type(self.content):
+ args.append("content_type=%r" % self.content_type)
+ if self.content is not None:
+ if args:
+ args.append("content=%r" % self.content)
+ else:
+ args.append(repr(self.content))
+ return "Message(%s)" % ", ".join(args)
__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
"ConnectionError", "ConnectError", "SessionError", "Disconnected",