diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-10-29 16:07:38 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-10-29 16:07:38 +0000 |
commit | cb763d5178e6c125f89f70942c4709099f5410e9 (patch) | |
tree | 53e13582768903a9c97a184dcead8d9da3fca3b6 | |
parent | 3e594b89e847ae927d116151714f8cd619a60ae2 (diff) | |
download | qpid-python-cb763d5178e6c125f89f70942c4709099f5410e9.tar.gz |
QPID-2782: enhance the console's ability to selectively filter unsolicited events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1028819 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 311 |
1 files changed, 235 insertions, 76 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index f96800db9c..478b017649 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -210,10 +210,6 @@ class Object(object): """ Return the broker from which this object was sent """ return self._broker - def getAgent(self): - """ Return the agent from which this object was sent """ - return self._agent - def getV2RoutingKey(self): """ Get the QMFv2 routing key to address this object """ return self._agent.getV2RoutingKey() @@ -579,12 +575,15 @@ class Session: self.rcvHeartbeats = False self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys() self.manageConnections = manageConnections - self.agent_filter = [] # (vendor, product, instance) + # callback filters: + self.agent_filter = [] # (vendor, product, instance) || v1-agent-label-str + self.class_filter = [] # (pkg, class) + self.event_filter = [] # (pkg, event) self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent - if self.userBindings and not self.rcvObjects: - raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + if self.userBindings and not self.console: + raise Exception("userBindings can't be set unless a console is provided.") def close(self): """ Releases all resources held by the session. Must be called by the @@ -647,9 +646,6 @@ class Session: 'broker' argument is the object returned from the addBroker call. Errors are ignored. """ - if self.console: - for agent in broker.getAgents(): - self.console.delAgent(agent) broker._shutdown() self.brokers.remove(broker) del broker @@ -677,13 +673,21 @@ class Session: def bindPackage(self, packageName): - """ Request object updates and events for all elements of a package. Only - valid if userBindings is True.""" - if not self.userBindings or not self.rcvObjects: - raise Exception("userBindings option not set for Session") + """ Filter object and event callbacks to only those elements of the + specified package. Also filters newPackage and newClass callbacks to the + given package. Only valid if userBindings is True. + """ + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvObjects and not self.rcvEvents: + raise Exception("Session needs to be configured to receive events or objects.") v1keys = ["console.obj.*.*.%s.#" % packageName, "console.event.*.*.%s.#" % packageName] v2keys = ["agent.ind.data.%s.#" % packageName.replace(".", "_"), "agent.ind.event.%s.#" % packageName.replace(".", "_"),] + if (packageName, None) not in self.class_filter: + self.class_filter.append((packageName, None)) + if (packageName, None) not in self.event_filter: + self.event_filter.append((packageName, None)) self.v1BindingKeyList.extend(v1keys) self.v2BindingKeyList.extend(v2keys) for broker in self.brokers: @@ -697,12 +701,15 @@ class Session: def bindClass(self, pname, cname=None): - """ Request object updates for a particular class given package and class - name, or all classes of a particular package if cname=None. Only valid if - userBindings is True. + """ Filter object callbacks to only those objects of the specified package + and optional class. Will also filter newPackage/newClass callbacks to the + specified package and class. Only valid if userBindings is True and + rcvObjects is True. """ - if not self.userBindings or not self.rcvObjects: - raise Exception("userBindings option not set for Session") + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvObjects: + raise Exception("Session needs to be configured with rcvObjects=True.") if cname is not None: v1key = "console.obj.*.*.%s.%s.#" % (pname, cname) v2key = "agent.ind.data.%s.%s.#" % (pname.replace(".", "_"), cname.replace(".", "_")) @@ -711,6 +718,8 @@ class Session: v2key = "agent.ind.data.%s.#" % pname.replace(".", "_") self.v1BindingKeyList.append(v1key) self.v2BindingKeyList.append(v2key) + if (pname, cname) not in self.class_filter: + self.class_filter.append((pname, cname)) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) @@ -720,19 +729,25 @@ class Session: def bindClassKey(self, classKey): - """ Request object updates for a particular table class by class key. Only - valid if userBindings is True. + """ Filter object callbacks to only those objects of the specified + class. Will also filter newPackage/newClass callbacks to the specified + package and class. Only valid if userBindings is True and rcvObjects is + True. """ pname = classKey.getPackageName() cname = classKey.getClassName() self.bindClass(pname, cname) def bindEvent(self, pname, ename=None): - """ Request events from a particular class by package and event name, or - all events is ename=None. Only valid if userBindings is True. + """ Filter event callbacks only from a particular class by package and + event name, or all events in a package if ename=None. Will also filter + newPackage/newClass callbacks to the specified package and class. Only + valid if userBindings is True and rcvEvents is True. """ - if not self.userBindings or not self.rcvEvents: - raise Exception("userBindings option not set for Session") + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvEvents: + raise Exception("Session needs to be configured with rcvEvents=True.") if ename is not None: v1key = "console.event.*.*.%s.%s.#" % (pname, ename) v2key = "agent.ind.event.%s.%s.#" % (pname.replace(".", "_"), ename.replace(".", "_")) @@ -741,6 +756,8 @@ class Session: v2key = "agent.ind.event.%s.#" % pname.replace(".", "_") self.v1BindingKeyList.append(v1key) self.v2BindingKeyList.append(v2key) + if (pname, ename) not in self.event_filter: + self.event_filter.append((pname, ename)) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) @@ -749,40 +766,58 @@ class Session: broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) def bindEventKey(self, eventKey): - """ Request events for a particular class by class key. Only valid if - userBindings is True. """ + """ Filter event callbacks only from a particular class key. Will also + filter newPackage/newClass callbacks to the specified package and + class. Only valid if userBindings is True and rcvEvents is True. + """ pname = eventKey.getPackageName() ename = eventKey.getClassName() self.bindEvent(pname, ename) - def bindAgent(self, vendor, product=None, instance=None): - """ Listen for heartbeats/events/data indications only for those agent(s) - that match the vendor and, optionally, the product strings. Only valid if - userBindings is True. + def bindAgent(self, vendor=None, product=None, instance=None, label=None): + """ Receive heartbeats, newAgent and delAgent callbacks only for those + agent(s) that match the passed identification criteria: + V2 agents: vendor, optionally product and instance strings + V1 agents: the label string. + Only valid if userBindings is True. """ if not self.userBindings: raise Exception("Session not configured for binding specific agents.") - if product is not None: - v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_")) - else: - v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_") - self.v2BindingKeyList.append(v2key) + if vendor is None and label is None: + raise Exception("Must specify at least a vendor (V2 agents)" + " or label (V1 agents).") - # allow wildcards - only add filter if a non-wildcarded component is given - if vendor == "*": - vendor = None - if product == "*": - product = None - if instance == "*": - instance = None - if vendor or product or instance: - self.agent_filter.append((vendor, product, instance)) + if vendor: # V2 agent identification + if product is not None: + v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_")) + else: + v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_") + self.v2BindingKeyList.append(v2key) + + # allow wildcards - only add filter if a non-wildcarded component is given + if vendor == "*": + vendor = None + if product == "*": + product = None + if instance == "*": + instance = None + if vendor or product or instance: + if (vendor, product, instance) not in self.agent_filter: + self.agent_filter.append((vendor, product, instance)) + + for broker in self.brokers: + if broker.isConnected(): + if broker.brokerSupportsV2: + # heartbeats should arrive on the heartbeat queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=broker.v2_topic_queue_hb, + binding_key=v2key) + elif label != "*": # non-wildcard V1 agent label + # V1 format heartbeats do not have any agent identifier in the routing + # key, so we cannot filter them by bindings. + if label not in self.agent_filter: + self.agent_filter.append(label) - for broker in self.brokers: - if broker.isConnected(): - if broker.brokerSupportsV2: - # heartbeats should arrive on the heartbeat queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key) def getAgents(self, broker=None): """ Get a list of currently known agents """ @@ -915,14 +950,14 @@ class Session: product = kwargs.get("product", "*") severity = kwargs.get("severity", "*") - if package is "*" and event is not "*": + if package == "*" and event != "*": raise Exception("'package' parameter required if 'event' parameter" " supplied") # V1 key - can only filter on package (and event) - if package is not "*": + if package == "*": key = "console.event.*.*." + str(package) - if event is not "*": + if event != "*": key += "." + str(event) key += ".#" @@ -951,6 +986,15 @@ class Session: except: pass + if package != "*": + if event != "*": + f = (package, event) + else: + f = (package, None) + if f not in self.event_filter: + self.event_filter.append(f) + + def addAgentFilter(self, vendor, product=None): """ Deprecate - use bindAgent() instead """ @@ -1030,14 +1074,14 @@ class Session: def _handleBrokerConnect(self, broker): if self.console: for agent in broker.getAgents(): - self.console.newAgent(agent) + self._newAgentCallback(agent) self.console.brokerConnected(broker) def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): - self.console.delAgent(agent) + self._delAgentCallback(agent) self.console.brokerDisconnected(broker) @@ -1059,7 +1103,7 @@ class Session: pname = str(codec.read_str8()) notify = self.schemaCache.declarePackage(pname) if notify and self.console != None: - self.console.newPackage(pname) + self._newPackageCallback(pname) # Send a class request broker._incOutstanding() @@ -1132,9 +1176,9 @@ class Session: agentBank = 0 agent = broker.getAgent(brokerBank, agentBank) - timestamp = codec.read_uint64() if self.rcvHeartbeats and self.console != None and agent != None: - self.console.heartbeat(agent, timestamp) + timestamp = codec.read_uint64() + self._heartbeatCallback(agent, timestamp) def _handleSchemaResp(self, broker, codec, seq, agent_addr): @@ -1148,9 +1192,9 @@ class Session: broker._decOutstanding() if self.console != None: if new_pkg: - self.console.newPackage(classKey.getPackageName()) + self._newPackageCallback(classKey.getPackageName()) if new_cls: - self.console.newClass(kind, classKey) + self._newClassCallback(kind, classKey) if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): agent = self._getAgentForAgentAddr(agent_addr) @@ -1179,7 +1223,7 @@ class Session: return if self.agent_filter: - # only allow agents that satisfy the filter + # only allow V2 agents that satisfy the filter v = agentName.split(":", 2) if len(v) != 3 or ((v[0], None, None) not in self.agent_filter and (v[0], v[1], None) not in self.agent_filter @@ -1194,7 +1238,7 @@ class Session: else: agent.touch() if self.rcvHeartbeats and self.console and agent: - self.console.heartbeat(agent, timestamp) + self._heartbeatCallback(agent, timestamp) agent.update_schema_timestamp(values.get("schema_timestamp", 0)) @@ -1479,6 +1523,88 @@ class Session: return seq return None + def _newPackageCallback(self, pname): + """ + Invokes the console.newPackage() callback if the callback is present and + the package is not filtered. + """ + if self.console: + if len(self.class_filter) == 0 and len(self.event_filter) == 0: + self.console.newPackage(pname) + else: + for x in self.class_filter: + if x[0] == pname: + self.console.newPackage(pname) + return + + for x in self.event_filter: + if x[0] == pname: + self.console.newPackage(pname) + return + + + def _newClassCallback(self, ctype, ckey): + """ + Invokes the console.newClass() callback if the callback is present and the + class is not filtered. + """ + if self.console: + if ctype == ClassKey.TYPE_DATA: + if (len(self.class_filter) == 0 + or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter): + self.console.newClass(ctype, ckey) + elif ctype == ClassKey.TYPE_EVENT: + if (len(self.event_filter) == 0 + or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): + self.console.newClass(ctype, ckey) + else: # old class keys did not contain type info, check both filters + if ((len(self.class_filter) == 0 and len(self.event_filter) == 0) + or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter + or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): + self.console.newClass(ctype, ckey) + + def _agentAllowed(self, agentName, isV2): + """ True if the agent is NOT filtered. + """ + if self.agent_filter: + if isV2: + v = agentName.split(":", 2) + return ((len(v) > 2 and (v[0], v[1], v[2]) in self.agent_filter) + or (len(v) > 1 and (v[0], v[1], None) in self.agent_filter) + or (v and (v[0], None, None) in self.agent_filter)); + else: + return agentName in self.agent_filter + return True + + def _heartbeatCallback(self, agent, timestamp): + """ + Invokes the console.heartbeat() callback if the callback is present and the + agent is not filtered. + """ + if self.console and self.rcvHeartbeats: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.heartbeat(agent, timestamp) + + def _newAgentCallback(self, agent): + """ + Invokes the console.newAgent() callback if the callback is present and the + agent is not filtered. + """ + if self.console: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.newAgent(agent) + + def _delAgentCallback(self, agent): + """ + Invokes the console.delAgent() callback if the callback is present and the + agent is not filtered. + """ + if self.console: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.delAgent(agent) #=================================================================================================== # SessionGetRequest @@ -2385,18 +2511,24 @@ class Broker(Thread): return False # connection failed def _updateAgent(self, obj): + """ + Just received an object of class "org.apache.qpid.broker:agent", which + represents a V1 agent. Add or update the list of agent proxies. + """ bankKey = str(obj.agentBank) agent = None if obj._deleteTime == 0: try: self.cv.acquire() if bankKey not in self.agents: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent + # add new agent only if label is not filtered + if len(self.session.agent_filter) == 0 or obj.label in self.session.agent_filter: + agent = Agent(self, obj.agentBank, obj.label) + self.agents[bankKey] = agent finally: self.cv.release() if agent and self.session.console: - self.session.console.newAgent(agent) + self.session._newAgentCallback(agent) else: try: self.cv.acquire() @@ -2406,7 +2538,7 @@ class Broker(Thread): finally: self.cv.release() if agent and self.session.console: - self.session.console.delAgent(agent) + self.session._delAgentCallback(agent) def _addAgent(self, name, agent): try: @@ -2415,7 +2547,7 @@ class Broker(Thread): finally: self.cv.release() if self.session.console: - self.session.console.newAgent(agent) + self.session._newAgentCallback(agent) def _ageAgents(self): if (time() - self.last_age_check) < self.session.agent_heartbeat_min: @@ -2437,7 +2569,7 @@ class Broker(Thread): self.cv.release() if self.session.console: for agent in to_notify: - self.session.console.delAgent(agent) + self.session._delAgentCallback(agent) def _v2SendAgentLocate(self, predicate={}): """ @@ -2517,8 +2649,8 @@ class Broker(Thread): self.cv.release() if self.session.console: - for agent in _agents: - self.session.console.delAgent(agent) + for agent in _agents.itervalues(): + self.session._delAgentCallback(agent) def _shutdown(self, _timeout=10): """ Disconnect from a broker, and release its resources. Errors are @@ -2530,7 +2662,7 @@ class Broker(Thread): self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) self.join(_timeout) - # abort any pending transactions + # abort any pending transactions and delete agents self._disconnect("broker shutdown") try: @@ -2904,19 +3036,46 @@ class Agent: """ if 'qmf_object' in kwargs: if self.session.console: - self.session.console.objectProps(self.broker, kwargs['qmf_object']) + obj = kwargs['qmf_object'] + if len(self.session.class_filter) == 0: + self.session.console.objectProps(self.broker, obj) + elif obj.getClassKey(): + # slow path: check classKey against event_filter + pname = obj.getClassKey().getPackageName() + cname = obj.getClassKey().getClassName() + if ((pname, cname) in self.session.class_filter + or (pname, None) in self.session.class_filter): + self.session.console.objectProps(self.broker, obj) elif 'qmf_object_stats' in kwargs: if self.session.console: - self.session.console.objectStats(self.broker, kwargs['qmf_object_stats']) + obj = kwargs['qmf_object_stats'] + if len(self.session.class_filter) == 0: + self.session.console.objectStats(self.broker, obj) + elif obj.getClassKey(): + # slow path: check classKey against event_filter + pname = obj.getClassKey().getPackageName() + cname = obj.getClassKey().getClassName() + if ((pname, cname) in self.session.class_filter + or (pname, None) in self.session.class_filter): + self.session.console.objectStats(self.broker, obj) elif 'qmf_event' in kwargs: if self.session.console: - self.session.console.event(self.broker, kwargs['qmf_event']) + event = kwargs['qmf_event'] + if len(self.session.event_filter) == 0: + self.session.console.event(self.broker, event) + elif event.classKey: + # slow path: check classKey against event_filter + pname = event.classKey.getPackageName() + ename = event.classKey.getClassName() + if ((pname, ename) in self.session.event_filter + or (pname, None) in self.session.event_filter): + self.session.console.event(self.broker, event) elif 'qmf_schema_id' in kwargs: ckey = kwargs['qmf_schema_id'] new_pkg, new_cls = self.session.schemaCache.declareClass(ckey) if self.session.console: if new_pkg: - self.session.console.newPackage(ckey.getPackageName()) + self.session._newPackageCallback(ckey.getPackageName()) if new_cls: # translate V2's string based type value to legacy # integer value for backward compatibility @@ -2925,7 +3084,7 @@ class Agent: cls_type = 1 elif str(cls_type) == ckey.TYPE_EVENT: cls_type = 2 - self.session.console.newClass(cls_type, ckey) + self.session._newClassCallback(cls_type, ckey) def touch(self): if self.heartbeatInterval: |