diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-24 17:25:10 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-24 17:25:10 +0000 |
commit | c20ddb1ec5f95ee7f9d85db22ef469a0fa027621 (patch) | |
tree | ec9766250516e09b1d7e79973e5ba7cf848887de /python/qpid/driver.py | |
parent | 9d23fd30e819f7176b9583fc2bd548f425e93831 (diff) | |
download | qpid-python-c20ddb1ec5f95ee7f9d85db22ef469a0fa027621.tar.gz |
added back exchange query on link establishment; added sender.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 105 |
1 files changed, 56 insertions, 49 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index d5dcb47ec3..a4244b9830 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -90,6 +90,7 @@ class SessionState: self.completions = {} self.min_completion = self.sent self.max_completion = self.sent + self.results = {} # receiver state self.received = None @@ -97,6 +98,10 @@ class SessionState: # XXX: need to periodically exchange completion/known_completion + def write_query(self, query, handler): + id = self.sent + self.write_cmd(query, lambda: handler(self.results.pop(id))) + def write_cmd(self, cmd, completion=noop): cmd.id = self.sent self.sent += 1 @@ -220,6 +225,8 @@ class Driver: log.warn("sleeping 3 seconds") else: self.connection.error = error + else: + self.dispatch() self.connection._waiter.notifyAll() @@ -272,9 +279,6 @@ class Driver: def do_connection_open_ok(self, open_ok): self._connected = True - # XXX: maybe think about a more generic way to catchup with - # deferred work - self.dispatch() def connection_heartbeat(self, hrt): self.write_op(ConnectionHeartbeat()) @@ -335,6 +339,10 @@ class Driver: if sf.completed: sst.write_op(SessionCompleted(sst.executed)) + def do_execution_result(self, er): + sst = self.get_sst(er) + sst.results[er.command_id] = er.value + def dispatch(self): try: if self._socket is None and self.connection._connected and not self._opening: @@ -408,71 +416,66 @@ class Driver: del self._attachments[ssn] ssn.closed = True - def _exchange_query(self, ssn, address): - # XXX: auto sync hack is to avoid deadlock on future - result = ssn.exchange_query(name=address, sync=True) - ssn.sync() - return result.get() - def link_out(self, snd): sst = self._attachments[snd.session] _snd = self._attachments.get(snd) - if _snd is None: + if not snd.closing and _snd is None: _snd = Attachment(snd) + _snd.linked = False node, _snd._subject = parse_addr(snd.target) - # XXX: result = self._exchange_query(sst, node) -# if result.not_found: - if True: - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) - _snd._exchange = "" - _snd._routing_key = node - else: - _snd._exchange = node - _snd._routing_key = _snd._subject + def do_link_out(result): + if result.not_found: + # XXX: should check 'create' option + sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) + _snd._exchange = "" + _snd._routing_key = node + else: + _snd._exchange = node + _snd._routing_key = _snd._subject + _snd.linked = True + sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out) self._attachments[snd] = _snd - if snd.closed: + if snd.closing and not snd.closed: del self._attachments[snd] - return None - else: - return _snd + snd.closed = True def link_in(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) - if _rcv is None: + if _rcv is None and not rcv.closing: _rcv = Attachment(rcv) - # XXX: result = self._exchange_query(sst, rcv.source) -# if result.not_found: + _rcv.linked = False _rcv.canceled = False _rcv.draining = False - if True: - _rcv._queue = rcv.source - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) - else: - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) - sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) - if rcv.filter is None: - f = FILTER_DEFAULTS[result.type] + + def do_link_in(result): + if result.not_found: + _rcv._queue = rcv.source + # XXX: should check 'create' option + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) else: - f = rcv.filter - f._bind(sst, rcv.source, _rcv._queue) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + # XXX + if rcv.filter is None: + f = FILTER_DEFAULTS[result.type] + else: + f = rcv.filter + f._bind(sst, rcv.source, _rcv._queue) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + _rcv.linked = True + sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in) self._attachments[rcv] = _rcv - if rcv.closing: + if rcv.closing and not rcv.closed: if not _rcv.canceled: def close_rcv(): del self._attachments[rcv] rcv.closed = True sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) _rcv.canceled = True - return None - else: - return _rcv def process(self, ssn): if ssn.closing: return @@ -482,8 +485,12 @@ class Driver: while sst.outgoing_idx < len(ssn.outgoing): msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender - self.send(snd, msg) - sst.outgoing_idx += 1 + _snd = self._attachments.get(snd) + if _snd and _snd.linked: + self.send(snd, msg) + sst.outgoing_idx += 1 + else: + break for rcv in ssn.receivers: self.process_receiver(rcv) @@ -551,8 +558,8 @@ class Driver: def grant(self, rcv): sst = self._attachments[rcv.session] - _rcv = self.link_in(rcv) - if _rcv is None or _rcv.draining: + _rcv = self._attachments.get(rcv) + if _rcv is None or not _rcv.linked or _rcv.draining: return if rcv.granted is UNLIMITED: @@ -590,7 +597,7 @@ class Driver: def send(self, snd, msg): sst = self._attachments[snd.session] - _snd = self.link_out(snd) + _snd = self._attachments[snd] # XXX: what if subject is specified for a normal queue? if _snd._routing_key is None: |