summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-04-13 11:21:38 +0000
committerRafael H. Schloming <rhs@apache.org>2010-04-13 11:21:38 +0000
commit118c4bb7fa781bbb4512a66ba1ca618e70abe64b (patch)
tree7b61c7420cd0736ecdb5b5c05d04b3488a3476cf /python/qpid/messaging/driver.py
parent96367205ad9e9a1d24069845683a59e5f36c9683 (diff)
downloadqpid-python-118c4bb7fa781bbb4512a66ba1ca618e70abe64b.tar.gz
added more complete exception handling/hierarchy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@933560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r--python/qpid/messaging/driver.py51
1 files changed, 33 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 241bed4587..934b5c02cc 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -23,12 +23,11 @@ from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
from qpid.datatypes import RangedSet, Serial
-from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
-from qpid.messaging.exceptions import ConnectError
+from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
@@ -375,7 +374,7 @@ class Driver:
else:
self.close_engine()
except socket.error, e:
- self.close_engine(e)
+ self.close_engine(ConnectionError(text=str(e)))
self.update_status()
@@ -388,7 +387,7 @@ class Driver:
def close_engine(self, e=None):
if e is None:
- e = "connection aborted"
+ e = ConnectionError(text="connection aborted")
if (self.connection.reconnect and
(self.connection.reconnect_limit is None or
@@ -455,7 +454,7 @@ class Driver:
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
- self.connection.error = (msg,)
+ self.connection.error = InternalError(text=msg)
def connect(self):
try:
@@ -481,7 +480,7 @@ class Driver:
self._retrying = False
except socket.error, e:
self._host = (self._host + 1) % len(self._hosts)
- self.close_engine(e)
+ self.close_engine(ConnectError(text=str(e)))
DEFAULT_DISPOSITION = Disposition(None)
@@ -496,6 +495,20 @@ def get_bindings(opts, queue=None, exchange=None, key=None):
cmds.append(ExchangeBind(queue, exchange, key, args))
return cmds
+CONNECTION_ERRS = {
+ # anythong not here (i.e. everything right now) will default to
+ # connection error
+ }
+
+SESSION_ERRS = {
+ # anything not here will default to session error
+ error_code.unauthorized_access: UnauthorizedAccess,
+ error_code.not_found: NotFound,
+ error_code.resource_locked: ReceiverError,
+ error_code.resource_limit_exceeded: TargetCapacityExceeded,
+ error_code.internal_error: ServerError
+ }
+
class Engine:
def __init__(self, connection):
@@ -576,15 +589,15 @@ class Engine:
opslog.debug("RCVD[%s]: %r", self.log_id, op)
op.dispatch(self)
self.dispatch()
- except VersionError, e:
+ except MessagingError, e:
self.close(e)
except:
- self.close(compat.format_exc())
+ self.close(InternalError(text=compat.format_exc()))
def close(self, e=None):
self._reset()
if e:
- self.connection.error = (e,)
+ self.connection.error = e
self._status = CLOSED
def assign_id(self, op):
@@ -618,7 +631,7 @@ class Engine:
cli_major = 0; cli_minor = 10
magic, _, _, major, minor = struct.unpack(HEADER, hdr)
if major != cli_major or minor != cli_minor:
- raise VersionError("client: %s-%s, server: %s-%s" %
+ raise VersionError(text="client: %s-%s, server: %s-%s" %
(cli_major, cli_minor, major, minor))
def do_connection_start(self, start):
@@ -655,7 +668,8 @@ class Engine:
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
if close.reply_code != close_code.normal:
- self.connection.error = (close.reply_code, close.reply_text)
+ exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
+ self.connection.error = exc(close.reply_code, close.reply_text)
# XXX: should we do a half shutdown on the socket here?
# XXX: we really need to test this, we may end up reporting a
# connection abort after this, if we were to do a shutdown on read
@@ -713,7 +727,8 @@ class Engine:
def do_execution_exception(self, ex):
sst = self.get_sst(ex)
- sst.session.error = (ex,)
+ exc = SESSION_ERRS.get(ex.error_code, SessionError)
+ sst.session.error = exc(ex.error_code, ex.description)
def dispatch(self):
if not self.connection._connected and not self._closing and self._status != CLOSED:
@@ -786,7 +801,7 @@ class Engine:
err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
if err:
- lnk.error = (err,)
+ lnk.error = err
lnk.closed = True
return
@@ -816,7 +831,7 @@ class Engine:
def parse_address(self, lnk, dir, addr):
if addr is None:
- return "%s is None" % dir.ADDR_NAME
+ return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
else:
try:
lnk.name, lnk.subject, lnk.options = address.parse(addr)
@@ -824,14 +839,14 @@ class Engine:
if lnk.options is None:
lnk.options = {}
except address.LexError, e:
- return e
+ return MalformedAddress(text=str(e))
except address.ParseError, e:
- return e
+ return MalformedAddress(text=str(e))
def validate_options(self, lnk, dir):
ctx = Context()
err = dir.VALIDATOR.validate(lnk.options, ctx)
- if err: return "error in options: %s" % err
+ if err: return InvalidOption(text="error in options: %s" % err)
def resolve_declare(self, sst, lnk, dir, action):
declare = lnk.options.get("create") in ("always", dir)
@@ -841,7 +856,7 @@ class Engine:
if declare:
err = self.declare(sst, lnk, action)
else:
- err = ("no such queue: %s" % lnk.name,)
+ err = NotFound(text="no such queue: %s" % lnk.name)
else:
action(type, subtype)