diff options
author | Aidan Skinner <aidan@apache.org> | 2009-09-09 13:05:43 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2009-09-09 13:05:43 +0000 |
commit | c1ebe66bfab328c5192a35c21ea290b5c45f40f5 (patch) | |
tree | 6e61e50d92442f29287a82c22b54de6beac43b2c /qpid/python/qpid/messaging.py | |
parent | 7b28732091473d93ce7546c70fa1d2dbd685161a (diff) | |
download | qpid-python-c1ebe66bfab328c5192a35c21ea290b5c45f40f5.tar.gz |
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@812936 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qpid/messaging.py')
-rw-r--r-- | qpid/python/qpid/messaging.py | 587 |
1 files changed, 259 insertions, 328 deletions
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 9b3fecbf9b..d755aa5054 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,63 +30,18 @@ Areas that still need work: - protocol negotiation/multiprotocol impl """ -import connection, time, socket, sys, traceback from codec010 import StringCodec -from datatypes import timestamp, uuid4, RangedSet, Message as Message010 +from concurrency import synchronized, Waiter +from datatypes import timestamp, uuid4, Serial from logging import getLogger from ops import PRIMITIVE -from session import Client, INCOMPLETE from threading import Thread, RLock, Condition -from util import connect +from util import default log = getLogger("qpid.messaging") static = staticmethod -def synchronized(meth): - def sync_wrapper(self, *args, **kwargs): - self.lock() - try: - return meth(self, *args, **kwargs) - finally: - self.unlock() - return sync_wrapper - -class Lockable(object): - - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() - - def wait(self, predicate, timeout=None): - passed = 0 - start = time.time() - while not predicate(): - if timeout is None: - # using the timed wait prevents keyboard interrupts from being - # blocked while waiting - self._condition.wait(3) - elif passed < timeout: - self._condition.wait(timeout - passed) - else: - return False - passed = time.time() - start - return True - - def notify(self): - self._condition.notify() - - def notifyAll(self): - self._condition.notifyAll() - -def default(value, default): - if value is None: - return default - else: - return value - AMQP_PORT = 5672 AMQPS_PORT = 5671 @@ -101,10 +56,20 @@ class Constant: UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) -class ConnectError(Exception): +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ pass -class Connection(Lockable): +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class Connection: """ A Connection manages a group of L{Sessions<Session>} and connects @@ -142,12 +107,35 @@ class Connection(Lockable): self.host = host self.port = default(port, AMQP_PORT) self.started = False - self._conn = None self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self.reconnect = False + self._connected = False self._lock = RLock() self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) + self._modcount = Serial(0) + self.error = None + from driver import Driver + self._driver = Driver(self) + self._driver.start() + + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + + def _wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def _check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) + + def _ewait(self, predicate, timeout=None, exc=ConnectionError): + result = self._wait(lambda: self.error or predicate(), timeout) + self._check_error(exc) + return result @synchronized def session(self, name=None, transactional=False): @@ -173,8 +161,7 @@ class Connection(Lockable): else: ssn = Session(self, name, self.started, transactional=transactional) self.sessions[name] = ssn - if self._conn is not None: - ssn._attach() + self._wakeup() return ssn @synchronized @@ -186,38 +173,25 @@ class Connection(Lockable): """ Connect to the remote endpoint. """ - if self._conn is not None: - return - try: - self._socket = connect(self.host, self.port) - except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start() - except connection.VersionError, e: - raise ConnectError(e) - - for ssn in self.sessions.values(): - ssn._attach() + self._connected = True + self._wakeup() + self._ewait(lambda: self._driver._connected, exc=ConnectError) @synchronized def disconnect(self): """ Disconnect from the remote endpoint. """ - if self._conn is not None: - self._conn.close() - self._conn = None - for ssn in self.sessions.values(): - ssn._disconnected() + self._connected = False + self._wakeup() + self._ewait(lambda: not self._driver._connected) @synchronized def connected(self): """ Return true if the connection is connected, false otherwise. """ - return self._conn is not None + return self._connected @synchronized def start(self): @@ -255,22 +229,32 @@ class Pattern: def __init__(self, value): self.value = value + # XXX: this should become part of the driver def _bind(self, ssn, exchange, queue): ssn.exchange_bind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#")) -FILTER_DEFAULTS = { - "topic": Pattern("*") - } +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass -def delegate(session): - class Delegate(Client): +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass - def message_transfer(self, cmd): - session._message_transfer(cmd) - return Delegate +class TransactionAborted(SessionError): + pass -class Session(Lockable): +class Session: """ Sessions provide a linear context for sending and receiving @@ -281,18 +265,28 @@ class Session(Lockable): self.connection = connection self.name = name self.started = started + self.transactional = transactional - self._ssn = None + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + self.senders = [] self.receivers = [] - self.closing = False + self.outgoing = [] self.incoming = [] - self.closed = False self.unacked = [] - if self.transactional: - self.acked = [] - self._lock = RLock() - self._condition = Condition(self._lock) + self.acked = [] + # XXX: I hate this name. + self.ack_capacity = UNLIMITED + + self.closing = False + self.closed = False + + self._lock = connection._lock + self.running = True self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -300,60 +294,17 @@ class Session(Lockable): def __repr__(self): return "<Session %s>" % self.name - def _attach(self): - self._ssn = self.connection._conn.session(self.name, delegate=delegate(self)) - self._ssn.auto_sync = False - self._ssn.invoke_lock = self._lock - self._ssn.lock = self._lock - self._ssn.condition = self._condition - if self.transactional: - self._ssn.tx_select() - for link in self.senders + self.receivers: - link._link() - - def _disconnected(self): - self._ssn = None - for link in self.senders + self.receivers: - link._disconnected() + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) - @synchronized - def _message_transfer(self, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = self.receivers[int(cmd.destination)] - msg._receiver = rcv - log.debug("RECV [%s] %s", self, msg) - self.incoming.append(msg) - self.notifyAll() - return INCOMPLETE - - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(message.body) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.to = ap.get("to") - msg.subject = ap.get("subject") - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - msg.properties = mp.application_headers - msg.content_type = mp.content_type - msg._transfer_id = message.id - return msg + def _wakeup(self): + self.connection._wakeup() - def _exchange_query(self, address): - # XXX: auto sync hack is to avoid deadlock on future - result = self._ssn.exchange_query(name=address, sync=True) - self._ssn.sync() - return result.get() + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=SessionError): + return self.connection._ewait(predicate, timeout, exc) @synchronized def sender(self, target): @@ -368,8 +319,11 @@ class Session(Lockable): """ sender = Sender(self, len(self.senders), target) self.senders.append(sender) - if self._ssn is not None: - sender._link() + self._wakeup() + # XXX: because of the lack of waiting here we can end up getting + # into the driver loop with messages sent for senders that haven't + # been linked yet, something similar can probably happen for + # receivers return sender @synchronized @@ -386,8 +340,7 @@ class Session(Lockable): receiver = Receiver(self, len(self.receivers), source, filter, self.started) self.receivers.append(receiver) - if self._ssn is not None: - receiver._link() + self._wakeup() return receiver @synchronized @@ -415,43 +368,45 @@ class Session(Lockable): @synchronized def _get(self, predicate, timeout=None): - if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: + msg._receiver.returned += 1 self.unacked.append(msg) log.debug("RETR [%s] %s", self, msg) return msg return None @synchronized - def acknowledge(self, message=None): + def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @type message: Message @param message: the message to acknowledge or None + @type sync: boolean + @param sync: if true then block until the message(s) are acknowledged """ if message is None: messages = self.unacked[:] else: messages = [message] - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_accept(ids, sync=True) - self._ssn.sync() - for m in messages: - try: - self.unacked.remove(m) - except ValueError: - pass - if self.transactional: - self.acked.append(m) + if self.ack_capacity is not UNLIMITED: + if self.ack_capacity <= 0: + # XXX: this is currently a SendError, maybe it should be a SessionError? + raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) + self.unacked.remove(m) + self.acked.append(m) + + self._wakeup() + if sync: + self._ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -461,11 +416,12 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - self._ssn.tx_commit(sync=True) - del self.acked[:] - self._ssn.sync() + self.committing = True + self._wakeup() + self._ewait(lambda: not self.committing) + if self.aborted: + raise TransactionAborted() + assert self.committed @synchronized def rollback(self): @@ -475,21 +431,10 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - - ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_release(ids) - self._ssn.tx_rollback(sync=True) - - del self.incoming[:] - del self.unacked[:] - del self.acked[:] - - self._ssn.sync() + self.aborting = True + self._wakeup() + self._ewait(lambda: not self.aborting) + assert self.aborted @synchronized def start(self): @@ -508,7 +453,7 @@ class Session(Lockable): for rcv in self.receivers: rcv.stop() # TODO: think about stopping individual receivers in listen mode - self.wait(lambda: self._peek(self._pred) is None) + self._wait(lambda: self._peek(self._pred) is None) self.started = False def _pred(self, m): @@ -516,6 +461,7 @@ class Session(Lockable): @synchronized def run(self): + self.running = True try: while True: msg = self._get(self._pred) @@ -524,10 +470,10 @@ class Session(Lockable): else: msg._receiver.listener(msg) if self._peek(self._pred) is None: - self.notifyAll() + self.connection._waiter.notifyAll() finally: - self.closed = True - self.notifyAll() + self.running = False + self.connection._waiter.notifyAll() @synchronized def close(self): @@ -538,45 +484,22 @@ class Session(Lockable): link.close() self.closing = True - self.notifyAll() - self.wait(lambda: self.closed) + self._wakeup() + self._ewait(lambda: self.closed and not self.running) while self.thread.isAlive(): self.thread.join(3) self.thread = None - self._ssn.close() - self._ssn = None + # XXX: should be able to express this condition through API calls + self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] - -def reply_to2addr(reply_to): - if reply_to.routing_key is None: - return reply_to.exchange - elif reply_to.exchange in (None, ""): - return reply_to.routing_key - else: - return "%s/%s" % (reply_to.exchange, reply_to.routing_key) - -class Disconnected(Exception): - """ - Exception raised when an operation is attempted that is illegal when - disconnected. - """ +class SendError(SessionError): pass -class NontransactionalSession(Exception): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ +class InsufficientCapacity(SendError): pass -class Sender(Lockable): +class Sender: """ Sends outgoing messages. @@ -586,100 +509,99 @@ class Sender(Lockable): self.session = session self.index = index self.target = target + self.capacity = UNLIMITED + self.queued = Serial(0) + self.acked = Serial(0) self.closed = False - self._ssn = None - self._exchange = None - self._routing_key = None - self._subject = None self._lock = self.session._lock - self._condition = self.session._condition - - def _link(self): - self._ssn = self.session._ssn - node, self._subject = parse_addr(self.target) - result = self.session._exchange_query(node) - if result.not_found: - # XXX: should check 'create' option - self._ssn.queue_declare(queue=node, durable=False, sync=True) - self._ssn.sync() - self._exchange = "" - self._routing_key = node - else: - self._exchange = node - self._routing_key = self._subject - def _disconnected(self): - self._ssn = None + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=SendError): + self.session._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=SendError): + return self.session._ewait(predicate, timeout, exc) @synchronized - def send(self, object): + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True, timeout=None): """ Send a message. If the object passed in is of type L{unicode}, L{str}, L{list}, or L{dict}, it will automatically be wrapped in a L{Message} and sent. If it is of type L{Message}, it will be sent - directly. + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. @type object: unicode, str, list, dict, Message @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity """ - if self._ssn is None: + if not self.session.connection._connected or self.session.closing: raise Disconnected() if isinstance(object, Message): message = object else: message = Message(object) - # XXX: what if subject is specified for a normal queue? - if self._routing_key is None: - rk = message.subject - else: - rk = self._routing_key - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if message.reply_to: - rt = self._ssn.reply_to(*parse_addr(message.reply_to)) - else: - rt = None - dp = self._ssn.delivery_properties(routing_key=rk) - mp = self._ssn.message_properties(message_id=message.id, - user_id=message.user_id, - reply_to=rt, - correlation_id=message.correlation_id, - content_type=message.content_type, - application_headers=message.properties) - if message.subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["subject"] = message.subject - if message.to is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["to"] = message.to - enc, dec = get_codec(message.content_type) - body = enc(message.content) - self._ssn.message_transfer(destination=self._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", self.session, message) - self._ssn.sync() + + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + self.queued += 1 + mno = self.queued + + self._wakeup() + + if sync: + self._ewait(lambda: self.acked >= mno) + assert message not in self.session.outgoing @synchronized def close(self): """ Close the Sender. """ + # XXX: should make driver do something here if not self.closed: self.session.senders.remove(self) self.closed = True -class Empty(Exception): +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): """ Exception raised by L{Receiver.fetch} when there is no message available within the alloted time. """ pass -class Receiver(Lockable): +class Receiver: """ Receives incoming messages from a remote source. Messages may be @@ -693,43 +615,39 @@ class Receiver(Lockable): self.destination = str(self.index) self.source = source self.filter = filter + self.started = started self.capacity = UNLIMITED + self.granted = Serial(0) + self.drain = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + + self.closing = False self.closed = False self.listener = None - self._ssn = None - self._queue = None self._lock = self.session._lock - self._condition = self.session._condition - - def _link(self): - self._ssn = self.session._ssn - result = self.session._exchange_query(self.source) - if result.not_found: - self._queue = self.source - # XXX: should check 'create' option - self._ssn.queue_declare(queue=self._queue, durable=False) - else: - self._queue = "%s.%s" % (self.session.name, self.destination) - self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True) - if self.filter is None: - f = FILTER_DEFAULTS[result.type] - else: - f = self.filter - f._bind(self._ssn, self.source, self._queue) - self._ssn.message_subscribe(queue=self._queue, destination=self.destination, - sync=True) - self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit) - self._ssn.sync() - if self.started: - self._start() - def _disconnected(self): - self._ssn = None + def _wakeup(self): + self.session._wakeup() + + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) + + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + return self.session._ewait(predicate, timeout, exc) @synchronized def pending(self): - return self.session._count(self._pred) + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ + return self.received - self.returned def _capacity(self): if not self.started: @@ -762,23 +680,36 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - if self.capacity is not UNLIMITED or not self.started: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self._capacity() == 0: + self.granted = self.returned + 1 + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self._ssn.message_flush(self.destination) - self._start() - self._ssn.sync() + self.drain = True + self.granted = self.received + self._wakeup() + self._ewait(lambda: self.impending == self.received) + self.drain = False + self._grant() + self._wakeup() msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() + elif self._capacity() not in (0, UNLIMITED.value): + self.granted += 1 + self._wakeup() return msg - def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) + def _grant(self): + if self.started: + if self.capacity is UNLIMITED: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity() + else: + self.granted = self.received + @synchronized def start(self): @@ -786,34 +717,31 @@ class Receiver(Lockable): Start incoming message delivery for this receiver. """ self.started = True - if self._ssn is not None: - self._start() - - def _stop(self): - self._ssn.message_stop(self.destination) + self._grant() + self._wakeup() @synchronized def stop(self): """ Stop incoming message delivery for this receiver. """ - if self._ssn is not None: - self._stop() self.started = False + self._grant() + self._wakeup() + self._ewait(lambda: self.impending == self.received) @synchronized def close(self): """ Close the receiver. """ - if not self.closed: - self.closed = True - self._ssn.message_cancel(self.destination, sync=True) - self._ssn.sync() + self.closing = True + self._wakeup() + try: + self._ewait(lambda: self.closed) + finally: self.session.receivers.remove(self) - - def codec(name): type = PRIMITIVE[name] @@ -889,6 +817,7 @@ class Message: self.to = None self.reply_to = None self.correlation_id = None + self.durable = False self.properties = {} self.content_type = get_type(content) self.content = content @@ -896,5 +825,7 @@ class Message: def __repr__(self): return "Message(%r)" % self.content -__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", - "Empty", "timestamp", "uuid4"] +__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", + "ConnectionError", "ConnectError", "SessionError", "Disconnected", + "SendError", "InsufficientCapacity", "ReceiveError", "Empty", + "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] |