summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
committerRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
commit13e6bd9704643993d95c81f22106dae5b59b3084 (patch)
tree2dbfa0faacecf665170120c61ec6239e5bff5a9c /python/qpid/driver.py
parentc68d17bf36649f3ba68334c3147e2d0da7246e67 (diff)
downloadqpid-python-13e6bd9704643993d95c81f22106dae5b59b3084.tar.gz
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py176
1 files changed, 129 insertions, 47 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index a4244b9830..dadf43fc7f 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -17,7 +17,7 @@
# under the License.
#
-import compat, connection, socket, struct, sys, time
+import address, compat, connection, socket, struct, sys, time
from concurrency import synchronized
from datatypes import RangedSet, Serial
from exceptions import Timeout, VersionError
@@ -26,18 +26,14 @@ from logging import getLogger
from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
from ops import *
from selector import Selector
-from session import Client, INCOMPLETE, SessionDetached
from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
+def addr2reply_to(addr):
+ name, subject, options = address.parse(addr)
+ return ReplyTo(name, subject)
def reply_to2addr(reply_to):
if reply_to.routing_key is None:
@@ -100,9 +96,12 @@ class SessionState:
def write_query(self, query, handler):
id = self.sent
+ query.sync = True
self.write_cmd(query, lambda: handler(self.results.pop(id)))
def write_cmd(self, cmd, completion=noop):
+ if self.detached:
+ raise Exception("detached")
cmd.id = self.sent
self.sent += 1
self.completions[cmd.id] = completion
@@ -187,6 +186,7 @@ class Driver:
if data:
log.debug("READ: %r", data)
else:
+ log.debug("ABORTED: %s", self._socket.getpeername())
error = ("connection aborted",)
recoverable = True
except socket.error, e:
@@ -285,7 +285,7 @@ class Driver:
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
- if close.reply_ok != close_code.normal:
+ if close.reply_code != close_code.normal:
self.connection.error = (close.reply_code, close.reply_text)
# XXX: should we do a half shutdown on the socket here?
# XXX: we really need to test this, we may end up reporting a
@@ -343,6 +343,10 @@ class Driver:
sst = self.get_sst(er)
sst.results[er.command_id] = er.value
+ def do_execution_exception(self, ex):
+ sst = self.get_sst(ex)
+ sst.session.error = (ex,)
+
def dispatch(self):
try:
if self._socket is None and self.connection._connected and not self._opening:
@@ -381,7 +385,7 @@ class Driver:
def attach(self, ssn):
sst = self._attachments.get(ssn)
- if sst is None:
+ if sst is None and not ssn.closed:
for i in xrange(0, self.channel_max):
if not self._sessions.has_key(i):
ch = i
@@ -403,7 +407,7 @@ class Driver:
for rcv in ssn.receivers:
self.link_in(rcv)
- if ssn.closing and not sst.detached:
+ if sst is not None and ssn.closing and not sst.detached:
sst.detached = True
sst.write_op(SessionDetach(name=ssn.name))
@@ -416,24 +420,66 @@ class Driver:
del self._attachments[ssn]
ssn.closed = True
+ def do_session_detach(self, dtc):
+ sst = self.get_sst(dtc)
+ sst.write_op(SessionDetached(name=dtc.name))
+ self.do_session_detached(dtc)
+
def link_out(self, snd):
- sst = self._attachments[snd.session]
+ sst = self._attachments.get(snd.session)
_snd = self._attachments.get(snd)
- if not snd.closing and _snd is None:
+ if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
- _snd.linked = False
- node, _snd._subject = parse_addr(snd.target)
- def do_link_out(result):
- if result.not_found:
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+
+ try:
+ _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+ except address.LexError, e:
+ snd.error = e
+ snd.closed = True
+ return
+ except address.ParseError, e:
+ snd.error = e
+ snd.closed = True
+ return
+
+ # XXX: subject
+ if _snd.options is None:
+ _snd.options = {}
+
+ def do_link():
+ snd.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+
+ if result.queue:
_snd._exchange = ""
- _snd._routing_key = node
+ _snd._routing_key = _snd.name
+ do_link()
else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
- _snd.linked = True
- sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
+ snd.error = ("no such queue: %s" % _snd.name,)
+ del self._attachments[snd]
+ snd.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+
+ if result.not_found:
+ if _snd.options.get("create") in ("always", "receiver"):
+ sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ else:
+ sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+ return
+ else:
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ do_link()
+
+ sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
if snd.closing and not snd.closed:
@@ -441,41 +487,77 @@ class Driver:
snd.closed = True
def link_in(self, rcv):
- sst = self._attachments[rcv.session]
+ sst = self._attachments.get(rcv.session)
_rcv = self._attachments.get(rcv)
- if _rcv is None and not rcv.closing:
+ if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
- _rcv.linked = False
_rcv.canceled = False
_rcv.draining = False
- def do_link_in(result):
+ try:
+ _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+ except address.LexError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+ except address.ParseError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+
+ # XXX: subject
+ if _rcv.options is None:
+ _rcv.options = {}
+
+ def do_link():
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+ if result.queue:
+ _rcv._queue = _rcv.name
+ do_link()
+ else:
+ rcv.error = ("no such queue: %s" % _rcv.name,)
+ del self._attachments[rcv]
+ rcv.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ if _rcv.options.get("create") in ("always", "receiver"):
+ _rcv._queue = _rcv.name
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ else:
+ sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+ return
else:
_rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
# XXX
- if rcv.filter is None:
+ if _rcv.options.get("filter") is None:
f = FILTER_DEFAULTS[result.type]
else:
f = rcv.filter
- f._bind(sst, rcv.source, _rcv._queue)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
- _rcv.linked = True
- sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
+ f._bind(sst, _rcv.name, _rcv._queue)
+ do_link()
+ sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
self._attachments[rcv] = _rcv
if rcv.closing and not rcv.closed:
- if not _rcv.canceled:
- def close_rcv():
- del self._attachments[rcv]
- rcv.closed = True
- sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
- _rcv.canceled = True
+ if rcv.linked:
+ if not _rcv.canceled:
+ def close_rcv():
+ del self._attachments[rcv]
+ rcv.closed = True
+ sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ _rcv.canceled = True
+ else:
+ rcv.closed = True
def process(self, ssn):
if ssn.closing: return
@@ -485,8 +567,9 @@ class Driver:
while sst.outgoing_idx < len(ssn.outgoing):
msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
+ # XXX: should check for sender error here
_snd = self._attachments.get(snd)
- if _snd and _snd.linked:
+ if _snd and snd.linked:
self.send(snd, msg)
sst.outgoing_idx += 1
else:
@@ -559,7 +642,7 @@ class Driver:
def grant(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
- if _rcv is None or not _rcv.linked or _rcv.draining:
+ if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
return
if rcv.granted is UNLIMITED:
@@ -606,7 +689,7 @@ class Driver:
rk = _snd._routing_key
# XXX: do we need to query to figure out how to create the reply-to interoperably?
if msg.reply_to:
- rt = ReplyTo(*parse_addr(msg.reply_to))
+ rt = addr2reply_to(msg.reply_to)
else:
rt = None
dp = DeliveryProperties(routing_key=rk)
@@ -650,7 +733,6 @@ class Driver:
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
self.connection._waiter.notifyAll()
- return INCOMPLETE
def _decode(self, xfr):
dp = EMPTY_DP