diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-04-13 11:21:38 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-13 11:21:38 +0000 |
commit | 118c4bb7fa781bbb4512a66ba1ca618e70abe64b (patch) | |
tree | 7b61c7420cd0736ecdb5b5c05d04b3488a3476cf /python | |
parent | 96367205ad9e9a1d24069845683a59e5f36c9683 (diff) | |
download | qpid-python-118c4bb7fa781bbb4512a66ba1ca618e70abe64b.tar.gz |
added more complete exception handling/hierarchy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@933560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/messaging/driver.py | 51 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 51 | ||||
-rw-r--r-- | python/qpid/messaging/exceptions.py | 86 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 35 |
4 files changed, 150 insertions, 73 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 241bed4587..934b5c02cc 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -23,12 +23,11 @@ from qpid import compat from qpid import sasl from qpid.concurrency import synchronized from qpid.datatypes import RangedSet, Serial -from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address, transports from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED -from qpid.messaging.exceptions import ConnectError +from qpid.messaging.exceptions import * from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector @@ -375,7 +374,7 @@ class Driver: else: self.close_engine() except socket.error, e: - self.close_engine(e) + self.close_engine(ConnectionError(text=str(e))) self.update_status() @@ -388,7 +387,7 @@ class Driver: def close_engine(self, e=None): if e is None: - e = "connection aborted" + e = ConnectionError(text="connection aborted") if (self.connection.reconnect and (self.connection.reconnect_limit is None or @@ -455,7 +454,7 @@ class Driver: except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() - self.connection.error = (msg,) + self.connection.error = InternalError(text=msg) def connect(self): try: @@ -481,7 +480,7 @@ class Driver: self._retrying = False except socket.error, e: self._host = (self._host + 1) % len(self._hosts) - self.close_engine(e) + self.close_engine(ConnectError(text=str(e))) DEFAULT_DISPOSITION = Disposition(None) @@ -496,6 +495,20 @@ def get_bindings(opts, queue=None, exchange=None, key=None): cmds.append(ExchangeBind(queue, exchange, key, args)) return cmds +CONNECTION_ERRS = { + # anythong not here (i.e. everything right now) will default to + # connection error + } + +SESSION_ERRS = { + # anything not here will default to session error + error_code.unauthorized_access: UnauthorizedAccess, + error_code.not_found: NotFound, + error_code.resource_locked: ReceiverError, + error_code.resource_limit_exceeded: TargetCapacityExceeded, + error_code.internal_error: ServerError + } + class Engine: def __init__(self, connection): @@ -576,15 +589,15 @@ class Engine: opslog.debug("RCVD[%s]: %r", self.log_id, op) op.dispatch(self) self.dispatch() - except VersionError, e: + except MessagingError, e: self.close(e) except: - self.close(compat.format_exc()) + self.close(InternalError(text=compat.format_exc())) def close(self, e=None): self._reset() if e: - self.connection.error = (e,) + self.connection.error = e self._status = CLOSED def assign_id(self, op): @@ -618,7 +631,7 @@ class Engine: cli_major = 0; cli_minor = 10 magic, _, _, major, minor = struct.unpack(HEADER, hdr) if major != cli_major or minor != cli_minor: - raise VersionError("client: %s-%s, server: %s-%s" % + raise VersionError(text="client: %s-%s, server: %s-%s" % (cli_major, cli_minor, major, minor)) def do_connection_start(self, start): @@ -655,7 +668,8 @@ class Engine: def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) if close.reply_code != close_code.normal: - self.connection.error = (close.reply_code, close.reply_text) + exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) + self.connection.error = exc(close.reply_code, close.reply_text) # XXX: should we do a half shutdown on the socket here? # XXX: we really need to test this, we may end up reporting a # connection abort after this, if we were to do a shutdown on read @@ -713,7 +727,8 @@ class Engine: def do_execution_exception(self, ex): sst = self.get_sst(ex) - sst.session.error = (ex,) + exc = SESSION_ERRS.get(ex.error_code, SessionError) + sst.session.error = exc(ex.error_code, ex.description) def dispatch(self): if not self.connection._connected and not self._closing and self._status != CLOSED: @@ -786,7 +801,7 @@ class Engine: err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) if err: - lnk.error = (err,) + lnk.error = err lnk.closed = True return @@ -816,7 +831,7 @@ class Engine: def parse_address(self, lnk, dir, addr): if addr is None: - return "%s is None" % dir.ADDR_NAME + return MalformedAddress(text="%s is None" % dir.ADDR_NAME) else: try: lnk.name, lnk.subject, lnk.options = address.parse(addr) @@ -824,14 +839,14 @@ class Engine: if lnk.options is None: lnk.options = {} except address.LexError, e: - return e + return MalformedAddress(text=str(e)) except address.ParseError, e: - return e + return MalformedAddress(text=str(e)) def validate_options(self, lnk, dir): ctx = Context() err = dir.VALIDATOR.validate(lnk.options, ctx) - if err: return "error in options: %s" % err + if err: return InvalidOption(text="error in options: %s" % err) def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) @@ -841,7 +856,7 @@ class Engine: if declare: err = self.declare(sst, lnk, action) else: - err = ("no such queue: %s" % lnk.name,) + err = NotFound(text="no such queue: %s" % lnk.name) else: action(type, subtype) diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 3f5cf3b9bd..e2602bfc15 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -172,14 +172,14 @@ class Connection: self._modcount += 1 self._driver.wakeup() - def _check_error(self, exc=ConnectionError): + def check_error(self): if self.error: self._condition.gc() - raise exc(*self.error) + raise self.error - def _ewait(self, predicate, timeout=None, exc=ConnectionError): + def _ewait(self, predicate, timeout=None): result = self._wait(lambda: self.error or predicate(), timeout) - self._check_error(exc) + self.check_error() return result @synchronized @@ -238,8 +238,7 @@ class Connection: self._connected = True self._driver.start() self._wakeup() - self._ewait(lambda: self._transport_connected and not self._unlinked(), - exc=ConnectError) + self._ewait(lambda: self._transport_connected and not self._unlinked()) def _unlinked(self): return [l @@ -509,14 +508,14 @@ class Session: def _wakeup(self): self.connection._wakeup() - def _check_error(self, exc=SessionError): - self.connection._check_error(exc) + def check_error(self): + self.connection.check_error() if self.error: - raise exc(*self.error) + raise self.error - def _ewait(self, predicate, timeout=None, exc=SessionError): - result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) + def _ewait(self, predicate, timeout=None): + result = self.connection._ewait(lambda: self.error or predicate(), timeout) + self.check_error() return result @synchronized @@ -537,7 +536,7 @@ class Session: self._wakeup() try: sender._ewait(lambda: sender.linked) - except SendError, e: + except LinkError, e: sender.close() raise e return sender @@ -560,7 +559,7 @@ class Session: self._wakeup() try: receiver._ewait(lambda: receiver.linked) - except ReceiveError, e: + except LinkError, e: receiver.close() raise e return receiver @@ -706,14 +705,14 @@ class Sender: def _wakeup(self): self.session._wakeup() - def _check_error(self, exc=SendError): - self.session._check_error(exc) + def check_error(self): + self.session.check_error() if self.error: - raise exc(*self.error) + raise self.error - def _ewait(self, predicate, timeout=None, exc=SendError): - result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) + def _ewait(self, predicate, timeout=None): + result = self.session._ewait(lambda: self.error or predicate(), timeout) + self.check_error() return result @synchronized @@ -849,14 +848,14 @@ class Receiver(object): def _wakeup(self): self.session._wakeup() - def _check_error(self, exc=ReceiveError): - self.session._check_error(exc) + def check_error(self): + self.session.check_error() if self.error: - raise exc(*self.error) + raise self.error - def _ewait(self, predicate, timeout=None, exc=ReceiveError): - result = self.session._ewait(lambda: self.error or predicate(), timeout, exc) - self._check_error(exc) + def _ewait(self, predicate, timeout=None): + result = self.session._ewait(lambda: self.error or predicate(), timeout) + self.check_error() return result @synchronized diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py index 10ad529806..0a4941a40f 100644 --- a/python/qpid/messaging/exceptions.py +++ b/python/qpid/messaging/exceptions.py @@ -17,7 +17,26 @@ # under the License. # -class ConnectionError(Exception): +class MessagingError(Exception): + + def __init__(self, code=None, text=None, **info): + self.code = code + self.text = text + self.info = info + if self.code is None: + msg = self.text + else: + msg = "%s(%s)" % (self.text, self.code) + if info: + msg += " " + ", ".join(["%s=%r" % (k, v) for k, v in self.info.items()]) + Exception.__init__(self, msg) + +class InternalError(MessagingError): + pass + +## Connection Errors + +class ConnectionError(MessagingError): """ The base class for all connection related exceptions. """ @@ -30,7 +49,15 @@ class ConnectError(ConnectionError): """ pass -class SessionError(Exception): +class VersionError(ConnectError): + pass + +class AuthenticationFailure(ConnectError): + pass + +## Session Errors + +class SessionError(MessagingError): pass class Detached(SessionError): @@ -47,19 +74,64 @@ class NontransactionalSession(SessionError): """ pass -class TransactionAborted(SessionError): +class TransactionError(SessionError): pass -class SendError(SessionError): +class TransactionAborted(TransactionError): pass -class InsufficientCapacity(SendError): +class UnauthorizedAccess(SessionError): + pass + +class ServerError(SessionError): + pass + +## Link Errors + +class LinkError(MessagingError): + pass + +class InsufficientCapacity(LinkError): + pass + +class AddressError(LinkError): + pass + +class MalformedAddress(AddressError): + pass + +class InvalidOption(AddressError): + pass + +class ResolutionError(AddressError): + pass + +class AssertionFailed(ResolutionError): + pass + +class NotFound(ResolutionError): + pass + +## Sender Errors + +class SenderError(LinkError): + pass + +class SendError(SenderError): + pass + +class TargetCapacityExceeded(SendError): + pass + +## Receiver Errors + +class ReceiverError(LinkError): pass -class ReceiveError(SessionError): +class FetchError(ReceiverError): pass -class Empty(ReceiveError): +class Empty(FetchError): """ Exception raised by L{Receiver.fetch} when there is no message available within the alloted time. diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index c46a3f6fa9..6a5cecf3b1 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -676,13 +676,13 @@ class AddressTests(Base): try: self.ssn.sender("test-bad-options-snd; %s" % options) assert False - except SendError, e: + except InvalidOption, e: assert "error in options: %s" % error == str(e), e try: self.ssn.receiver("test-bad-options-rcv; %s" % options) assert False - except ReceiveError, e: + except InvalidOption, e: assert "error in options: %s" % error == str(e), e def testIllegalKey(self): @@ -759,7 +759,7 @@ class AddressTests(Base): snd.close() try: self.ssn.sender("test-delete") - except SendError, e: + except NotFound, e: assert "no such queue" in str(e) def testDeleteByReceiver(self): @@ -773,7 +773,7 @@ class AddressTests(Base): try: self.ssn.receiver("test-delete") assert False - except ReceiveError, e: + except NotFound, e: assert "no such queue" in str(e) def testDeleteSpecial(self): @@ -957,45 +957,36 @@ class AddressErrorTests(Base): assert check(e), "unexpected error: %s" % compat.format_exc(e) def testNoneTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(None, SendError) + self.senderErrorTest(None, MalformedAddress) def testNoneSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(None, ReceiveError) + self.receiverErrorTest(None, MalformedAddress) def testNoTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) + self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) def testNoSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e)) + self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) def testUnparseableTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(UNPARSEABLE_ADDR, SendError, + self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress, lambda e: "expecting COLON" in str(e)) def testUnparseableSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError, + self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress, lambda e: "expecting COLON" in str(e)) def testUnlexableTarget(self): - # XXX: should have specific exception for this - self.senderErrorTest(UNLEXABLE_ADDR, SendError, + self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress, lambda e: "unrecognized characters" in str(e)) def testUnlexableSource(self): - # XXX: should have specific exception for this - self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError, + self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress, lambda e: "unrecognized characters" in str(e)) def testInvalidMode(self): - # XXX: should have specific exception for this self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', - ReceiveError, + InvalidOption, lambda e: "not in ('browse', 'consume')" in str(e)) SENDER_Q = 'test-sender-q; {create: always, delete: always}' |