diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/driver.py | 105 | ||||
-rw-r--r-- | python/qpid/messaging.py | 28 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 1 |
3 files changed, 74 insertions, 60 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index d5dcb47ec3..a4244b9830 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -90,6 +90,7 @@ class SessionState: self.completions = {} self.min_completion = self.sent self.max_completion = self.sent + self.results = {} # receiver state self.received = None @@ -97,6 +98,10 @@ class SessionState: # XXX: need to periodically exchange completion/known_completion + def write_query(self, query, handler): + id = self.sent + self.write_cmd(query, lambda: handler(self.results.pop(id))) + def write_cmd(self, cmd, completion=noop): cmd.id = self.sent self.sent += 1 @@ -220,6 +225,8 @@ class Driver: log.warn("sleeping 3 seconds") else: self.connection.error = error + else: + self.dispatch() self.connection._waiter.notifyAll() @@ -272,9 +279,6 @@ class Driver: def do_connection_open_ok(self, open_ok): self._connected = True - # XXX: maybe think about a more generic way to catchup with - # deferred work - self.dispatch() def connection_heartbeat(self, hrt): self.write_op(ConnectionHeartbeat()) @@ -335,6 +339,10 @@ class Driver: if sf.completed: sst.write_op(SessionCompleted(sst.executed)) + def do_execution_result(self, er): + sst = self.get_sst(er) + sst.results[er.command_id] = er.value + def dispatch(self): try: if self._socket is None and self.connection._connected and not self._opening: @@ -408,71 +416,66 @@ class Driver: del self._attachments[ssn] ssn.closed = True - def _exchange_query(self, ssn, address): - # XXX: auto sync hack is to avoid deadlock on future - result = ssn.exchange_query(name=address, sync=True) - ssn.sync() - return result.get() - def link_out(self, snd): sst = self._attachments[snd.session] _snd = self._attachments.get(snd) - if _snd is None: + if not snd.closing and _snd is None: _snd = Attachment(snd) + _snd.linked = False node, _snd._subject = parse_addr(snd.target) - # XXX: result = self._exchange_query(sst, node) -# if result.not_found: - if True: - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) - _snd._exchange = "" - _snd._routing_key = node - else: - _snd._exchange = node - _snd._routing_key = _snd._subject + def do_link_out(result): + if result.not_found: + # XXX: should check 'create' option + sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) + _snd._exchange = "" + _snd._routing_key = node + else: + _snd._exchange = node + _snd._routing_key = _snd._subject + _snd.linked = True + sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out) self._attachments[snd] = _snd - if snd.closed: + if snd.closing and not snd.closed: del self._attachments[snd] - return None - else: - return _snd + snd.closed = True def link_in(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) - if _rcv is None: + if _rcv is None and not rcv.closing: _rcv = Attachment(rcv) - # XXX: result = self._exchange_query(sst, rcv.source) -# if result.not_found: + _rcv.linked = False _rcv.canceled = False _rcv.draining = False - if True: - _rcv._queue = rcv.source - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) - else: - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) - sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) - if rcv.filter is None: - f = FILTER_DEFAULTS[result.type] + + def do_link_in(result): + if result.not_found: + _rcv._queue = rcv.source + # XXX: should check 'create' option + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) else: - f = rcv.filter - f._bind(sst, rcv.source, _rcv._queue) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + # XXX + if rcv.filter is None: + f = FILTER_DEFAULTS[result.type] + else: + f = rcv.filter + f._bind(sst, rcv.source, _rcv._queue) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + _rcv.linked = True + sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in) self._attachments[rcv] = _rcv - if rcv.closing: + if rcv.closing and not rcv.closed: if not _rcv.canceled: def close_rcv(): del self._attachments[rcv] rcv.closed = True sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) _rcv.canceled = True - return None - else: - return _rcv def process(self, ssn): if ssn.closing: return @@ -482,8 +485,12 @@ class Driver: while sst.outgoing_idx < len(ssn.outgoing): msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender - self.send(snd, msg) - sst.outgoing_idx += 1 + _snd = self._attachments.get(snd) + if _snd and _snd.linked: + self.send(snd, msg) + sst.outgoing_idx += 1 + else: + break for rcv in ssn.receivers: self.process_receiver(rcv) @@ -551,8 +558,8 @@ class Driver: def grant(self, rcv): sst = self._attachments[rcv.session] - _rcv = self.link_in(rcv) - if _rcv is None or _rcv.draining: + _rcv = self._attachments.get(rcv) + if _rcv is None or not _rcv.linked or _rcv.draining: return if rcv.granted is UNLIMITED: @@ -590,7 +597,7 @@ class Driver: def send(self, snd, msg): sst = self._attachments[snd.session] - _snd = self.link_out(snd) + _snd = self._attachments[snd] # XXX: what if subject is specified for a normal queue? if _snd._routing_key is None: diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 84653a56ff..f9ca54fe9e 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -314,7 +314,7 @@ class Session: return self.connection._ewait(predicate, timeout, exc) @synchronized - def sender(self, target): + def sender(self, target, **options): """ Creates a L{Sender} that may be used to send L{Messages<Message>} to the specified target. @@ -324,7 +324,7 @@ class Session: @rtype: Sender @return: a new Sender for the specified target """ - sender = Sender(self, len(self.senders), target) + sender = Sender(self, len(self.senders), target, options) self.senders.append(sender) self._wakeup() # XXX: because of the lack of waiting here we can end up getting @@ -334,7 +334,7 @@ class Session: return sender @synchronized - def receiver(self, source, filter=None): + def receiver(self, source, **options): """ Creates a receiver that may be used to actively fetch or to listen for the arrival of L{Messages<Message>} from the specified source. @@ -344,7 +344,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, filter, + receiver = Receiver(self, len(self.receivers), source, options, self.started) self.receivers.append(receiver) self._wakeup() @@ -512,13 +512,15 @@ class Sender: Sends outgoing messages. """ - def __init__(self, session, index, target): + def __init__(self, session, index, target, options): self.session = session self.index = index self.target = target - self.capacity = UNLIMITED + self.options = options + self.capacity = options.get("capacity", UNLIMITED) self.queued = Serial(0) self.acked = Serial(0) + self.closing = False self.closed = False self._lock = self.session._lock @@ -580,15 +582,19 @@ class Sender: message._sender = self self.session.outgoing.append(message) self.queued += 1 - mno = self.queued self._wakeup() if sync: - self._ewait(lambda: self.acked >= mno) + self.sync() assert message not in self.session.outgoing @synchronized + def sync(self): + mno = self.queued + self._ewait(lambda: self.acked >= mno) + + @synchronized def close(self): """ Close the Sender. @@ -616,15 +622,15 @@ class Receiver: L{listen}. """ - def __init__(self, session, index, source, filter, started): + def __init__(self, session, index, source, options, started): self.session = session self.index = index self.destination = str(self.index) self.source = source - self.filter = filter + self.options = options self.started = started - self.capacity = UNLIMITED + self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) self.drain = False self.impending = Serial(0) diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 7623c1f93b..fd7aca6ec7 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -611,6 +611,7 @@ class SenderTests(Base): except InsufficientCapacity: caught = True break + self.snd.sync() self.drain(self.rcv, expected=msgs) self.ssn.acknowledge() assert caught, "did not exceed capacity" |