summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py71
1 files changed, 51 insertions, 20 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 3ea6655aca..3e45045bec 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -30,6 +30,8 @@ from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
+rawlog = getLogger("qpid.messaging.io.raw")
+opslog = getLogger("qpid.messaging.io.ops")
def addr2reply_to(addr):
name, subject, options = address.parse(addr)
@@ -101,10 +103,11 @@ class SessionState:
def write_query(self, query, handler):
id = self.sent
- query.sync = True
self.write_cmd(query, lambda: handler(self.results.pop(id)))
def write_cmd(self, cmd, completion=noop):
+ if completion != noop:
+ cmd.sync = True
if self.detached:
raise Exception("detached")
cmd.id = self.sent
@@ -195,9 +198,9 @@ class Driver:
try:
data = self._socket.recv(64*1024)
if data:
- log.debug("READ: %r", data)
+ rawlog.debug("READ: %r", data)
else:
- log.debug("ABORTED: %s", self._socket.getpeername())
+ rawlog.debug("ABORTED: %s", self._socket.getpeername())
error = "connection aborted"
recoverable = True
except socket.error, e:
@@ -219,7 +222,7 @@ class Driver:
self._op_dec.write(*self._seg_dec.read())
for op in self._op_dec.read():
self.assign_id(op)
- log.debug("RCVD: %r", op)
+ opslog.debug("RCVD: %r", op)
op.dispatch(self)
except VersionError, e:
error = e
@@ -244,7 +247,7 @@ class Driver:
def writeable(self):
try:
n = self._socket.send(self._buf)
- log.debug("SENT: %r", self._buf[:n])
+ rawlog.debug("SENT: %r", self._buf[:n])
self._buf = self._buf[n:]
except socket.error, e:
self._error(e, True)
@@ -268,7 +271,7 @@ class Driver:
self.connection.error = (err,)
def write_op(self, op):
- log.debug("SENT: %r", op)
+ opslog.debug("SENT: %r", op)
self._op_enc.write(op)
self._seg_enc.write(*self._op_enc.read())
self._frame_enc.write(*self._seg_enc.read())
@@ -446,6 +449,7 @@ class Driver:
_snd = self._attachments.get(snd)
if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
+ _snd.closing = False
if snd.target is None:
snd.error = ("target is None",)
@@ -488,7 +492,7 @@ class Driver:
return
if result.not_found:
- if _snd.options.get("create") in ("always", "receiver"):
+ if _snd.options.get("create") in ("always", "sender"):
sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
_snd._exchange = ""
_snd._routing_key = _snd.name
@@ -503,9 +507,15 @@ class Driver:
sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
- if snd.closing and not snd.closed:
- del self._attachments[snd]
- snd.closed = True
+ if snd.closing and not (snd.closed or _snd.closing):
+ _snd.closing = True
+ def do_unlink():
+ del self._attachments[snd]
+ snd.closed = True
+ if _snd.options.get("delete") in ("always", "sender"):
+ self.delete(sst, _snd.name, do_unlink)
+ else:
+ do_unlink()
def link_in(self, rcv):
sst = self._attachments.get(rcv.session)
@@ -584,16 +594,37 @@ class Driver:
if rcv.closing and not rcv.closed:
if rcv.linked:
if not _rcv.canceled:
- def close_rcv():
+ def do_unlink():
del self._attachments[rcv]
rcv.closed = True
- sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ if _rcv.options.get("delete") in ("always", "receiver"):
+ sst.write_cmd(MessageCancel(rcv.destination))
+ self.delete(sst, _rcv.name, do_unlink)
+ else:
+ sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
_rcv.canceled = True
else:
rcv.closed = True
+ def delete(self, sst, name, completion):
+ def do_queue_delq(result):
+ if sst.detached:
+ return
+ if result.queue:
+ sst.write_cmd(QueueDelete(name), completion)
+ else:
+ completion()
+ def do_exchange_delq(result):
+ if sst.detached:
+ return
+ if result.not_found:
+ sst.write_query(QueueQuery(name), do_queue_delq)
+ else:
+ sst.write_cmd(ExchangeDelete(name), completion)
+ sst.write_query(ExchangeQuery(name), do_exchange_delq)
+
def process(self, ssn):
- if ssn.closing: return
+ if ssn.closed or ssn.closing: return
sst = self._attachments[ssn]
@@ -625,7 +656,7 @@ class Driver:
ssn.acked.remove(m)
if not ssn.transactional:
sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+ sst.write_cmd(MessageAccept(ids), ack_ack)
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -635,7 +666,7 @@ class Driver:
ssn.committed = True
ssn.aborting = False
ssn.aborted = False
- sst.write_cmd(TxCommit(sync=True), commit_ok)
+ sst.write_cmd(TxCommit(), commit_ok)
sst.committing = True
if ssn.aborting and not sst.aborting:
@@ -647,7 +678,7 @@ class Driver:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
sst.write_cmd(MessageRelease(ids))
- sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+ sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
del ssn.incoming[:]
@@ -670,7 +701,7 @@ class Driver:
for rcv in ssn.receivers:
sst.write_cmd(MessageStop(rcv.destination))
- sst.write_cmd(ExecutionSync(sync=True), do_rb)
+ sst.write_cmd(ExecutionSync(), do_rb)
def grant(self, rcv):
sst = self._attachments[rcv.session]
@@ -702,7 +733,7 @@ class Driver:
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+ sst.write_cmd(MessageStop(rcv.destination), do_stop)
if rcv.draining:
_rcv.draining = True
@@ -711,7 +742,7 @@ class Driver:
rcv.granted = rcv.impending
_rcv.draining = False
rcv.draining = False
- sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+ sst.write_cmd(MessageFlush(rcv.destination), do_flush)
def process_receiver(self, rcv):
@@ -758,7 +789,7 @@ class Driver:
sst.outgoing_idx -= 1
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body, sync=True), msg_acked)
+ payload=body), msg_acked)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)