diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 17:50:29 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 17:50:29 +0000 |
commit | 0f6d12d3117c50b9ff4f3e7d1ee83d6069b67683 (patch) | |
tree | bfbaf78dab177414be1c64b5fdfb3e3b01223fcb | |
parent | 6c2a7ea80b9cabbd9c9051e2d5c9f4274451ed7f (diff) | |
download | qpid-python-0f6d12d3117c50b9ff4f3e7d1ee83d6069b67683.tar.gz |
added caching for resolved addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911116 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/driver.py | 87 |
1 files changed, 69 insertions, 18 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index aa2ca3ccc5..859ad99040 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -234,6 +234,26 @@ class LinkOut: def del_link(self, sst, snd, _snd): pass +class Cache: + + def __init__(self, ttl): + self.ttl = ttl + self.entries = {} + + def __setitem__(self, key, value): + self.entries[key] = time.time(), value + + def __getitem__(self, key): + tstamp, value = self.entries[key] + if time.time() - tstamp >= self.ttl: + del self.entries[key] + raise KeyError(key) + else: + return value + + def __delitem__(self, key): + del self.entries[key] + # XXX HEADER="!4s4B" @@ -259,6 +279,7 @@ class Driver: self.connection.backups self._host = 0 self._retrying = False + self.reset() def reset(self): @@ -271,6 +292,10 @@ class Driver: self._channels = 0 self._sessions = {} + options = self.connection.options + + self.address_cache = Cache(options.get("address_ttl", 60)) + self._socket = None self._buf = "" self._hdr = "" @@ -289,7 +314,6 @@ class Driver: self._sasl.setAttr("password", self.connection.password) if self.connection.host: self._sasl.setAttr("host", self.connection.host) - options = self.connection.options self._sasl.setAttr("service", options.get("service", "qpidd")) if "min_ssf" in options: self._sasl.setAttr("minssf", options["min_ssf"]) @@ -672,21 +696,22 @@ class Driver: if err: return "error in options: %s" % err def resolve_declare(self, sst, lnk, dir, action): - def do_resolved(er, qr): + declare = lnk.options.get("create") in ("always", dir) + def do_resolved(type, subtype): err = None - if er.not_found and not qr.queue: - if lnk.options.get("create") in ("always", dir): + if type is None: + if declare: err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - elif qr.queue: + elif type == "queue": try: cmds = self.bindings(lnk) - sst.write_cmds(cmds, lambda: action("queue", None)) + sst.write_cmds(cmds, lambda: action(type, subtype)) except address.ParseError, e: err = (e,) else: - action("topic", er.type) + action(type, subtype) if err: tgt = lnk.target @@ -694,15 +719,31 @@ class Driver: del self._attachments[tgt] tgt.closed = True return - self.resolve(sst, lnk.name, do_resolved) + self.resolve(sst, lnk.name, do_resolved, force=declare) + + def resolve(self, sst, name, action, force=False): + if not force: + try: + type, subtype = self.address_cache[name] + action(type, subtype) + return + except KeyError: + pass - def resolve(self, sst, name, action): args = [] def do_result(r): args.append(r) def do_action(r): do_result(r) - action(*args) + er, qr = args + if er.not_found and not qr.queue: + type, subtype = None, None + elif qr.queue: + type, subtype = "queue", None + else: + type, subtype = "topic", er.type + self.address_cache[name] = (type, subtype) + action(type, subtype) sst.write_query(ExchangeQuery(name), do_result) sst.write_query(QueueQuery(name), do_action) @@ -740,7 +781,11 @@ class Driver: except address.ParseError, e: return (e,) - sst.write_cmds(cmds, lambda: action(type, subtype)) + def declared(): + self.address_cache[name] = (type, subtype) + action(type, subtype) + + sst.write_cmds(cmds, declared) def bindings(self, lnk): props = lnk.options.get("node-properties", {}) @@ -753,14 +798,20 @@ class Driver: return cmds def delete(self, sst, name, action): - def do_delete(er, qr): - if not er.not_found: - sst.write_cmd(ExchangeDelete(name), action) - elif qr.queue: - sst.write_cmd(QueueDelete(name), action) - else: + def deleted(): + del self.address_cache[name] + action() + + def do_delete(type, subtype): + if type == "topic": + sst.write_cmd(ExchangeDelete(name), deleted) + elif type == "queue": + sst.write_cmd(QueueDelete(name), deleted) + elif type is None: action() - self.resolve(sst, name, do_delete) + else: + raise ValueError(type) + self.resolve(sst, name, do_delete, force=True) def process(self, ssn): if ssn.closed or ssn.closing: return |