diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-10-10 17:15:31 +0000 |
| commit | 13e6bd9704643993d95c81f22106dae5b59b3084 (patch) | |
| tree | 2dbfa0faacecf665170120c61ec6239e5bff5a9c /python/qpid/driver.py | |
| parent | c68d17bf36649f3ba68334c3147e2d0da7246e67 (diff) | |
| download | qpid-python-13e6bd9704643993d95c81f22106dae5b59b3084.tar.gz | |
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
| -rw-r--r-- | python/qpid/driver.py | 176 |
1 files changed, 129 insertions, 47 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index a4244b9830..dadf43fc7f 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -17,7 +17,7 @@ # under the License. # -import compat, connection, socket, struct, sys, time +import address, compat, connection, socket, struct, sys, time from concurrency import synchronized from datatypes import RangedSet, Serial from exceptions import Timeout, VersionError @@ -26,18 +26,14 @@ from logging import getLogger from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED from ops import * from selector import Selector -from session import Client, INCOMPLETE, SessionDetached from threading import Condition, Thread from util import connect log = getLogger("qpid.messaging") -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] +def addr2reply_to(addr): + name, subject, options = address.parse(addr) + return ReplyTo(name, subject) def reply_to2addr(reply_to): if reply_to.routing_key is None: @@ -100,9 +96,12 @@ class SessionState: def write_query(self, query, handler): id = self.sent + query.sync = True self.write_cmd(query, lambda: handler(self.results.pop(id))) def write_cmd(self, cmd, completion=noop): + if self.detached: + raise Exception("detached") cmd.id = self.sent self.sent += 1 self.completions[cmd.id] = completion @@ -187,6 +186,7 @@ class Driver: if data: log.debug("READ: %r", data) else: + log.debug("ABORTED: %s", self._socket.getpeername()) error = ("connection aborted",) recoverable = True except socket.error, e: @@ -285,7 +285,7 @@ class Driver: def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) - if close.reply_ok != close_code.normal: + if close.reply_code != close_code.normal: self.connection.error = (close.reply_code, close.reply_text) # XXX: should we do a half shutdown on the socket here? # XXX: we really need to test this, we may end up reporting a @@ -343,6 +343,10 @@ class Driver: sst = self.get_sst(er) sst.results[er.command_id] = er.value + def do_execution_exception(self, ex): + sst = self.get_sst(ex) + sst.session.error = (ex,) + def dispatch(self): try: if self._socket is None and self.connection._connected and not self._opening: @@ -381,7 +385,7 @@ class Driver: def attach(self, ssn): sst = self._attachments.get(ssn) - if sst is None: + if sst is None and not ssn.closed: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i @@ -403,7 +407,7 @@ class Driver: for rcv in ssn.receivers: self.link_in(rcv) - if ssn.closing and not sst.detached: + if sst is not None and ssn.closing and not sst.detached: sst.detached = True sst.write_op(SessionDetach(name=ssn.name)) @@ -416,24 +420,66 @@ class Driver: del self._attachments[ssn] ssn.closed = True + def do_session_detach(self, dtc): + sst = self.get_sst(dtc) + sst.write_op(SessionDetached(name=dtc.name)) + self.do_session_detached(dtc) + def link_out(self, snd): - sst = self._attachments[snd.session] + sst = self._attachments.get(snd.session) _snd = self._attachments.get(snd) - if not snd.closing and _snd is None: + if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) - _snd.linked = False - node, _snd._subject = parse_addr(snd.target) - def do_link_out(result): - if result.not_found: - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) + + try: + _snd.name, _snd.subject, _snd.options = address.parse(snd.target) + except address.LexError, e: + snd.error = e + snd.closed = True + return + except address.ParseError, e: + snd.error = e + snd.closed = True + return + + # XXX: subject + if _snd.options is None: + _snd.options = {} + + def do_link(): + snd.linked = True + + def do_queue_q(result): + if sst.detached: + return + + if result.queue: _snd._exchange = "" - _snd._routing_key = node + _snd._routing_key = _snd.name + do_link() else: - _snd._exchange = node - _snd._routing_key = _snd._subject - _snd.linked = True - sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out) + snd.error = ("no such queue: %s" % _snd.name,) + del self._attachments[snd] + snd.closed = True + + def do_exchange_q(result): + if sst.detached: + return + + if result.not_found: + if _snd.options.get("create") in ("always", "receiver"): + sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) + _snd._exchange = "" + _snd._routing_key = _snd.name + else: + sst.write_query(QueueQuery(queue=_snd.name), do_queue_q) + return + else: + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + do_link() + + sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) self._attachments[snd] = _snd if snd.closing and not snd.closed: @@ -441,41 +487,77 @@ class Driver: snd.closed = True def link_in(self, rcv): - sst = self._attachments[rcv.session] + sst = self._attachments.get(rcv.session) _rcv = self._attachments.get(rcv) - if _rcv is None and not rcv.closing: + if _rcv is None and not rcv.closing and not rcv.closed: _rcv = Attachment(rcv) - _rcv.linked = False _rcv.canceled = False _rcv.draining = False - def do_link_in(result): + try: + _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) + except address.LexError, e: + rcv.error = e + rcv.closed = True + return + except address.ParseError, e: + rcv.error = e + rcv.closed = True + return + + # XXX: subject + if _rcv.options is None: + _rcv.options = {} + + def do_link(): + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + rcv.linked = True + + def do_queue_q(result): + if sst.detached: + return + if result.queue: + _rcv._queue = _rcv.name + do_link() + else: + rcv.error = ("no such queue: %s" % _rcv.name,) + del self._attachments[rcv] + rcv.closed = True + + def do_exchange_q(result): + if sst.detached: + return if result.not_found: - _rcv._queue = rcv.source - # XXX: should check 'create' option - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) + if _rcv.options.get("create") in ("always", "receiver"): + _rcv._queue = _rcv.name + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) + else: + sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q) + return else: _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) # XXX - if rcv.filter is None: + if _rcv.options.get("filter") is None: f = FILTER_DEFAULTS[result.type] else: f = rcv.filter - f._bind(sst, rcv.source, _rcv._queue) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) - _rcv.linked = True - sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in) + f._bind(sst, _rcv.name, _rcv._queue) + do_link() + sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q) self._attachments[rcv] = _rcv if rcv.closing and not rcv.closed: - if not _rcv.canceled: - def close_rcv(): - del self._attachments[rcv] - rcv.closed = True - sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) - _rcv.canceled = True + if rcv.linked: + if not _rcv.canceled: + def close_rcv(): + del self._attachments[rcv] + rcv.closed = True + sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + _rcv.canceled = True + else: + rcv.closed = True def process(self, ssn): if ssn.closing: return @@ -485,8 +567,9 @@ class Driver: while sst.outgoing_idx < len(ssn.outgoing): msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender + # XXX: should check for sender error here _snd = self._attachments.get(snd) - if _snd and _snd.linked: + if _snd and snd.linked: self.send(snd, msg) sst.outgoing_idx += 1 else: @@ -559,7 +642,7 @@ class Driver: def grant(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) - if _rcv is None or not _rcv.linked or _rcv.draining: + if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining: return if rcv.granted is UNLIMITED: @@ -606,7 +689,7 @@ class Driver: rk = _snd._routing_key # XXX: do we need to query to figure out how to create the reply-to interoperably? if msg.reply_to: - rt = ReplyTo(*parse_addr(msg.reply_to)) + rt = addr2reply_to(msg.reply_to) else: rt = None dp = DeliveryProperties(routing_key=rk) @@ -650,7 +733,6 @@ class Driver: log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) self.connection._waiter.notifyAll() - return INCOMPLETE def _decode(self, xfr): dp = EMPTY_DP |
