diff options
Diffstat (limited to 'qpid/python/qpid/messaging.py')
-rw-r--r-- | qpid/python/qpid/messaging.py | 148 |
1 files changed, 107 insertions, 41 deletions
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index d755aa5054..3e3c8f36cb 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -77,7 +77,8 @@ class Connection: """ @static - def open(host, port=None): + def open(host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): """ Creates an AMQP connection and connects it to the given host and port. @@ -88,11 +89,12 @@ class Connection: @rtype: Connection @return: a connected Connection """ - conn = Connection(host, port) + conn = Connection(host, port, username, password, mechanism, heartbeat, **options) conn.connect() return conn - def __init__(self, host, port=None): + def __init__(self, host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None, **options): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be started. @@ -106,11 +108,16 @@ class Connection: """ self.host = host self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanism = mechanism + self.heartbeat = heartbeat + self.started = False self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} - self.reconnect = False + self.reconnect = options.get("reconnect", False) self._connected = False self._lock = RLock() self._condition = Condition(self._lock) @@ -230,9 +237,10 @@ class Pattern: self.value = value # XXX: this should become part of the driver - def _bind(self, ssn, exchange, queue): - ssn.exchange_bind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#")) + def _bind(self, sst, exchange, queue): + from qpid.ops import ExchangeBind + sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#"))) class SessionError(Exception): pass @@ -282,6 +290,7 @@ class Session: # XXX: I hate this name. self.ack_capacity = UNLIMITED + self.error = None self.closing = False self.closed = False @@ -302,12 +311,16 @@ class Session: def _check_error(self, exc=SessionError): self.connection._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SessionError): - return self.connection._ewait(predicate, timeout, exc) + result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized - def sender(self, target): + def sender(self, target, **options): """ Creates a L{Sender} that may be used to send L{Messages<Message>} to the specified target. @@ -317,7 +330,7 @@ class Session: @rtype: Sender @return: a new Sender for the specified target """ - sender = Sender(self, len(self.senders), target) + sender = Sender(self, len(self.senders), target, options) self.senders.append(sender) self._wakeup() # XXX: because of the lack of waiting here we can end up getting @@ -327,7 +340,7 @@ class Session: return sender @synchronized - def receiver(self, source, filter=None): + def receiver(self, source, **options): """ Creates a receiver that may be used to actively fetch or to listen for the arrival of L{Messages<Message>} from the specified source. @@ -337,7 +350,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, filter, + receiver = Receiver(self, len(self.receivers), source, options, self.started) self.receivers.append(receiver) self._wakeup() @@ -368,8 +381,8 @@ class Session: @synchronized def _get(self, predicate, timeout=None): - if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: msg._receiver.returned += 1 @@ -505,13 +518,18 @@ class Sender: Sends outgoing messages. """ - def __init__(self, session, index, target): + def __init__(self, session, index, target, options): self.session = session self.index = index self.target = target - self.capacity = UNLIMITED + self.options = options + self.capacity = options.get("capacity", UNLIMITED) + self.durable = options.get("durable") self.queued = Serial(0) self.acked = Serial(0) + self.error = None + self.linked = False + self.closing = False self.closed = False self._lock = self.session._lock @@ -520,9 +538,13 @@ class Sender: def _check_error(self, exc=SendError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=SendError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -558,11 +580,16 @@ class Sender: if not self.session.connection._connected or self.session.closing: raise Disconnected() + self._ewait(lambda: self.linked) + if isinstance(object, Message): message = object else: message = Message(object) + if message.durable is None: + message.durable = self.durable + if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) @@ -573,15 +600,19 @@ class Sender: message._sender = self self.session.outgoing.append(message) self.queued += 1 - mno = self.queued self._wakeup() if sync: - self._ewait(lambda: self.acked >= mno) + self.sync() assert message not in self.session.outgoing @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized def close(self): """ Close the Sender. @@ -609,21 +640,23 @@ class Receiver: L{listen}. """ - def __init__(self, session, index, source, filter, started): + def __init__(self, session, index, source, options, started): self.session = session self.index = index self.destination = str(self.index) self.source = source - self.filter = filter + self.options = options self.started = started - self.capacity = UNLIMITED + self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) - self.drain = False + self.draining = False self.impending = Serial(0) self.received = Serial(0) self.returned = Serial(0) + self.error = None + self.linked = False self.closing = False self.closed = False self.listener = None @@ -634,9 +667,13 @@ class Receiver: def _check_error(self, exc=ReceiveError): self.session._check_error(exc) + if self.error: + raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ReceiveError): - return self.session._ewait(predicate, timeout, exc) + result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) + self._check_error(exc) + return result @synchronized def pending(self): @@ -680,17 +717,18 @@ class Receiver: @type timeout: float @param timeout: the time to wait for a message to be available """ + + self._ewait(lambda: self.linked) + if self._capacity() == 0: self.granted = self.returned + 1 self._wakeup() self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self.drain = True - self.granted = self.received + self.draining = True self._wakeup() - self._ewait(lambda: self.impending == self.received) - self.drain = False + self._ewait(lambda: not self.draining) self._grant() self._wakeup() msg = self.session._get(self._pred, timeout=0) @@ -738,7 +776,7 @@ class Receiver: self.closing = True self._wakeup() try: - self._ewait(lambda: self.closed) + self.session._ewait(lambda: self.closed) finally: self.session.receivers.remove(self) @@ -778,6 +816,8 @@ def get_type(content): def get_codec(content_type): return TYPE_CODEC[content_type] +UNSPECIFIED = object() + class Message: """ @@ -802,7 +842,9 @@ class Message: @ivar content: the message content """ - def __init__(self, content=None): + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): """ Construct a new message with the supplied content. The content-type of the message will be automatically inferred from @@ -810,20 +852,44 @@ class Message: @type content: str, unicode, buffer, dict, list @param content: the message content - """ - self.id = None - self.subject = None - self.user_id = None - self.to = None - self.reply_to = None - self.correlation_id = None - self.durable = False - self.properties = {} - self.content_type = get_type(content) + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type self.content = content def __repr__(self): - return "Message(%r)" % self.content + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) __all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", "ConnectionError", "ConnectError", "SessionError", "Disconnected", |