diff options
Diffstat (limited to 'qpid/python/qmf2/agent.py')
-rw-r--r-- | qpid/python/qmf2/agent.py | 758 |
1 files changed, 758 insertions, 0 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py new file mode 100644 index 0000000000..c6a518ca31 --- /dev/null +++ b/qpid/python/qmf2/agent.py @@ -0,0 +1,758 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import logging +import datetime +import time +import Queue +from threading import Thread, Lock, currentThread +from qpid.messaging import Connection, Message, Empty, SendError +from uuid import uuid4 +from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + +# global flag that indicates which thread (if any) is +# running the agent notifier callback +_callback_thread=None + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class _MethodCallHandle(object): + """ + Private class used to hold context when handing off a method call to the + application. Given to the app in a WorkItem, provided to the agent when + method_response() is invoked. + """ + def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + self.correlation_id = correlation_id + self.reply_to = reply_to + self.meth_name = meth_name + self.oid = _oid + +class MethodCallParams(object): + """ + """ + def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + self._meth_name = name + self._oid = _oid + self._in_args = _in_args + self._user_id = _user_id + + def get_name(self): + return self._meth_name + + def get_object_id(self): + return self._oid + + def get_args(self): + return self._in_args + + def get_user_id(self): + return self._user_id + + + + ##============================================================================== + ## AGENT + ##============================================================================== + +class Agent(Thread): + def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, + _max_msg_size=0, _capacity=10): + Thread.__init__(self) + self._running = False + + self.name = str(name) + self._domain = _domain + self._notifier = _notifier + self._heartbeat_interval = _heartbeat_interval + self._max_msg_size = _max_msg_size + self._capacity = _capacity + + self._conn = None + self._session = None + self._direct_receiver = None + self._locate_receiver = None + self._ind_sender = None + self._event_sender = None + + self._lock = Lock() + self._packages = {} + self._schema_timestamp = long(0) + self._schema = {} + self._agent_data = {} + self._work_q = Queue.Queue() + self._work_q_put = False + + + def destroy(self, timeout=None): + """ + Must be called before the Agent is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Agent %s" % self.name) + if self._conn: + self.remove_connection(timeout) + logging.debug("Agent Destroyed") + + + def get_name(self): + return self.name + + def set_connection(self, conn): + self._conn = conn + self._session = self._conn.session() + + my_addr = QmfAddress.direct(self.name, self._domain) + self._direct_receiver = self._session.receiver(str(my_addr) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=self._capacity) + logging.debug("my direct addr=%s" % self._direct_receiver.source) + + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + self._locate_receiver = self._session.receiver(str(locate_addr) + + ";{create:always," + " node-properties:" + " {type:topic}}", + capacity=self._capacity) + logging.debug("agent.locate addr=%s" % self._locate_receiver.source) + + + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + self._ind_sender = self._session.sender(str(ind_addr) + + ";{create:always," + " node-properties:" + " {type:topic}}") + logging.debug("agent.ind addr=%s" % self._ind_sender.target) + + my_events = QmfAddress.topic(self.name, self._domain) + self._event_sender = self._session.sender(str(my_events) + + ";{create:always," + " node-properties:" + " {type:topic}}") + logging.debug("my event addr=%s" % self._event_sender.target) + + self._running = True + self.start() + + def remove_connection(self, timeout=None): + # tell connection thread to shutdown + self._running = False + if self.isAlive(): + # kick my thread to wake it up + my_addr = QmfAddress.direct(self.name, self._domain) + logging.debug("Making temp sender for [%s]" % str(my_addr)) + tmp_sender = self._session.sender(str(my_addr)) + try: + msg = Message(subject=makeSubject(OpCode.noop)) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for agent receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Agent thread '%s' is hung..." % self.name) + self._direct_receiver.close() + self._direct_receiver = None + self._locate_receiver.close() + self._locate_receiver = None + self._ind_sender.close() + self._ind_sender = None + self._event_sender.close() + self._event_sender = None + self._session.close() + self._session = None + self._conn = None + logging.debug("agent connection removal complete") + + def register_object_class(self, schema): + """ + Register an instance of a SchemaClass with this agent + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass instance expected") + + self._lock.acquire() + try: + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() + if pname not in self._packages: + self._packages[pname] = [cname] + else: + if cname not in self._packages[pname]: + self._packages[pname].append(cname) + self._schema[classId] = schema + self._schema_timestamp = long(time.time() * 1000) + finally: + self._lock.release() + + def register_event_class(self, schema): + return self.register_object_class(schema) + + def raise_event(self, qmfEvent): + """ + TBD + """ + if not self._event_sender: + raise Exception("No connection available") + + # @todo: should we validate against the schema? + _map = {"_name": self.get_name(), + "_event": qmfEvent.map_encode()} + msg = Message(subject=makeSubject(OpCode.event_ind), + properties={"method":"response"}, + content={MsgKey.event:_map}) + self._event_sender.send(msg) + + def add_object(self, data ): + """ + Register an instance of a QmfAgentData object. + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(data, QmfAgentData): + raise TypeError("QmfAgentData instance expected") + + id_ = data.get_object_id() + if not id_: + raise TypeError("No identifier assigned to QmfAgentData!") + + self._lock.acquire() + try: + self._agent_data[id_] = data + finally: + self._lock.release() + + def get_object(self, id): + self._lock.acquire() + try: + data = self._agent_data.get(id) + finally: + self._lock.release() + return data + + + def method_response(self, handle, _out_args=None, _error=None): + """ + """ + if not isinstance(handle, _MethodCallHandle): + raise TypeError("Invalid handle passed to method_response!") + + _map = {SchemaMethod.KEY_NAME:handle.meth_name} + if handle.oid is not None: + _map[QmfData.KEY_OBJECT_ID] = handle.oid + if _out_args is not None: + _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() + if _error is not None: + if not isinstance(_error, QmfData): + raise TypeError("Invalid type for error - must be QmfData") + _map[SchemaMethod.KEY_ERROR] = _error.map_encode() + + msg = Message(subject=makeSubject(OpCode.response), + properties={"method":"response"}, + content={MsgKey.method:_map}) + msg.correlation_id = handle.correlation_id + + try: + tmp_snd = self._session.sender( handle.reply_to ) + tmp_snd.send(msg) + logging.debug("method-response sent to [%s]" % handle.reply_to) + except SendError, e: + logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + def get_next_workitem(self, timeout=None): + """ + Obtains the next pending work item, or None if none available. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + def release_workitem(self, wi): + """ + Releases a WorkItem instance obtained by getNextWorkItem(). Called when + the application has finished processing the WorkItem. + """ + pass + + + def run(self): + global _callback_thread + next_heartbeat = datetime.datetime.utcnow() + batch_limit = 10 # a guess + while self._running: + + now = datetime.datetime.utcnow() + # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) + if now >= next_heartbeat: + self._ind_sender.send(self._makeAgentIndMsg()) + logging.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds + # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) + try: + self._session.next_receiver(timeout=timeout) + except Empty: + continue + + for i in range(batch_limit): + try: + msg = self._locate_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) + + for i in range(batch_limit): + try: + msg = self._direct_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) + + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling agent notifier.indication") + self._notifier.indication() + _callback_thread = None + + # + # Private: + # + + def _makeAgentIndMsg(self): + """ + Create an agent indication message identifying this agent + """ + _map = {"_name": self.get_name(), + "_schema_timestamp": self._schema_timestamp} + return Message( subject=makeSubject(OpCode.agent_ind), + properties={"method":"response"}, + content={MsgKey.agent_info: _map}) + + + def _dispatch(self, msg, _direct=False): + """ + Process a message from a console. + + @param _direct: True if msg directly addressed to this agent. + """ + logging.debug( "Message received from Console! [%s]" % msg ) + try: + version,opcode = parseSubject(msg.subject) + except: + logging.debug("Ignoring unrecognized message '%s'" % msg.subject) + return + + cmap = {}; props={} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_locate: + self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.get_query: + self._handleQueryMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.method_req: + self._handleMethodReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.cancel_subscription: + logging.warning("!!! CANCEL_SUB TBD !!!") + elif opcode == OpCode.create_subscription: + logging.warning("!!! CREATE_SUB TBD !!!") + elif opcode == OpCode.renew_subscription: + logging.warning("!!! RENEW_SUB TBD !!!") + elif opcode == OpCode.schema_query: + logging.warning("!!! SCHEMA_QUERY TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % opcode) + + def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ): + """ + Process a received agent-locate message + """ + logging.debug("_handleAgentLocateMsg") + + reply = True + if "method" in props and props["method"] == "request": + query = cmap.get(MsgKey.query) + if query is not None: + # fake a QmfData containing my identifier for the query compare + tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) + reply = QmfQuery(query).evaluate(tmpData) + + if reply: + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = self._makeAgentIndMsg() + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("agent-ind sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e))) + else: + logging.debug("agent-locate msg not mine - no reply sent") + + + def _handleQueryMsg(self, msg, cmap, props, version, _direct ): + """ + Handle received query message + """ + logging.debug("_handleQueryMsg") + + if "method" in props and props["method"] == "request": + qmap = cmap.get(MsgKey.query) + if qmap: + query = QmfQuery.from_map(qmap) + target = query.get_target() + if target == QmfQuery.TARGET_PACKAGES: + self._queryPackages( msg, query ) + elif target == QmfQuery.TARGET_SCHEMA_ID: + self._querySchema( msg, query, _idOnly=True ) + elif target == QmfQuery.TARGET_SCHEMA: + self._querySchema( msg, query) + elif target == QmfQuery.TARGET_AGENT: + logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + elif target == QmfQuery.TARGET_OBJECT_ID: + self._queryData(msg, query, _idOnly=True) + elif target == QmfQuery.TARGET_OBJECT: + self._queryData(msg, query) + else: + logging.warning("Unrecognized query target: '%s'" % str(target)) + + + + def _handleMethodReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Method Request + """ + if "method" in props and props["method"] == "request": + mname = cmap.get(SchemaMethod.KEY_NAME) + if not mname: + logging.warning("Invalid method call from '%s': no name" + % msg.reply_to) + return + + in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) + oid = cmap.get(QmfData.KEY_OBJECT_ID) + + print("!!! ci=%s rt=%s mn=%s oid=%s" % + (msg.correlation_id, + msg.reply_to, + mname, + oid)) + + handle = _MethodCallHandle(msg.correlation_id, + msg.reply_to, + mname, + oid) + param = MethodCallParams( mname, oid, in_args, msg.user_id) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) + self._work_q_put = True + + def _queryPackages(self, msg, query): + """ + Run a query against the list of known packages + """ + pnames = [] + self._lock.acquire() + try: + for name in self._packages.iterkeys(): + if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): + pnames.append(name) + finally: + self._lock.release() + + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content={MsgKey.package_info: pnames} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("package_info sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + def _querySchema( self, msg, query, _idOnly=False ): + """ + """ + schemas = [] + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._schema.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema + self._lock.acquire() + try: + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) + finally: + self._lock.release() + + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("schema_id sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + def _queryData( self, msg, query, _idOnly=False ): + """ + """ + data_objs = [] + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._agent_data.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + data_objs.append(query.get_id()) + else: + data_objs.append(found.map_encode()) + else: # otherwise, evaluate all data + self._lock.acquire() + try: + for oid,val in self._agent_data.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) + finally: + self._lock.release() + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.object_id:data_objs} + else: + content = {MsgKey.data_obj:data_objs} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("data reply sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + ##============================================================================== + ## DATA MODEL + ##============================================================================== + + +class QmfAgentData(QmfData): + """ + A managed data object that is owned by an agent. + """ + + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, + _tag=_tag, _ctime=ctime, + _utime=ctime, _object_id=_object_id, + _schema=_schema, _const=False) + self._agent = agent + + def destroy(self): + self._dtime = long(time.time() * 1000) + # @todo: publish change + + def is_deleted(self): + return self._dtime == 0 + + def set_value(self, _name, _value, _subType=None): + super(QmfAgentData, self).set_value(_name, _value, _subType) + # @todo: publish change + + def inc_value(self, name, delta=1): + """ add the delta to the property """ + # @todo: need to take write-lock + val = self.get_value(name) + try: + val += delta + except: + raise + self.set_value(name, val) + + def dec_value(self, name, delta=1): + """ subtract the delta from the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # static test cases - no message passing, just exercise API + from common import (AgentName, SchemaProperty, qmfTypes, + SchemaMethod, SchemaEventClass) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "Create an Agent" ) + _agent_name = AgentName("redhat.com", "agent", "tross") + _agent = Agent(str(_agent_name)) + + logging.info( "Get agent name: '%s'" % _agent.get_name()) + + logging.info( "Create SchemaObjectClass" ) + + _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"]) + # add properties + _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call + _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod(_desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + print("Schema Map='%s'" % str(_schema.map_encode())) + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + logging.info( "Create QmfAgentData" ) + + _obj = QmfAgentData( _agent, _schema=_schema ) + _obj.set_value("index1", 100) + _obj.set_value("index2", "a name" ) + _obj.set_value("set_string", "UNSET") + _obj.set_value("set_int", 0) + _obj.set_value("query_count", 0) + _obj.set_value("method_call_count", 0) + + print("Obj1 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object( _obj ) + + _obj = QmfAgentData( _agent, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0}, + _schema=_schema) + + print("Obj2 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object(_obj) + + ############## + + + + logging.info( "Create SchemaEventClass" ) + + _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test data schema", + _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)}) + _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + print("Event Map='%s'" % str(_event.map_encode())) + + _agent.register_event_class(_event) |