summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-24 17:25:10 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-24 17:25:10 +0000
commitc20ddb1ec5f95ee7f9d85db22ef469a0fa027621 (patch)
treeec9766250516e09b1d7e79973e5ba7cf848887de /python/qpid/driver.py
parent9d23fd30e819f7176b9583fc2bd548f425e93831 (diff)
downloadqpid-python-c20ddb1ec5f95ee7f9d85db22ef469a0fa027621.tar.gz
added back exchange query on link establishment; added sender.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py105
1 files changed, 56 insertions, 49 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index d5dcb47ec3..a4244b9830 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -90,6 +90,7 @@ class SessionState:
self.completions = {}
self.min_completion = self.sent
self.max_completion = self.sent
+ self.results = {}
# receiver state
self.received = None
@@ -97,6 +98,10 @@ class SessionState:
# XXX: need to periodically exchange completion/known_completion
+ def write_query(self, query, handler):
+ id = self.sent
+ self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
def write_cmd(self, cmd, completion=noop):
cmd.id = self.sent
self.sent += 1
@@ -220,6 +225,8 @@ class Driver:
log.warn("sleeping 3 seconds")
else:
self.connection.error = error
+ else:
+ self.dispatch()
self.connection._waiter.notifyAll()
@@ -272,9 +279,6 @@ class Driver:
def do_connection_open_ok(self, open_ok):
self._connected = True
- # XXX: maybe think about a more generic way to catchup with
- # deferred work
- self.dispatch()
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -335,6 +339,10 @@ class Driver:
if sf.completed:
sst.write_op(SessionCompleted(sst.executed))
+ def do_execution_result(self, er):
+ sst = self.get_sst(er)
+ sst.results[er.command_id] = er.value
+
def dispatch(self):
try:
if self._socket is None and self.connection._connected and not self._opening:
@@ -408,71 +416,66 @@ class Driver:
del self._attachments[ssn]
ssn.closed = True
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
-
def link_out(self, snd):
sst = self._attachments[snd.session]
_snd = self._attachments.get(snd)
- if _snd is None:
+ if not snd.closing and _snd is None:
_snd = Attachment(snd)
+ _snd.linked = False
node, _snd._subject = parse_addr(snd.target)
- # XXX: result = self._exchange_query(sst, node)
-# if result.not_found:
- if True:
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
+ def do_link_out(result):
+ if result.not_found:
+ # XXX: should check 'create' option
+ sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = node
+ else:
+ _snd._exchange = node
+ _snd._routing_key = _snd._subject
+ _snd.linked = True
+ sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
self._attachments[snd] = _snd
- if snd.closed:
+ if snd.closing and not snd.closed:
del self._attachments[snd]
- return None
- else:
- return _snd
+ snd.closed = True
def link_in(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
- if _rcv is None:
+ if _rcv is None and not rcv.closing:
_rcv = Attachment(rcv)
- # XXX: result = self._exchange_query(sst, rcv.source)
-# if result.not_found:
+ _rcv.linked = False
_rcv.canceled = False
_rcv.draining = False
- if True:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
+
+ def do_link_in(result):
+ if result.not_found:
+ _rcv._queue = rcv.source
+ # XXX: should check 'create' option
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
else:
- f = rcv.filter
- f._bind(sst, rcv.source, _rcv._queue)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ # XXX
+ if rcv.filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ else:
+ f = rcv.filter
+ f._bind(sst, rcv.source, _rcv._queue)
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ _rcv.linked = True
+ sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
self._attachments[rcv] = _rcv
- if rcv.closing:
+ if rcv.closing and not rcv.closed:
if not _rcv.canceled:
def close_rcv():
del self._attachments[rcv]
rcv.closed = True
sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
_rcv.canceled = True
- return None
- else:
- return _rcv
def process(self, ssn):
if ssn.closing: return
@@ -482,8 +485,12 @@ class Driver:
while sst.outgoing_idx < len(ssn.outgoing):
msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
- self.send(snd, msg)
- sst.outgoing_idx += 1
+ _snd = self._attachments.get(snd)
+ if _snd and _snd.linked:
+ self.send(snd, msg)
+ sst.outgoing_idx += 1
+ else:
+ break
for rcv in ssn.receivers:
self.process_receiver(rcv)
@@ -551,8 +558,8 @@ class Driver:
def grant(self, rcv):
sst = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
- if _rcv is None or _rcv.draining:
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None or not _rcv.linked or _rcv.draining:
return
if rcv.granted is UNLIMITED:
@@ -590,7 +597,7 @@ class Driver:
def send(self, snd, msg):
sst = self._attachments[snd.session]
- _snd = self.link_out(snd)
+ _snd = self._attachments[snd]
# XXX: what if subject is specified for a normal queue?
if _snd._routing_key is None: