diff options
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 71 |
1 files changed, 51 insertions, 20 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 3ea6655aca..3e45045bec 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -30,6 +30,8 @@ from threading import Condition, Thread from util import connect log = getLogger("qpid.messaging") +rawlog = getLogger("qpid.messaging.io.raw") +opslog = getLogger("qpid.messaging.io.ops") def addr2reply_to(addr): name, subject, options = address.parse(addr) @@ -101,10 +103,11 @@ class SessionState: def write_query(self, query, handler): id = self.sent - query.sync = True self.write_cmd(query, lambda: handler(self.results.pop(id))) def write_cmd(self, cmd, completion=noop): + if completion != noop: + cmd.sync = True if self.detached: raise Exception("detached") cmd.id = self.sent @@ -195,9 +198,9 @@ class Driver: try: data = self._socket.recv(64*1024) if data: - log.debug("READ: %r", data) + rawlog.debug("READ: %r", data) else: - log.debug("ABORTED: %s", self._socket.getpeername()) + rawlog.debug("ABORTED: %s", self._socket.getpeername()) error = "connection aborted" recoverable = True except socket.error, e: @@ -219,7 +222,7 @@ class Driver: self._op_dec.write(*self._seg_dec.read()) for op in self._op_dec.read(): self.assign_id(op) - log.debug("RCVD: %r", op) + opslog.debug("RCVD: %r", op) op.dispatch(self) except VersionError, e: error = e @@ -244,7 +247,7 @@ class Driver: def writeable(self): try: n = self._socket.send(self._buf) - log.debug("SENT: %r", self._buf[:n]) + rawlog.debug("SENT: %r", self._buf[:n]) self._buf = self._buf[n:] except socket.error, e: self._error(e, True) @@ -268,7 +271,7 @@ class Driver: self.connection.error = (err,) def write_op(self, op): - log.debug("SENT: %r", op) + opslog.debug("SENT: %r", op) self._op_enc.write(op) self._seg_enc.write(*self._op_enc.read()) self._frame_enc.write(*self._seg_enc.read()) @@ -446,6 +449,7 @@ class Driver: _snd = self._attachments.get(snd) if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) + _snd.closing = False if snd.target is None: snd.error = ("target is None",) @@ -488,7 +492,7 @@ class Driver: return if result.not_found: - if _snd.options.get("create") in ("always", "receiver"): + if _snd.options.get("create") in ("always", "sender"): sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) _snd._exchange = "" _snd._routing_key = _snd.name @@ -503,9 +507,15 @@ class Driver: sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) self._attachments[snd] = _snd - if snd.closing and not snd.closed: - del self._attachments[snd] - snd.closed = True + if snd.closing and not (snd.closed or _snd.closing): + _snd.closing = True + def do_unlink(): + del self._attachments[snd] + snd.closed = True + if _snd.options.get("delete") in ("always", "sender"): + self.delete(sst, _snd.name, do_unlink) + else: + do_unlink() def link_in(self, rcv): sst = self._attachments.get(rcv.session) @@ -584,16 +594,37 @@ class Driver: if rcv.closing and not rcv.closed: if rcv.linked: if not _rcv.canceled: - def close_rcv(): + def do_unlink(): del self._attachments[rcv] rcv.closed = True - sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + if _rcv.options.get("delete") in ("always", "receiver"): + sst.write_cmd(MessageCancel(rcv.destination)) + self.delete(sst, _rcv.name, do_unlink) + else: + sst.write_cmd(MessageCancel(rcv.destination), do_unlink) _rcv.canceled = True else: rcv.closed = True + def delete(self, sst, name, completion): + def do_queue_delq(result): + if sst.detached: + return + if result.queue: + sst.write_cmd(QueueDelete(name), completion) + else: + completion() + def do_exchange_delq(result): + if sst.detached: + return + if result.not_found: + sst.write_query(QueueQuery(name), do_queue_delq) + else: + sst.write_cmd(ExchangeDelete(name), completion) + sst.write_query(ExchangeQuery(name), do_exchange_delq) + def process(self, ssn): - if ssn.closing: return + if ssn.closed or ssn.closing: return sst = self._attachments[ssn] @@ -625,7 +656,7 @@ class Driver: ssn.acked.remove(m) if not ssn.transactional: sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids, sync=True), ack_ack) + sst.write_cmd(MessageAccept(ids), ack_ack) sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -635,7 +666,7 @@ class Driver: ssn.committed = True ssn.aborting = False ssn.aborted = False - sst.write_cmd(TxCommit(sync=True), commit_ok) + sst.write_cmd(TxCommit(), commit_ok) sst.committing = True if ssn.aborting and not sst.aborting: @@ -647,7 +678,7 @@ class Driver: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) sst.write_cmd(MessageRelease(ids)) - sst.write_cmd(TxRollback(sync=True), do_rb_ok) + sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): del ssn.incoming[:] @@ -670,7 +701,7 @@ class Driver: for rcv in ssn.receivers: sst.write_cmd(MessageStop(rcv.destination)) - sst.write_cmd(ExecutionSync(sync=True), do_rb) + sst.write_cmd(ExecutionSync(), do_rb) def grant(self, rcv): sst = self._attachments[rcv.session] @@ -702,7 +733,7 @@ class Driver: rcv.impending = rcv.received _rcv.draining = False self.grant(rcv) - sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop) + sst.write_cmd(MessageStop(rcv.destination), do_stop) if rcv.draining: _rcv.draining = True @@ -711,7 +742,7 @@ class Driver: rcv.granted = rcv.impending _rcv.draining = False rcv.draining = False - sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush) + sst.write_cmd(MessageFlush(rcv.destination), do_flush) def process_receiver(self, rcv): @@ -758,7 +789,7 @@ class Driver: sst.outgoing_idx -= 1 assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body, sync=True), msg_acked) + payload=body), msg_acked) def do_message_transfer(self, xfr): sst = self.get_sst(xfr) |