diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-08-20 23:03:28 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-08-20 23:03:28 +0000 |
commit | f34769235c625d5afe031076b6738f6ae672c3ff (patch) | |
tree | f680c97f82a5dfa17150640c52c6356625e2a214 /python/qpid/messaging.py | |
parent | a55a45cac6111b666de84823686bf6405483099e (diff) | |
download | qpid-python-f34769235c625d5afe031076b6738f6ae672c3ff.tar.gz |
implemented reconnect and separated out the protocol driver from the messaging client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@806393 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 792 |
1 files changed, 548 insertions, 244 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 9b3fecbf9b..b66f681dcd 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -32,10 +32,11 @@ Areas that still need work: import connection, time, socket, sys, traceback from codec010 import StringCodec -from datatypes import timestamp, uuid4, RangedSet, Message as Message010 +from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial +from exceptions import Timeout from logging import getLogger -from ops import PRIMITIVE -from session import Client, INCOMPLETE +from ops import PRIMITIVE, delivery_mode +from session import Client, INCOMPLETE, SessionDetached from threading import Thread, RLock, Condition from util import connect @@ -101,7 +102,10 @@ class Constant: UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) -class ConnectError(Exception): +class ConnectionError(Exception): + pass + +class ConnectError(ConnectionError): pass class Connection(Lockable): @@ -142,12 +146,29 @@ class Connection(Lockable): self.host = host self.port = default(port, AMQP_PORT) self.started = False - self._conn = None self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self.reconnect = False self._lock = RLock() self._condition = Condition(self._lock) + self._modcount = Serial(0) + self.error = None + self._driver = Driver(self) + self._driver.start() + + def wakeup(self): + self._modcount += 1 + self._driver.wakeup() + + def catchup(self, exc=ConnectionError): + mc = self._modcount + self.wait(lambda: not self._driver._modcount < mc) + self.check_error(exc) + + def check_error(self, exc=ConnectionError): + if self.error: + raise exc(*self.error) @synchronized def session(self, name=None, transactional=False): @@ -173,8 +194,7 @@ class Connection(Lockable): else: ssn = Session(self, name, self.started, transactional=transactional) self.sessions[name] = ssn - if self._conn is not None: - ssn._attach() + self.wakeup() return ssn @synchronized @@ -186,38 +206,25 @@ class Connection(Lockable): """ Connect to the remote endpoint. """ - if self._conn is not None: - return - try: - self._socket = connect(self.host, self.port) - except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start() - except connection.VersionError, e: - raise ConnectError(e) - - for ssn in self.sessions.values(): - ssn._attach() + self._connected = True + self.wakeup() + self.catchup(ConnectError) @synchronized def disconnect(self): """ Disconnect from the remote endpoint. """ - if self._conn is not None: - self._conn.close() - self._conn = None - for ssn in self.sessions.values(): - ssn._disconnected() + self._connected = False + self.wakeup() + self.catchup() @synchronized def connected(self): """ Return true if the connection is connected, false otherwise. """ - return self._conn is not None + return self._connected @synchronized def start(self): @@ -255,6 +262,7 @@ class Pattern: def __init__(self, value): self.value = value + # XXX: this should become part of the driver def _bind(self, ssn, exchange, queue): ssn.exchange_bind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#")) @@ -263,13 +271,33 @@ FILTER_DEFAULTS = { "topic": Pattern("*") } -def delegate(session): +def delegate(handler, session): class Delegate(Client): def message_transfer(self, cmd): - session._message_transfer(cmd) + handler._message_transfer(session, cmd) return Delegate +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + +class TransactionAborted(SessionError): + pass + class Session(Lockable): """ @@ -281,18 +309,26 @@ class Session(Lockable): self.connection = connection self.name = name self.started = started + self.transactional = transactional - self._ssn = None + + self.committing = False + self.committed = True + self.aborting = False + self.aborted = False + self.senders = [] self.receivers = [] - self.closing = False + self.outgoing = [] self.incoming = [] - self.closed = False self.unacked = [] - if self.transactional: - self.acked = [] - self._lock = RLock() - self._condition = Condition(self._lock) + self.acked = [] + + self.closing = False + self.closed = False + + self._lock = connection._lock + self._condition = connection._condition self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -300,60 +336,14 @@ class Session(Lockable): def __repr__(self): return "<Session %s>" % self.name - def _attach(self): - self._ssn = self.connection._conn.session(self.name, delegate=delegate(self)) - self._ssn.auto_sync = False - self._ssn.invoke_lock = self._lock - self._ssn.lock = self._lock - self._ssn.condition = self._condition - if self.transactional: - self._ssn.tx_select() - for link in self.senders + self.receivers: - link._link() - - def _disconnected(self): - self._ssn = None - for link in self.senders + self.receivers: - link._disconnected() + def wakeup(self): + self.connection.wakeup() - @synchronized - def _message_transfer(self, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = self.receivers[int(cmd.destination)] - msg._receiver = rcv - log.debug("RECV [%s] %s", self, msg) - self.incoming.append(msg) - self.notifyAll() - return INCOMPLETE + def catchup(self, exc=SessionError): + self.connection.catchup(exc) - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(message.body) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.to = ap.get("to") - msg.subject = ap.get("subject") - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - msg.properties = mp.application_headers - msg.content_type = mp.content_type - msg._transfer_id = message.id - return msg - - def _exchange_query(self, address): - # XXX: auto sync hack is to avoid deadlock on future - result = self._ssn.exchange_query(name=address, sync=True) - self._ssn.sync() - return result.get() + def check_error(self, exc=SessionError): + self.connection.check_error(exc) @synchronized def sender(self, target): @@ -368,8 +358,7 @@ class Session(Lockable): """ sender = Sender(self, len(self.senders), target) self.senders.append(sender) - if self._ssn is not None: - sender._link() + self.wakeup() return sender @synchronized @@ -386,8 +375,7 @@ class Session(Lockable): receiver = Receiver(self, len(self.receivers), source, filter, self.started) self.receivers.append(receiver) - if self._ssn is not None: - receiver._link() + self.wakeup() return receiver @synchronized @@ -419,6 +407,7 @@ class Session(Lockable): timeout): msg = self._pop(predicate) if msg is not None: + msg._receiver.returned += 1 self.unacked.append(msg) log.debug("RETR [%s] %s", self, msg) return msg @@ -438,20 +427,13 @@ class Session(Lockable): else: messages = [message] - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_accept(ids, sync=True) - self._ssn.sync() - for m in messages: - try: - self.unacked.remove(m) - except ValueError: - pass - if self.transactional: - self.acked.append(m) + self.unacked.remove(m) + self.acked.append(m) + + self.wakeup() + self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked]) + self.check_error() @synchronized def commit(self): @@ -461,11 +443,13 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - self._ssn.tx_commit(sync=True) - del self.acked[:] - self._ssn.sync() + self.committing = True + self.wakeup() + self.catchup() + assert not self.committing + if self.aborted: + raise TransactionAborted() + assert self.committed @synchronized def rollback(self): @@ -475,21 +459,10 @@ class Session(Lockable): """ if not self.transactional: raise NontransactionalSession() - if self._ssn is None: - raise Disconnected() - - ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming]) - for range in ids: - self._ssn.receiver._completed.add_range(range) - self._ssn.channel.session_completed(self._ssn.receiver._completed) - self._ssn.message_release(ids) - self._ssn.tx_rollback(sync=True) - - del self.incoming[:] - del self.unacked[:] - del self.acked[:] - - self._ssn.sync() + self.aborting = True + self.wakeup() + self.catchup() + assert not self.aborting and self.aborted @synchronized def start(self): @@ -538,13 +511,12 @@ class Session(Lockable): link.close() self.closing = True - self.notifyAll() + self.wakeup() + self.catchup() self.wait(lambda: self.closed) while self.thread.isAlive(): self.thread.join(3) self.thread = None - self._ssn.close() - self._ssn = None self.connection._remove_session(self) def parse_addr(address): @@ -562,18 +534,7 @@ def reply_to2addr(reply_to): else: return "%s/%s" % (reply_to.exchange, reply_to.routing_key) -class Disconnected(Exception): - """ - Exception raised when an operation is attempted that is illegal when - disconnected. - """ - pass - -class NontransactionalSession(Exception): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ +class SendError(SessionError): pass class Sender(Lockable): @@ -587,29 +548,17 @@ class Sender(Lockable): self.index = index self.target = target self.closed = False - self._ssn = None - self._exchange = None - self._routing_key = None - self._subject = None self._lock = self.session._lock self._condition = self.session._condition - def _link(self): - self._ssn = self.session._ssn - node, self._subject = parse_addr(self.target) - result = self.session._exchange_query(node) - if result.not_found: - # XXX: should check 'create' option - self._ssn.queue_declare(queue=node, durable=False, sync=True) - self._ssn.sync() - self._exchange = "" - self._routing_key = node - else: - self._exchange = node - self._routing_key = self._subject + def wakeup(self): + self.session.wakeup() - def _disconnected(self): - self._ssn = None + def catchup(self, exc=SendError): + self.session.catchup(exc) + + def check_error(self, exc=SendError): + self.session.check_error(exc) @synchronized def send(self, object): @@ -623,56 +572,36 @@ class Sender(Lockable): @param object: the message or content to send """ - if self._ssn is None: + if not self.session.connection._connected or self.session.closing: raise Disconnected() if isinstance(object, Message): message = object else: message = Message(object) - # XXX: what if subject is specified for a normal queue? - if self._routing_key is None: - rk = message.subject - else: - rk = self._routing_key - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if message.reply_to: - rt = self._ssn.reply_to(*parse_addr(message.reply_to)) - else: - rt = None - dp = self._ssn.delivery_properties(routing_key=rk) - mp = self._ssn.message_properties(message_id=message.id, - user_id=message.user_id, - reply_to=rt, - correlation_id=message.correlation_id, - content_type=message.content_type, - application_headers=message.properties) - if message.subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["subject"] = message.subject - if message.to is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["to"] = message.to - enc, dec = get_codec(message.content_type) - body = enc(message.content) - self._ssn.message_transfer(destination=self._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", self.session, message) - self._ssn.sync() + + # XXX: what if we send the same message to multiple senders? + message._sender = self + self.session.outgoing.append(message) + + self.wakeup() + self.catchup() + assert message not in self.session.outgoing @synchronized def close(self): """ Close the Sender. """ + # XXX: should make driver do something here if not self.closed: self.session.senders.remove(self) self.closed = True -class Empty(Exception): +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): """ Exception raised by L{Receiver.fetch} when there is no message available within the alloted time. @@ -693,43 +622,32 @@ class Receiver(Lockable): self.destination = str(self.index) self.source = source self.filter = filter + self.started = started self.capacity = UNLIMITED + self.granted = Serial(0) + self.drain = False + self.impending = Serial(0) + self.received = Serial(0) + self.returned = Serial(0) + self.closed = False self.listener = None - self._ssn = None - self._queue = None self._lock = self.session._lock self._condition = self.session._condition - def _link(self): - self._ssn = self.session._ssn - result = self.session._exchange_query(self.source) - if result.not_found: - self._queue = self.source - # XXX: should check 'create' option - self._ssn.queue_declare(queue=self._queue, durable=False) - else: - self._queue = "%s.%s" % (self.session.name, self.destination) - self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True) - if self.filter is None: - f = FILTER_DEFAULTS[result.type] - else: - f = self.filter - f._bind(self._ssn, self.source, self._queue) - self._ssn.message_subscribe(queue=self._queue, destination=self.destination, - sync=True) - self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit) - self._ssn.sync() - if self.started: - self._start() - - def _disconnected(self): - self._ssn = None + def wakeup(self): + self.session.wakeup() + + def catchup(self, exc=ReceiveError): + self.session.catchup() + + def check_error(self, exc=ReceiveError): + self.session.check_error(exc) @synchronized def pending(self): - return self.session._count(self._pred) + return self.received - self.returned def _capacity(self): if not self.started: @@ -762,58 +680,64 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - if self.capacity is not UNLIMITED or not self.started: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self._capacity() == 0: + self.granted = self.returned + 1 + self.wakeup() + self.catchup() + assert self.impending == self.granted msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self._ssn.message_flush(self.destination) - self._start() - self._ssn.sync() + self.drain = True + self.wakeup() + self.catchup() + assert self.granted == self.received + self.drain = False + if self.capacity is not UNLIMITED: + self.granted += self._capacity() + self.wakeup() msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() + if self._capacity() not in (0, UNLIMITED.value): + self.granted += 1 + self.wakeup() return msg - def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) - @synchronized def start(self): """ Start incoming message delivery for this receiver. """ self.started = True - if self._ssn is not None: - self._start() - - def _stop(self): - self._ssn.message_stop(self.destination) + if self.capacity is UNLIMITED: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity() + self.wakeup() @synchronized def stop(self): """ Stop incoming message delivery for this receiver. """ - if self._ssn is not None: - self._stop() + self.granted = self.received self.started = False + self.wakeup() + self.catchup() + assert self.granted == self.received @synchronized def close(self): """ Close the receiver. """ - if not self.closed: - self.closed = True - self._ssn.message_cancel(self.destination, sync=True) - self._ssn.sync() + self.closed = True + self.wakeup() + try: + self.catchup() + finally: self.session.receivers.remove(self) - - def codec(name): type = PRIMITIVE[name] @@ -889,6 +813,7 @@ class Message: self.to = None self.reply_to = None self.correlation_id = None + self.durable = False self.properties = {} self.content_type = get_type(content) self.content = content @@ -896,5 +821,384 @@ class Message: def __repr__(self): return "Message(%r)" % self.content +class Attachment: + + def __init__(self, target): + self.target = target + +DURABLE_DEFAULT=True + +class Driver(Lockable): + + def __init__(self, connection): + self.connection = connection + self._lock = self.connection._lock + self._condition = self.connection._condition + self._wakeup_cond = Condition() + self._socket = None + self._conn = None + self._attachments = {} + self._modcount = self.connection._modcount + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + # XXX: need to figure out how to join on this thread + + def start(self): + self.thread.start() + + def wakeup(self): + self._wakeup_cond.acquire() + try: + self._wakeup_cond.notifyAll() + finally: + self._wakeup_cond.release() + + def start(self): + self.thread.start() + + def run(self): + while True: + self._wakeup_cond.acquire() + try: + if self.connection._modcount <= self._modcount: + self._wakeup_cond.wait(10) + finally: + self._wakeup_cond.release() + self.dispatch(self.connection._modcount) + + @synchronized + def dispatch(self, modcount): + try: + if self._conn is None and self.connection._connected: + self.connect() + elif self._conn is not None and not self.connection._connected: + self.disconnect() + + if self._conn is not None: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + exi = None + except: + exi = sys.exc_info() + + if exi: + msg = traceback.format_exc() + recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", + "Bad file descriptor", "start timed out", "Broken pipe"] + for r in recoverable: + if self.connection.reconnect and r in msg: + print "waiting to retry" + self.reset() + time.sleep(3) + print "retrying..." + return + else: + self.connection.error = (msg,) + + self._modcount = modcount + self.notifyAll() + + def connect(self): + if self._conn is not None: + return + try: + self._socket = connect(self.connection.host, self.connection.port) + except socket.error, e: + raise ConnectError(e) + self._conn = connection.Connection(self._socket) + try: + self._conn.start(timeout=10) + except connection.VersionError, e: + raise ConnectError(e) + except Timeout: + print "start timed out" + raise ConnectError("start timed out") + + def disconnect(self): + self._conn.close() + self.reset() + + def reset(self): + self._conn = None + self._attachments.clear() + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + + def connected(self): + return self._conn is not None + + def attach(self, ssn): + _ssn = self._attachments.get(ssn) + if _ssn is None: + _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) + _ssn.auto_sync = False + _ssn.invoke_lock = self._lock + _ssn.lock = self._lock + _ssn.condition = self._condition + if ssn.transactional: + # XXX: adding an attribute to qpid.session.Session + _ssn.acked = [] + _ssn.tx_select() + self._attachments[ssn] = _ssn + + for snd in ssn.senders: + self.link_out(snd) + for rcv in ssn.receivers: + self.link_in(rcv) + + if ssn.closing: + _ssn.close() + del self._attachments[ssn] + + def _exchange_query(self, ssn, address): + # XXX: auto sync hack is to avoid deadlock on future + result = ssn.exchange_query(name=address, sync=True) + ssn.sync() + return result.get() + + def link_out(self, snd): + _ssn = self._attachments[snd.session] + _snd = self._attachments.get(snd) + if _snd is None: + _snd = Attachment(snd) + node, _snd._subject = parse_addr(snd.target) + result = self._exchange_query(_ssn, node) + if result.not_found: + # XXX: should check 'create' option + _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) + _ssn.sync() + _snd._exchange = "" + _snd._routing_key = node + else: + _snd._exchange = node + _snd._routing_key = _snd._subject + self._attachments[snd] = _snd + + if snd.closed: + del self._attachments[snd] + + def link_in(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None: + _rcv = Attachment(rcv) + result = self._exchange_query(_ssn, rcv.source) + if result.not_found: + _rcv._queue = rcv.source + # XXX: should check 'create' option + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) + else: + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) + if rcv.filter is None: + f = FILTER_DEFAULTS[result.type] + else: + f = rcv.filter + f._bind(_ssn, rcv.source, _rcv._queue) + _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) + _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) + self._attachments[rcv] = _rcv + # XXX: need to kill syncs + _ssn.sync() + + if rcv.closed: + _ssn.message_cancel(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + del self._attachments[rcv] + + def process(self, ssn): + if ssn.closing: return + + _ssn = self._attachments[ssn] + + while ssn.outgoing: + msg = ssn.outgoing[0] + snd = msg._sender + self.send(snd, msg) + ssn.outgoing.pop(0) + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + if ssn.acked: + messages = ssn.acked[:] + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + _ssn.receiver._completed.add_range(range) + ch = _ssn.channel + if ch is None: + raise SessionDetached() + ch.session_completed(_ssn.receiver._completed) + _ssn.message_accept(ids, sync=True) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + + for m in messages: + ssn.acked.remove(m) + if ssn.transactional: + _ssn.acked.append(m) + + if ssn.committing: + _ssn.tx_commit(sync=True) + # XXX: need to kill syncs + _ssn.sync() + del _ssn.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + + if ssn.aborting: + for rcv in ssn.receivers: + _ssn.message_stop(rcv.destination) + _ssn.sync() + + messages = _ssn.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + _ssn.receiver._completed.add_range(range) + _ssn.channel.session_completed(_ssn.receiver._completed) + _ssn.message_release(ids) + _ssn.tx_rollback(sync=True) + _ssn.sync() + + del ssn.incoming[:] + del ssn.unacked[:] + del _ssn.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + + def grant(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self._attachments[rcv] + + if rcv.granted is UNLIMITED: + if rcv.impending is UNLIMITED: + delta = 0 + else: + delta = UNLIMITED.value + else: + delta = rcv.granted - rcv.impending + + if delta > 0: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) + rcv.impending += delta + elif delta < 0: + _ssn.message_stop(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + rcv.impending = rcv.received + # XXX: this can recurse infinitely if granted drops below received + self.grant(rcv) + + def process_receiver(self, rcv): + if rcv.closed: return + _ssn = self._attachments[rcv.session] + _rcv = self._attachments[rcv] + + self.grant(rcv) + + if rcv.drain: + _ssn.message_flush(rcv.destination, sync=True) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + rcv.granted = rcv.received + rcv.impending = rcv.received + + def send(self, snd, msg): + _ssn = self._attachments[snd.session] + _snd = self._attachments[snd] + + # XXX: what if subject is specified for a normal queue? + if _snd._routing_key is None: + rk = msg.subject + else: + rk = _snd._routing_key + # XXX: do we need to query to figure out how to create the reply-to interoperably? + if msg.reply_to: + rt = _ssn.reply_to(*parse_addr(msg.reply_to)) + else: + rt = None + dp = _ssn.delivery_properties(routing_key=rk) + mp = _ssn.message_properties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) + if msg.subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["subject"] = msg.subject + if msg.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["to"] = msg.to + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + enc, dec = get_codec(msg.content_type) + body = enc(msg.content) + _ssn.message_transfer(destination=_snd._exchange, + message=Message010(dp, mp, body), + sync=True) + log.debug("SENT [%s] %s", snd.session, msg) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + # XXX: should we log the ack somehow too? + + @synchronized + def _message_transfer(self, ssn, cmd): + m = Message010(cmd.payload) + m.headers = cmd.headers + m.id = cmd.id + msg = self._decode(m) + rcv = ssn.receivers[int(cmd.destination)] + msg._receiver = rcv + rcv.received += 1 + log.debug("RECV [%s] %s", ssn, msg) + ssn.incoming.append(msg) + self.notifyAll() + return INCOMPLETE + + def _decode(self, message): + dp = message.get("delivery_properties") + mp = message.get("message_properties") + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(message.body) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get("to") + msg.subject = ap.get("subject") + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.durable = dp.delivery_mode == delivery_mode.persistent + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = message.id + return msg + __all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", "Empty", "timestamp", "uuid4"] +\ |