summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-17 05:00:22 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-17 05:00:22 +0000
commita7cb15a4d29c9df1394db91fb1c80c75a708139a (patch)
tree460779f6e79f6b8e8bd47a61baa61e407bb24f55
parent6056ab3b3de9dedf6765b79a643eb40f8d62ed90 (diff)
downloadqpid-python-a7cb15a4d29c9df1394db91fb1c80c75a708139a.tar.gz
combined duplicate logic between link_in/link_out
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910823 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/driver.py246
1 files changed, 115 insertions, 131 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index 2eef9db06c..d4f5d3bb93 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -132,6 +132,73 @@ class SessionState:
op.channel = self.channel
self.driver.write_op(op)
+class LinkIn:
+
+ ADDR_NAME = "source"
+ DIR_NAME = "receiver"
+
+ def init_link(self, sst, rcv, _rcv):
+ _rcv.destination = str(rcv.id)
+ sst.destinations[_rcv.destination] = _rcv
+ _rcv.closing = False
+ _rcv.draining = False
+
+ def do_link(self, sst, rcv, _rcv, type, subtype):
+ if type == "topic":
+ _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ filter = _rcv.options.get("filter")
+ if _rcv.subject is None and filter is None:
+ f = FILTER_DEFAULTS[subtype]
+ elif _rcv.subject and filter:
+ # XXX
+ raise Exception("can't supply both subject and filter")
+ elif _rcv.subject:
+ # XXX
+ from messaging import Pattern
+ f = Pattern(_rcv.subject)
+ else:
+ f = filter
+ f._bind(sst, _rcv.name, _rcv._queue)
+ elif type == "queue":
+ _rcv._queue = _rcv.name
+
+ def done():
+ rcv.linked = True
+
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done)
+
+ def do_unlink(self, sst, rcv, _rcv, action=noop):
+ sst.write_cmd(MessageCancel(_rcv.destination), action)
+
+ def del_link(self, sst, rcv, _rcv):
+ del sst.destinations[_rcv.destination]
+
+class LinkOut:
+
+ ADDR_NAME = "target"
+ DIR_NAME = "sender"
+
+ def init_link(self, sst, snd, _snd):
+ _snd.closing = False
+
+ def do_link(self, sst, snd, _snd, type, subtype):
+ if type == "topic":
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ elif type == "queue":
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+
+ snd.linked = True
+
+ def do_unlink(self, sst, snd, _snd, action=noop):
+ action()
+
+ def del_link(self, sst, snd, _snd):
+ pass
+
# XXX
HEADER="!4s4B"
@@ -148,6 +215,9 @@ class Driver:
self.log_id = "%x" % id(self.connection)
self._lock = self.connection._lock
+ self._in = LinkIn()
+ self._out = LinkOut()
+
self._selector = Selector.default()
self._attempts = 0
self._hosts = [(self.connection.host, self.connection.port)] + \
@@ -486,9 +556,9 @@ class Driver:
self._sessions[sst.channel] = sst
for snd in ssn.senders:
- self.link_out(snd)
+ self.link(snd, self._out, snd.target)
for rcv in ssn.receivers:
- self.link_in(rcv)
+ self.link(rcv, self._in, rcv.source)
if sst is not None and ssn.closing and not sst.detached:
sst.detached = True
@@ -508,135 +578,54 @@ class Driver:
sst.write_op(SessionDetached(name=dtc.name))
self.do_session_detached(dtc)
- def link_out(self, snd):
- sst = self._attachments.get(snd.session)
- _snd = self._attachments.get(snd)
- if _snd is None and not snd.closing and not snd.closed:
- _snd = Attachment(snd)
- _snd.closing = False
-
- if snd.target is None:
- snd.error = ("target is None",)
- snd.closed = True
- return
-
- try:
- _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
- except address.LexError, e:
- snd.error = (e,)
- snd.closed = True
- return
- except address.ParseError, e:
- snd.error = (e,)
- snd.closed = True
- return
-
- # XXX: subject
- if _snd.options is None:
- _snd.options = {}
-
- if not self.validate_options(_snd):
- return
+ def link(self, lnk, dir, addr):
+ sst = self._attachments.get(lnk.session)
+ _lnk = self._attachments.get(lnk)
- def do_link(type, subtype):
- if type == "topic":
- _snd._exchange = _snd.name
- _snd._routing_key = _snd.subject
- elif type == "queue":
- _snd._exchange = ""
- _snd._routing_key = _snd.name
-
- snd.linked = True
-
- self.resolve_declare(sst, _snd, "sender", do_link)
- self._attachments[snd] = _snd
-
- if snd.linked and snd.closing and not (snd.closed or _snd.closing):
- _snd.closing = True
- def do_unlink():
- del self._attachments[snd]
- snd.closed = True
- if _snd.options.get("delete") in ("always", "sender"):
- self.delete(sst, _snd.name, do_unlink)
- else:
- do_unlink()
- elif not snd.linked and snd.closing and not snd.closed:
- snd.closed = True
+ if _lnk is None and not lnk.closing and not lnk.closed:
+ _lnk = Attachment(lnk)
+ dir.init_link(sst, lnk, _lnk)
- def link_in(self, rcv):
- sst = self._attachments.get(rcv.session)
- _rcv = self._attachments.get(rcv)
- if _rcv is None and not rcv.closing and not rcv.closed:
- _rcv = Attachment(rcv)
- _rcv.destination = str(rcv.id)
- sst.destinations[_rcv.destination] = _rcv
- _rcv.canceled = False
- _rcv.draining = False
-
- if rcv.source is None:
- rcv.error = ("source is None",)
- rcv.closed = True
+ err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+ if err:
+ lnk.error = (err,)
+ lnk.closed = True
return
+ def resolved(type, subtype):
+ dir.do_link(sst, lnk, _lnk, type, subtype)
+
+ self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
+ self._attachments[lnk] = _lnk
+
+ if lnk.linked and lnk.closing and not lnk.closed:
+ if not _lnk.closing:
+ def done():
+ dir.del_link(sst, lnk, _lnk)
+ del self._attachments[lnk]
+ lnk.closed = True
+ if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
+ dir.do_unlink(sst, lnk, _lnk)
+ self.delete(sst, _lnk.name, done)
+ else:
+ dir.do_unlink(sst, lnk, _lnk, done)
+ _lnk.closing = True
+ elif not lnk.linked and lnk.closing and not lnk.closed:
+ lnk.closed = True
+
+ def parse_address(self, lnk, dir, addr):
+ if addr is None:
+ return "%s is None" % dir.ADDR_NAME
+ else:
try:
- _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+ lnk.name, lnk.subject, lnk.options = address.parse(addr)
+ # XXX: subject
+ if lnk.options is None:
+ lnk.options = {}
except address.LexError, e:
- rcv.error = (e,)
- rcv.closed = True
- return
+ return e
except address.ParseError, e:
- rcv.error = (e,)
- rcv.closed = True
- return
-
- # XXX: subject
- if _rcv.options is None:
- _rcv.options = {}
-
- if not self.validate_options(_rcv):
- return
-
- def do_link(type, subtype):
- if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
- filter = _rcv.options.get("filter")
- if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[subtype]
- elif _rcv.subject and filter:
- # XXX
- raise Exception("can't supply both subject and filter")
- elif _rcv.subject:
- # XXX
- from messaging import Pattern
- f = Pattern(_rcv.subject)
- else:
- f = filter
- f._bind(sst, _rcv.name, _rcv._queue)
- elif type == "queue":
- _rcv._queue = _rcv.name
-
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
- sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit))
- rcv.linked = True
-
- self.resolve_declare(sst, _rcv, "receiver", do_link)
- self._attachments[rcv] = _rcv
-
- if rcv.linked and rcv.closing and not rcv.closed:
- if not _rcv.canceled:
- def do_unlink():
- del self._attachments[rcv]
- del sst.destinations[_rcv.destination]
- rcv.closed = True
- if _rcv.options.get("delete") in ("always", "receiver"):
- sst.write_cmd(MessageCancel(_rcv.destination))
- self.delete(sst, _rcv.name, do_unlink)
- else:
- sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
- _rcv.canceled = True
- elif not rcv.linked and rcv.closing and not rcv.closed:
- rcv.closed = True
+ return e
POLICIES = Values("always", "sender", "receiver", "never")
@@ -665,12 +654,7 @@ class Driver:
def validate_options(self, lnk):
ctx = Context()
err = Driver.OPTS.validate(lnk.options, ctx)
- if err:
- lnk.target.error = ("error in options: %s" % err,)
- lnk.target.closed = True
- return False
- else:
- return True
+ if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):
def do_resolved(er, qr):
@@ -848,7 +832,7 @@ class Driver:
def grant(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
- if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
+ if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
return
if rcv.granted is UNLIMITED: