summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-10-29 16:07:38 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-10-29 16:07:38 +0000
commitcb763d5178e6c125f89f70942c4709099f5410e9 (patch)
tree53e13582768903a9c97a184dcead8d9da3fca3b6
parent3e594b89e847ae927d116151714f8cd619a60ae2 (diff)
downloadqpid-python-cb763d5178e6c125f89f70942c4709099f5410e9.tar.gz
QPID-2782: enhance the console's ability to selectively filter unsolicited events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1028819 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py311
1 files changed, 235 insertions, 76 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index f96800db9c..478b017649 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -210,10 +210,6 @@ class Object(object):
""" Return the broker from which this object was sent """
return self._broker
- def getAgent(self):
- """ Return the agent from which this object was sent """
- return self._agent
-
def getV2RoutingKey(self):
""" Get the QMFv2 routing key to address this object """
return self._agent.getV2RoutingKey()
@@ -579,12 +575,15 @@ class Session:
self.rcvHeartbeats = False
self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
- self.agent_filter = [] # (vendor, product, instance)
+ # callback filters:
+ self.agent_filter = [] # (vendor, product, instance) || v1-agent-label-str
+ self.class_filter = [] # (pkg, class)
+ self.event_filter = [] # (pkg, event)
self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval
self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent
- if self.userBindings and not self.rcvObjects:
- raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+ if self.userBindings and not self.console:
+ raise Exception("userBindings can't be set unless a console is provided.")
def close(self):
""" Releases all resources held by the session. Must be called by the
@@ -647,9 +646,6 @@ class Session:
'broker' argument is the object returned from the addBroker call. Errors
are ignored.
"""
- if self.console:
- for agent in broker.getAgents():
- self.console.delAgent(agent)
broker._shutdown()
self.brokers.remove(broker)
del broker
@@ -677,13 +673,21 @@ class Session:
def bindPackage(self, packageName):
- """ Request object updates and events for all elements of a package. Only
- valid if userBindings is True."""
- if not self.userBindings or not self.rcvObjects:
- raise Exception("userBindings option not set for Session")
+ """ Filter object and event callbacks to only those elements of the
+ specified package. Also filters newPackage and newClass callbacks to the
+ given package. Only valid if userBindings is True.
+ """
+ if not self.userBindings:
+ raise Exception("userBindings option must be set for this Session.")
+ if not self.rcvObjects and not self.rcvEvents:
+ raise Exception("Session needs to be configured to receive events or objects.")
v1keys = ["console.obj.*.*.%s.#" % packageName, "console.event.*.*.%s.#" % packageName]
v2keys = ["agent.ind.data.%s.#" % packageName.replace(".", "_"),
"agent.ind.event.%s.#" % packageName.replace(".", "_"),]
+ if (packageName, None) not in self.class_filter:
+ self.class_filter.append((packageName, None))
+ if (packageName, None) not in self.event_filter:
+ self.event_filter.append((packageName, None))
self.v1BindingKeyList.extend(v1keys)
self.v2BindingKeyList.extend(v2keys)
for broker in self.brokers:
@@ -697,12 +701,15 @@ class Session:
def bindClass(self, pname, cname=None):
- """ Request object updates for a particular class given package and class
- name, or all classes of a particular package if cname=None. Only valid if
- userBindings is True.
+ """ Filter object callbacks to only those objects of the specified package
+ and optional class. Will also filter newPackage/newClass callbacks to the
+ specified package and class. Only valid if userBindings is True and
+ rcvObjects is True.
"""
- if not self.userBindings or not self.rcvObjects:
- raise Exception("userBindings option not set for Session")
+ if not self.userBindings:
+ raise Exception("userBindings option must be set for this Session.")
+ if not self.rcvObjects:
+ raise Exception("Session needs to be configured with rcvObjects=True.")
if cname is not None:
v1key = "console.obj.*.*.%s.%s.#" % (pname, cname)
v2key = "agent.ind.data.%s.%s.#" % (pname.replace(".", "_"), cname.replace(".", "_"))
@@ -711,6 +718,8 @@ class Session:
v2key = "agent.ind.data.%s.#" % pname.replace(".", "_")
self.v1BindingKeyList.append(v1key)
self.v2BindingKeyList.append(v2key)
+ if (pname, cname) not in self.class_filter:
+ self.class_filter.append((pname, cname))
for broker in self.brokers:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
@@ -720,19 +729,25 @@ class Session:
def bindClassKey(self, classKey):
- """ Request object updates for a particular table class by class key. Only
- valid if userBindings is True.
+ """ Filter object callbacks to only those objects of the specified
+ class. Will also filter newPackage/newClass callbacks to the specified
+ package and class. Only valid if userBindings is True and rcvObjects is
+ True.
"""
pname = classKey.getPackageName()
cname = classKey.getClassName()
self.bindClass(pname, cname)
def bindEvent(self, pname, ename=None):
- """ Request events from a particular class by package and event name, or
- all events is ename=None. Only valid if userBindings is True.
+ """ Filter event callbacks only from a particular class by package and
+ event name, or all events in a package if ename=None. Will also filter
+ newPackage/newClass callbacks to the specified package and class. Only
+ valid if userBindings is True and rcvEvents is True.
"""
- if not self.userBindings or not self.rcvEvents:
- raise Exception("userBindings option not set for Session")
+ if not self.userBindings:
+ raise Exception("userBindings option must be set for this Session.")
+ if not self.rcvEvents:
+ raise Exception("Session needs to be configured with rcvEvents=True.")
if ename is not None:
v1key = "console.event.*.*.%s.%s.#" % (pname, ename)
v2key = "agent.ind.event.%s.%s.#" % (pname.replace(".", "_"), ename.replace(".", "_"))
@@ -741,6 +756,8 @@ class Session:
v2key = "agent.ind.event.%s.#" % pname.replace(".", "_")
self.v1BindingKeyList.append(v1key)
self.v2BindingKeyList.append(v2key)
+ if (pname, ename) not in self.event_filter:
+ self.event_filter.append((pname, ename))
for broker in self.brokers:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
@@ -749,40 +766,58 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindEventKey(self, eventKey):
- """ Request events for a particular class by class key. Only valid if
- userBindings is True. """
+ """ Filter event callbacks only from a particular class key. Will also
+ filter newPackage/newClass callbacks to the specified package and
+ class. Only valid if userBindings is True and rcvEvents is True.
+ """
pname = eventKey.getPackageName()
ename = eventKey.getClassName()
self.bindEvent(pname, ename)
- def bindAgent(self, vendor, product=None, instance=None):
- """ Listen for heartbeats/events/data indications only for those agent(s)
- that match the vendor and, optionally, the product strings. Only valid if
- userBindings is True.
+ def bindAgent(self, vendor=None, product=None, instance=None, label=None):
+ """ Receive heartbeats, newAgent and delAgent callbacks only for those
+ agent(s) that match the passed identification criteria:
+ V2 agents: vendor, optionally product and instance strings
+ V1 agents: the label string.
+ Only valid if userBindings is True.
"""
if not self.userBindings:
raise Exception("Session not configured for binding specific agents.")
- if product is not None:
- v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_"))
- else:
- v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_")
- self.v2BindingKeyList.append(v2key)
+ if vendor is None and label is None:
+ raise Exception("Must specify at least a vendor (V2 agents)"
+ " or label (V1 agents).")
- # allow wildcards - only add filter if a non-wildcarded component is given
- if vendor == "*":
- vendor = None
- if product == "*":
- product = None
- if instance == "*":
- instance = None
- if vendor or product or instance:
- self.agent_filter.append((vendor, product, instance))
+ if vendor: # V2 agent identification
+ if product is not None:
+ v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_"))
+ else:
+ v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_")
+ self.v2BindingKeyList.append(v2key)
+
+ # allow wildcards - only add filter if a non-wildcarded component is given
+ if vendor == "*":
+ vendor = None
+ if product == "*":
+ product = None
+ if instance == "*":
+ instance = None
+ if vendor or product or instance:
+ if (vendor, product, instance) not in self.agent_filter:
+ self.agent_filter.append((vendor, product, instance))
+
+ for broker in self.brokers:
+ if broker.isConnected():
+ if broker.brokerSupportsV2:
+ # heartbeats should arrive on the heartbeat queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=broker.v2_topic_queue_hb,
+ binding_key=v2key)
+ elif label != "*": # non-wildcard V1 agent label
+ # V1 format heartbeats do not have any agent identifier in the routing
+ # key, so we cannot filter them by bindings.
+ if label not in self.agent_filter:
+ self.agent_filter.append(label)
- for broker in self.brokers:
- if broker.isConnected():
- if broker.brokerSupportsV2:
- # heartbeats should arrive on the heartbeat queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key)
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -915,14 +950,14 @@ class Session:
product = kwargs.get("product", "*")
severity = kwargs.get("severity", "*")
- if package is "*" and event is not "*":
+ if package == "*" and event != "*":
raise Exception("'package' parameter required if 'event' parameter"
" supplied")
# V1 key - can only filter on package (and event)
- if package is not "*":
+ if package == "*":
key = "console.event.*.*." + str(package)
- if event is not "*":
+ if event != "*":
key += "." + str(event)
key += ".#"
@@ -951,6 +986,15 @@ class Session:
except:
pass
+ if package != "*":
+ if event != "*":
+ f = (package, event)
+ else:
+ f = (package, None)
+ if f not in self.event_filter:
+ self.event_filter.append(f)
+
+
def addAgentFilter(self, vendor, product=None):
""" Deprecate - use bindAgent() instead
"""
@@ -1030,14 +1074,14 @@ class Session:
def _handleBrokerConnect(self, broker):
if self.console:
for agent in broker.getAgents():
- self.console.newAgent(agent)
+ self._newAgentCallback(agent)
self.console.brokerConnected(broker)
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
- self.console.delAgent(agent)
+ self._delAgentCallback(agent)
self.console.brokerDisconnected(broker)
@@ -1059,7 +1103,7 @@ class Session:
pname = str(codec.read_str8())
notify = self.schemaCache.declarePackage(pname)
if notify and self.console != None:
- self.console.newPackage(pname)
+ self._newPackageCallback(pname)
# Send a class request
broker._incOutstanding()
@@ -1132,9 +1176,9 @@ class Session:
agentBank = 0
agent = broker.getAgent(brokerBank, agentBank)
- timestamp = codec.read_uint64()
if self.rcvHeartbeats and self.console != None and agent != None:
- self.console.heartbeat(agent, timestamp)
+ timestamp = codec.read_uint64()
+ self._heartbeatCallback(agent, timestamp)
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
@@ -1148,9 +1192,9 @@ class Session:
broker._decOutstanding()
if self.console != None:
if new_pkg:
- self.console.newPackage(classKey.getPackageName())
+ self._newPackageCallback(classKey.getPackageName())
if new_cls:
- self.console.newClass(kind, classKey)
+ self._newClassCallback(kind, classKey)
if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
agent = self._getAgentForAgentAddr(agent_addr)
@@ -1179,7 +1223,7 @@ class Session:
return
if self.agent_filter:
- # only allow agents that satisfy the filter
+ # only allow V2 agents that satisfy the filter
v = agentName.split(":", 2)
if len(v) != 3 or ((v[0], None, None) not in self.agent_filter
and (v[0], v[1], None) not in self.agent_filter
@@ -1194,7 +1238,7 @@ class Session:
else:
agent.touch()
if self.rcvHeartbeats and self.console and agent:
- self.console.heartbeat(agent, timestamp)
+ self._heartbeatCallback(agent, timestamp)
agent.update_schema_timestamp(values.get("schema_timestamp", 0))
@@ -1479,6 +1523,88 @@ class Session:
return seq
return None
+ def _newPackageCallback(self, pname):
+ """
+ Invokes the console.newPackage() callback if the callback is present and
+ the package is not filtered.
+ """
+ if self.console:
+ if len(self.class_filter) == 0 and len(self.event_filter) == 0:
+ self.console.newPackage(pname)
+ else:
+ for x in self.class_filter:
+ if x[0] == pname:
+ self.console.newPackage(pname)
+ return
+
+ for x in self.event_filter:
+ if x[0] == pname:
+ self.console.newPackage(pname)
+ return
+
+
+ def _newClassCallback(self, ctype, ckey):
+ """
+ Invokes the console.newClass() callback if the callback is present and the
+ class is not filtered.
+ """
+ if self.console:
+ if ctype == ClassKey.TYPE_DATA:
+ if (len(self.class_filter) == 0
+ or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter):
+ self.console.newClass(ctype, ckey)
+ elif ctype == ClassKey.TYPE_EVENT:
+ if (len(self.event_filter) == 0
+ or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter):
+ self.console.newClass(ctype, ckey)
+ else: # old class keys did not contain type info, check both filters
+ if ((len(self.class_filter) == 0 and len(self.event_filter) == 0)
+ or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter
+ or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter):
+ self.console.newClass(ctype, ckey)
+
+ def _agentAllowed(self, agentName, isV2):
+ """ True if the agent is NOT filtered.
+ """
+ if self.agent_filter:
+ if isV2:
+ v = agentName.split(":", 2)
+ return ((len(v) > 2 and (v[0], v[1], v[2]) in self.agent_filter)
+ or (len(v) > 1 and (v[0], v[1], None) in self.agent_filter)
+ or (v and (v[0], None, None) in self.agent_filter));
+ else:
+ return agentName in self.agent_filter
+ return True
+
+ def _heartbeatCallback(self, agent, timestamp):
+ """
+ Invokes the console.heartbeat() callback if the callback is present and the
+ agent is not filtered.
+ """
+ if self.console and self.rcvHeartbeats:
+ if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+ or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+ self.console.heartbeat(agent, timestamp)
+
+ def _newAgentCallback(self, agent):
+ """
+ Invokes the console.newAgent() callback if the callback is present and the
+ agent is not filtered.
+ """
+ if self.console:
+ if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+ or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+ self.console.newAgent(agent)
+
+ def _delAgentCallback(self, agent):
+ """
+ Invokes the console.delAgent() callback if the callback is present and the
+ agent is not filtered.
+ """
+ if self.console:
+ if ((agent.isV2 and self._agentAllowed(agent.agentBank, True))
+ or ((not agent.isV2) and self._agentAllowed(agent.label, False))):
+ self.console.delAgent(agent)
#===================================================================================================
# SessionGetRequest
@@ -2385,18 +2511,24 @@ class Broker(Thread):
return False # connection failed
def _updateAgent(self, obj):
+ """
+ Just received an object of class "org.apache.qpid.broker:agent", which
+ represents a V1 agent. Add or update the list of agent proxies.
+ """
bankKey = str(obj.agentBank)
agent = None
if obj._deleteTime == 0:
try:
self.cv.acquire()
if bankKey not in self.agents:
- agent = Agent(self, obj.agentBank, obj.label)
- self.agents[bankKey] = agent
+ # add new agent only if label is not filtered
+ if len(self.session.agent_filter) == 0 or obj.label in self.session.agent_filter:
+ agent = Agent(self, obj.agentBank, obj.label)
+ self.agents[bankKey] = agent
finally:
self.cv.release()
if agent and self.session.console:
- self.session.console.newAgent(agent)
+ self.session._newAgentCallback(agent)
else:
try:
self.cv.acquire()
@@ -2406,7 +2538,7 @@ class Broker(Thread):
finally:
self.cv.release()
if agent and self.session.console:
- self.session.console.delAgent(agent)
+ self.session._delAgentCallback(agent)
def _addAgent(self, name, agent):
try:
@@ -2415,7 +2547,7 @@ class Broker(Thread):
finally:
self.cv.release()
if self.session.console:
- self.session.console.newAgent(agent)
+ self.session._newAgentCallback(agent)
def _ageAgents(self):
if (time() - self.last_age_check) < self.session.agent_heartbeat_min:
@@ -2437,7 +2569,7 @@ class Broker(Thread):
self.cv.release()
if self.session.console:
for agent in to_notify:
- self.session.console.delAgent(agent)
+ self.session._delAgentCallback(agent)
def _v2SendAgentLocate(self, predicate={}):
"""
@@ -2517,8 +2649,8 @@ class Broker(Thread):
self.cv.release()
if self.session.console:
- for agent in _agents:
- self.session.console.delAgent(agent)
+ for agent in _agents.itervalues():
+ self.session._delAgentCallback(agent)
def _shutdown(self, _timeout=10):
""" Disconnect from a broker, and release its resources. Errors are
@@ -2530,7 +2662,7 @@ class Broker(Thread):
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
self.join(_timeout)
- # abort any pending transactions
+ # abort any pending transactions and delete agents
self._disconnect("broker shutdown")
try:
@@ -2904,19 +3036,46 @@ class Agent:
"""
if 'qmf_object' in kwargs:
if self.session.console:
- self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+ obj = kwargs['qmf_object']
+ if len(self.session.class_filter) == 0:
+ self.session.console.objectProps(self.broker, obj)
+ elif obj.getClassKey():
+ # slow path: check classKey against event_filter
+ pname = obj.getClassKey().getPackageName()
+ cname = obj.getClassKey().getClassName()
+ if ((pname, cname) in self.session.class_filter
+ or (pname, None) in self.session.class_filter):
+ self.session.console.objectProps(self.broker, obj)
elif 'qmf_object_stats' in kwargs:
if self.session.console:
- self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+ obj = kwargs['qmf_object_stats']
+ if len(self.session.class_filter) == 0:
+ self.session.console.objectStats(self.broker, obj)
+ elif obj.getClassKey():
+ # slow path: check classKey against event_filter
+ pname = obj.getClassKey().getPackageName()
+ cname = obj.getClassKey().getClassName()
+ if ((pname, cname) in self.session.class_filter
+ or (pname, None) in self.session.class_filter):
+ self.session.console.objectStats(self.broker, obj)
elif 'qmf_event' in kwargs:
if self.session.console:
- self.session.console.event(self.broker, kwargs['qmf_event'])
+ event = kwargs['qmf_event']
+ if len(self.session.event_filter) == 0:
+ self.session.console.event(self.broker, event)
+ elif event.classKey:
+ # slow path: check classKey against event_filter
+ pname = event.classKey.getPackageName()
+ ename = event.classKey.getClassName()
+ if ((pname, ename) in self.session.event_filter
+ or (pname, None) in self.session.event_filter):
+ self.session.console.event(self.broker, event)
elif 'qmf_schema_id' in kwargs:
ckey = kwargs['qmf_schema_id']
new_pkg, new_cls = self.session.schemaCache.declareClass(ckey)
if self.session.console:
if new_pkg:
- self.session.console.newPackage(ckey.getPackageName())
+ self.session._newPackageCallback(ckey.getPackageName())
if new_cls:
# translate V2's string based type value to legacy
# integer value for backward compatibility
@@ -2925,7 +3084,7 @@ class Agent:
cls_type = 1
elif str(cls_type) == ckey.TYPE_EVENT:
cls_type = 2
- self.session.console.newClass(cls_type, ckey)
+ self.session._newClassCallback(cls_type, ckey)
def touch(self):
if self.heartbeatInterval: