summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-04-13 11:21:38 +0000
committerRafael H. Schloming <rhs@apache.org>2010-04-13 11:21:38 +0000
commit118c4bb7fa781bbb4512a66ba1ca618e70abe64b (patch)
tree7b61c7420cd0736ecdb5b5c05d04b3488a3476cf /python
parent96367205ad9e9a1d24069845683a59e5f36c9683 (diff)
downloadqpid-python-118c4bb7fa781bbb4512a66ba1ca618e70abe64b.tar.gz
added more complete exception handling/hierarchy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@933560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging/driver.py51
-rw-r--r--python/qpid/messaging/endpoints.py51
-rw-r--r--python/qpid/messaging/exceptions.py86
-rw-r--r--python/qpid/tests/messaging/endpoints.py35
4 files changed, 150 insertions, 73 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 241bed4587..934b5c02cc 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -23,12 +23,11 @@ from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
from qpid.datatypes import RangedSet, Serial
-from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
-from qpid.messaging.exceptions import ConnectError
+from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
@@ -375,7 +374,7 @@ class Driver:
else:
self.close_engine()
except socket.error, e:
- self.close_engine(e)
+ self.close_engine(ConnectionError(text=str(e)))
self.update_status()
@@ -388,7 +387,7 @@ class Driver:
def close_engine(self, e=None):
if e is None:
- e = "connection aborted"
+ e = ConnectionError(text="connection aborted")
if (self.connection.reconnect and
(self.connection.reconnect_limit is None or
@@ -455,7 +454,7 @@ class Driver:
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
- self.connection.error = (msg,)
+ self.connection.error = InternalError(text=msg)
def connect(self):
try:
@@ -481,7 +480,7 @@ class Driver:
self._retrying = False
except socket.error, e:
self._host = (self._host + 1) % len(self._hosts)
- self.close_engine(e)
+ self.close_engine(ConnectError(text=str(e)))
DEFAULT_DISPOSITION = Disposition(None)
@@ -496,6 +495,20 @@ def get_bindings(opts, queue=None, exchange=None, key=None):
cmds.append(ExchangeBind(queue, exchange, key, args))
return cmds
+CONNECTION_ERRS = {
+ # anythong not here (i.e. everything right now) will default to
+ # connection error
+ }
+
+SESSION_ERRS = {
+ # anything not here will default to session error
+ error_code.unauthorized_access: UnauthorizedAccess,
+ error_code.not_found: NotFound,
+ error_code.resource_locked: ReceiverError,
+ error_code.resource_limit_exceeded: TargetCapacityExceeded,
+ error_code.internal_error: ServerError
+ }
+
class Engine:
def __init__(self, connection):
@@ -576,15 +589,15 @@ class Engine:
opslog.debug("RCVD[%s]: %r", self.log_id, op)
op.dispatch(self)
self.dispatch()
- except VersionError, e:
+ except MessagingError, e:
self.close(e)
except:
- self.close(compat.format_exc())
+ self.close(InternalError(text=compat.format_exc()))
def close(self, e=None):
self._reset()
if e:
- self.connection.error = (e,)
+ self.connection.error = e
self._status = CLOSED
def assign_id(self, op):
@@ -618,7 +631,7 @@ class Engine:
cli_major = 0; cli_minor = 10
magic, _, _, major, minor = struct.unpack(HEADER, hdr)
if major != cli_major or minor != cli_minor:
- raise VersionError("client: %s-%s, server: %s-%s" %
+ raise VersionError(text="client: %s-%s, server: %s-%s" %
(cli_major, cli_minor, major, minor))
def do_connection_start(self, start):
@@ -655,7 +668,8 @@ class Engine:
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
if close.reply_code != close_code.normal:
- self.connection.error = (close.reply_code, close.reply_text)
+ exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
+ self.connection.error = exc(close.reply_code, close.reply_text)
# XXX: should we do a half shutdown on the socket here?
# XXX: we really need to test this, we may end up reporting a
# connection abort after this, if we were to do a shutdown on read
@@ -713,7 +727,8 @@ class Engine:
def do_execution_exception(self, ex):
sst = self.get_sst(ex)
- sst.session.error = (ex,)
+ exc = SESSION_ERRS.get(ex.error_code, SessionError)
+ sst.session.error = exc(ex.error_code, ex.description)
def dispatch(self):
if not self.connection._connected and not self._closing and self._status != CLOSED:
@@ -786,7 +801,7 @@ class Engine:
err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
if err:
- lnk.error = (err,)
+ lnk.error = err
lnk.closed = True
return
@@ -816,7 +831,7 @@ class Engine:
def parse_address(self, lnk, dir, addr):
if addr is None:
- return "%s is None" % dir.ADDR_NAME
+ return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
else:
try:
lnk.name, lnk.subject, lnk.options = address.parse(addr)
@@ -824,14 +839,14 @@ class Engine:
if lnk.options is None:
lnk.options = {}
except address.LexError, e:
- return e
+ return MalformedAddress(text=str(e))
except address.ParseError, e:
- return e
+ return MalformedAddress(text=str(e))
def validate_options(self, lnk, dir):
ctx = Context()
err = dir.VALIDATOR.validate(lnk.options, ctx)
- if err: return "error in options: %s" % err
+ if err: return InvalidOption(text="error in options: %s" % err)
def resolve_declare(self, sst, lnk, dir, action):
declare = lnk.options.get("create") in ("always", dir)
@@ -841,7 +856,7 @@ class Engine:
if declare:
err = self.declare(sst, lnk, action)
else:
- err = ("no such queue: %s" % lnk.name,)
+ err = NotFound(text="no such queue: %s" % lnk.name)
else:
action(type, subtype)
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 3f5cf3b9bd..e2602bfc15 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -172,14 +172,14 @@ class Connection:
self._modcount += 1
self._driver.wakeup()
- def _check_error(self, exc=ConnectionError):
+ def check_error(self):
if self.error:
self._condition.gc()
- raise exc(*self.error)
+ raise self.error
- def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+ def _ewait(self, predicate, timeout=None):
result = self._wait(lambda: self.error or predicate(), timeout)
- self._check_error(exc)
+ self.check_error()
return result
@synchronized
@@ -238,8 +238,7 @@ class Connection:
self._connected = True
self._driver.start()
self._wakeup()
- self._ewait(lambda: self._transport_connected and not self._unlinked(),
- exc=ConnectError)
+ self._ewait(lambda: self._transport_connected and not self._unlinked())
def _unlinked(self):
return [l
@@ -509,14 +508,14 @@ class Session:
def _wakeup(self):
self.connection._wakeup()
- def _check_error(self, exc=SessionError):
- self.connection._check_error(exc)
+ def check_error(self):
+ self.connection.check_error()
if self.error:
- raise exc(*self.error)
+ raise self.error
- def _ewait(self, predicate, timeout=None, exc=SessionError):
- result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
- self._check_error(exc)
+ def _ewait(self, predicate, timeout=None):
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout)
+ self.check_error()
return result
@synchronized
@@ -537,7 +536,7 @@ class Session:
self._wakeup()
try:
sender._ewait(lambda: sender.linked)
- except SendError, e:
+ except LinkError, e:
sender.close()
raise e
return sender
@@ -560,7 +559,7 @@ class Session:
self._wakeup()
try:
receiver._ewait(lambda: receiver.linked)
- except ReceiveError, e:
+ except LinkError, e:
receiver.close()
raise e
return receiver
@@ -706,14 +705,14 @@ class Sender:
def _wakeup(self):
self.session._wakeup()
- def _check_error(self, exc=SendError):
- self.session._check_error(exc)
+ def check_error(self):
+ self.session.check_error()
if self.error:
- raise exc(*self.error)
+ raise self.error
- def _ewait(self, predicate, timeout=None, exc=SendError):
- result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
- self._check_error(exc)
+ def _ewait(self, predicate, timeout=None):
+ result = self.session._ewait(lambda: self.error or predicate(), timeout)
+ self.check_error()
return result
@synchronized
@@ -849,14 +848,14 @@ class Receiver(object):
def _wakeup(self):
self.session._wakeup()
- def _check_error(self, exc=ReceiveError):
- self.session._check_error(exc)
+ def check_error(self):
+ self.session.check_error()
if self.error:
- raise exc(*self.error)
+ raise self.error
- def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
- self._check_error(exc)
+ def _ewait(self, predicate, timeout=None):
+ result = self.session._ewait(lambda: self.error or predicate(), timeout)
+ self.check_error()
return result
@synchronized
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index 10ad529806..0a4941a40f 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -17,7 +17,26 @@
# under the License.
#
-class ConnectionError(Exception):
+class MessagingError(Exception):
+
+ def __init__(self, code=None, text=None, **info):
+ self.code = code
+ self.text = text
+ self.info = info
+ if self.code is None:
+ msg = self.text
+ else:
+ msg = "%s(%s)" % (self.text, self.code)
+ if info:
+ msg += " " + ", ".join(["%s=%r" % (k, v) for k, v in self.info.items()])
+ Exception.__init__(self, msg)
+
+class InternalError(MessagingError):
+ pass
+
+## Connection Errors
+
+class ConnectionError(MessagingError):
"""
The base class for all connection related exceptions.
"""
@@ -30,7 +49,15 @@ class ConnectError(ConnectionError):
"""
pass
-class SessionError(Exception):
+class VersionError(ConnectError):
+ pass
+
+class AuthenticationFailure(ConnectError):
+ pass
+
+## Session Errors
+
+class SessionError(MessagingError):
pass
class Detached(SessionError):
@@ -47,19 +74,64 @@ class NontransactionalSession(SessionError):
"""
pass
-class TransactionAborted(SessionError):
+class TransactionError(SessionError):
pass
-class SendError(SessionError):
+class TransactionAborted(TransactionError):
pass
-class InsufficientCapacity(SendError):
+class UnauthorizedAccess(SessionError):
+ pass
+
+class ServerError(SessionError):
+ pass
+
+## Link Errors
+
+class LinkError(MessagingError):
+ pass
+
+class InsufficientCapacity(LinkError):
+ pass
+
+class AddressError(LinkError):
+ pass
+
+class MalformedAddress(AddressError):
+ pass
+
+class InvalidOption(AddressError):
+ pass
+
+class ResolutionError(AddressError):
+ pass
+
+class AssertionFailed(ResolutionError):
+ pass
+
+class NotFound(ResolutionError):
+ pass
+
+## Sender Errors
+
+class SenderError(LinkError):
+ pass
+
+class SendError(SenderError):
+ pass
+
+class TargetCapacityExceeded(SendError):
+ pass
+
+## Receiver Errors
+
+class ReceiverError(LinkError):
pass
-class ReceiveError(SessionError):
+class FetchError(ReceiverError):
pass
-class Empty(ReceiveError):
+class Empty(FetchError):
"""
Exception raised by L{Receiver.fetch} when there is no message
available within the alloted time.
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index c46a3f6fa9..6a5cecf3b1 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -676,13 +676,13 @@ class AddressTests(Base):
try:
self.ssn.sender("test-bad-options-snd; %s" % options)
assert False
- except SendError, e:
+ except InvalidOption, e:
assert "error in options: %s" % error == str(e), e
try:
self.ssn.receiver("test-bad-options-rcv; %s" % options)
assert False
- except ReceiveError, e:
+ except InvalidOption, e:
assert "error in options: %s" % error == str(e), e
def testIllegalKey(self):
@@ -759,7 +759,7 @@ class AddressTests(Base):
snd.close()
try:
self.ssn.sender("test-delete")
- except SendError, e:
+ except NotFound, e:
assert "no such queue" in str(e)
def testDeleteByReceiver(self):
@@ -773,7 +773,7 @@ class AddressTests(Base):
try:
self.ssn.receiver("test-delete")
assert False
- except ReceiveError, e:
+ except NotFound, e:
assert "no such queue" in str(e)
def testDeleteSpecial(self):
@@ -957,45 +957,36 @@ class AddressErrorTests(Base):
assert check(e), "unexpected error: %s" % compat.format_exc(e)
def testNoneTarget(self):
- # XXX: should have specific exception for this
- self.senderErrorTest(None, SendError)
+ self.senderErrorTest(None, MalformedAddress)
def testNoneSource(self):
- # XXX: should have specific exception for this
- self.receiverErrorTest(None, ReceiveError)
+ self.receiverErrorTest(None, MalformedAddress)
def testNoTarget(self):
- # XXX: should have specific exception for this
- self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+ self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
def testNoSource(self):
- # XXX: should have specific exception for this
- self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+ self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
def testUnparseableTarget(self):
- # XXX: should have specific exception for this
- self.senderErrorTest(UNPARSEABLE_ADDR, SendError,
+ self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
lambda e: "expecting COLON" in str(e))
def testUnparseableSource(self):
- # XXX: should have specific exception for this
- self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
lambda e: "expecting COLON" in str(e))
def testUnlexableTarget(self):
- # XXX: should have specific exception for this
- self.senderErrorTest(UNLEXABLE_ADDR, SendError,
+ self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress,
lambda e: "unrecognized characters" in str(e))
def testUnlexableSource(self):
- # XXX: should have specific exception for this
- self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress,
lambda e: "unrecognized characters" in str(e))
def testInvalidMode(self):
- # XXX: should have specific exception for this
self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
- ReceiveError,
+ InvalidOption,
lambda e: "not in ('browse', 'consume')" in str(e))
SENDER_Q = 'test-sender-q; {create: always, delete: always}'