diff options
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 51 |
1 files changed, 33 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 241bed4587..934b5c02cc 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -23,12 +23,11 @@ from qpid import compat from qpid import sasl from qpid.concurrency import synchronized from qpid.datatypes import RangedSet, Serial -from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address, transports from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED -from qpid.messaging.exceptions import ConnectError +from qpid.messaging.exceptions import * from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector @@ -375,7 +374,7 @@ class Driver: else: self.close_engine() except socket.error, e: - self.close_engine(e) + self.close_engine(ConnectionError(text=str(e))) self.update_status() @@ -388,7 +387,7 @@ class Driver: def close_engine(self, e=None): if e is None: - e = "connection aborted" + e = ConnectionError(text="connection aborted") if (self.connection.reconnect and (self.connection.reconnect_limit is None or @@ -455,7 +454,7 @@ class Driver: except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() - self.connection.error = (msg,) + self.connection.error = InternalError(text=msg) def connect(self): try: @@ -481,7 +480,7 @@ class Driver: self._retrying = False except socket.error, e: self._host = (self._host + 1) % len(self._hosts) - self.close_engine(e) + self.close_engine(ConnectError(text=str(e))) DEFAULT_DISPOSITION = Disposition(None) @@ -496,6 +495,20 @@ def get_bindings(opts, queue=None, exchange=None, key=None): cmds.append(ExchangeBind(queue, exchange, key, args)) return cmds +CONNECTION_ERRS = { + # anythong not here (i.e. everything right now) will default to + # connection error + } + +SESSION_ERRS = { + # anything not here will default to session error + error_code.unauthorized_access: UnauthorizedAccess, + error_code.not_found: NotFound, + error_code.resource_locked: ReceiverError, + error_code.resource_limit_exceeded: TargetCapacityExceeded, + error_code.internal_error: ServerError + } + class Engine: def __init__(self, connection): @@ -576,15 +589,15 @@ class Engine: opslog.debug("RCVD[%s]: %r", self.log_id, op) op.dispatch(self) self.dispatch() - except VersionError, e: + except MessagingError, e: self.close(e) except: - self.close(compat.format_exc()) + self.close(InternalError(text=compat.format_exc())) def close(self, e=None): self._reset() if e: - self.connection.error = (e,) + self.connection.error = e self._status = CLOSED def assign_id(self, op): @@ -618,7 +631,7 @@ class Engine: cli_major = 0; cli_minor = 10 magic, _, _, major, minor = struct.unpack(HEADER, hdr) if major != cli_major or minor != cli_minor: - raise VersionError("client: %s-%s, server: %s-%s" % + raise VersionError(text="client: %s-%s, server: %s-%s" % (cli_major, cli_minor, major, minor)) def do_connection_start(self, start): @@ -655,7 +668,8 @@ class Engine: def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) if close.reply_code != close_code.normal: - self.connection.error = (close.reply_code, close.reply_text) + exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) + self.connection.error = exc(close.reply_code, close.reply_text) # XXX: should we do a half shutdown on the socket here? # XXX: we really need to test this, we may end up reporting a # connection abort after this, if we were to do a shutdown on read @@ -713,7 +727,8 @@ class Engine: def do_execution_exception(self, ex): sst = self.get_sst(ex) - sst.session.error = (ex,) + exc = SESSION_ERRS.get(ex.error_code, SessionError) + sst.session.error = exc(ex.error_code, ex.description) def dispatch(self): if not self.connection._connected and not self._closing and self._status != CLOSED: @@ -786,7 +801,7 @@ class Engine: err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) if err: - lnk.error = (err,) + lnk.error = err lnk.closed = True return @@ -816,7 +831,7 @@ class Engine: def parse_address(self, lnk, dir, addr): if addr is None: - return "%s is None" % dir.ADDR_NAME + return MalformedAddress(text="%s is None" % dir.ADDR_NAME) else: try: lnk.name, lnk.subject, lnk.options = address.parse(addr) @@ -824,14 +839,14 @@ class Engine: if lnk.options is None: lnk.options = {} except address.LexError, e: - return e + return MalformedAddress(text=str(e)) except address.ParseError, e: - return e + return MalformedAddress(text=str(e)) def validate_options(self, lnk, dir): ctx = Context() err = dir.VALIDATOR.validate(lnk.options, ctx) - if err: return "error in options: %s" % err + if err: return InvalidOption(text="error in options: %s" % err) def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) @@ -841,7 +856,7 @@ class Engine: if declare: err = self.declare(sst, lnk, action) else: - err = ("no such queue: %s" % lnk.name,) + err = NotFound(text="no such queue: %s" % lnk.name) else: action(type, subtype) |