summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-17 17:50:29 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-17 17:50:29 +0000
commit0f6d12d3117c50b9ff4f3e7d1ee83d6069b67683 (patch)
treebfbaf78dab177414be1c64b5fdfb3e3b01223fcb
parent6c2a7ea80b9cabbd9c9051e2d5c9f4274451ed7f (diff)
downloadqpid-python-0f6d12d3117c50b9ff4f3e7d1ee83d6069b67683.tar.gz
added caching for resolved addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911116 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/driver.py87
1 files changed, 69 insertions, 18 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index aa2ca3ccc5..859ad99040 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -234,6 +234,26 @@ class LinkOut:
def del_link(self, sst, snd, _snd):
pass
+class Cache:
+
+ def __init__(self, ttl):
+ self.ttl = ttl
+ self.entries = {}
+
+ def __setitem__(self, key, value):
+ self.entries[key] = time.time(), value
+
+ def __getitem__(self, key):
+ tstamp, value = self.entries[key]
+ if time.time() - tstamp >= self.ttl:
+ del self.entries[key]
+ raise KeyError(key)
+ else:
+ return value
+
+ def __delitem__(self, key):
+ del self.entries[key]
+
# XXX
HEADER="!4s4B"
@@ -259,6 +279,7 @@ class Driver:
self.connection.backups
self._host = 0
self._retrying = False
+
self.reset()
def reset(self):
@@ -271,6 +292,10 @@ class Driver:
self._channels = 0
self._sessions = {}
+ options = self.connection.options
+
+ self.address_cache = Cache(options.get("address_ttl", 60))
+
self._socket = None
self._buf = ""
self._hdr = ""
@@ -289,7 +314,6 @@ class Driver:
self._sasl.setAttr("password", self.connection.password)
if self.connection.host:
self._sasl.setAttr("host", self.connection.host)
- options = self.connection.options
self._sasl.setAttr("service", options.get("service", "qpidd"))
if "min_ssf" in options:
self._sasl.setAttr("minssf", options["min_ssf"])
@@ -672,21 +696,22 @@ class Driver:
if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):
- def do_resolved(er, qr):
+ declare = lnk.options.get("create") in ("always", dir)
+ def do_resolved(type, subtype):
err = None
- if er.not_found and not qr.queue:
- if lnk.options.get("create") in ("always", dir):
+ if type is None:
+ if declare:
err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
- elif qr.queue:
+ elif type == "queue":
try:
cmds = self.bindings(lnk)
- sst.write_cmds(cmds, lambda: action("queue", None))
+ sst.write_cmds(cmds, lambda: action(type, subtype))
except address.ParseError, e:
err = (e,)
else:
- action("topic", er.type)
+ action(type, subtype)
if err:
tgt = lnk.target
@@ -694,15 +719,31 @@ class Driver:
del self._attachments[tgt]
tgt.closed = True
return
- self.resolve(sst, lnk.name, do_resolved)
+ self.resolve(sst, lnk.name, do_resolved, force=declare)
+
+ def resolve(self, sst, name, action, force=False):
+ if not force:
+ try:
+ type, subtype = self.address_cache[name]
+ action(type, subtype)
+ return
+ except KeyError:
+ pass
- def resolve(self, sst, name, action):
args = []
def do_result(r):
args.append(r)
def do_action(r):
do_result(r)
- action(*args)
+ er, qr = args
+ if er.not_found and not qr.queue:
+ type, subtype = None, None
+ elif qr.queue:
+ type, subtype = "queue", None
+ else:
+ type, subtype = "topic", er.type
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
sst.write_query(ExchangeQuery(name), do_result)
sst.write_query(QueueQuery(name), do_action)
@@ -740,7 +781,11 @@ class Driver:
except address.ParseError, e:
return (e,)
- sst.write_cmds(cmds, lambda: action(type, subtype))
+ def declared():
+ self.address_cache[name] = (type, subtype)
+ action(type, subtype)
+
+ sst.write_cmds(cmds, declared)
def bindings(self, lnk):
props = lnk.options.get("node-properties", {})
@@ -753,14 +798,20 @@ class Driver:
return cmds
def delete(self, sst, name, action):
- def do_delete(er, qr):
- if not er.not_found:
- sst.write_cmd(ExchangeDelete(name), action)
- elif qr.queue:
- sst.write_cmd(QueueDelete(name), action)
- else:
+ def deleted():
+ del self.address_cache[name]
+ action()
+
+ def do_delete(type, subtype):
+ if type == "topic":
+ sst.write_cmd(ExchangeDelete(name), deleted)
+ elif type == "queue":
+ sst.write_cmd(QueueDelete(name), deleted)
+ elif type is None:
action()
- self.resolve(sst, name, do_delete)
+ else:
+ raise ValueError(type)
+ self.resolve(sst, name, do_delete, force=True)
def process(self, ssn):
if ssn.closed or ssn.closing: return