From fadd169527df40c715e549f538d256fc23bab3da Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 25 Jan 2010 16:25:05 +0000 Subject: Move the QMFv2 implementation to its own directory. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@902858 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qmf2/agent.py | 758 +++++++++++ qpid/python/qmf2/common.py | 1881 +++++++++++++++++++++++++++ qpid/python/qmf2/console.py | 1959 +++++++++++++++++++++++++++++ qpid/python/qmf2/tests/__init__.py | 22 + qpid/python/qmf2/tests/agent_discovery.py | 320 +++++ qpid/python/qmf2/tests/agent_test.py | 167 +++ qpid/python/qmf2/tests/basic_method.py | 348 +++++ qpid/python/qmf2/tests/basic_query.py | 336 +++++ qpid/python/qmf2/tests/console_test.py | 175 +++ qpid/python/qmf2/tests/events.py | 193 +++ qpid/python/qmf2/tests/obj_gets.py | 399 ++++++ 11 files changed, 6558 insertions(+) create mode 100644 qpid/python/qmf2/agent.py create mode 100644 qpid/python/qmf2/common.py create mode 100644 qpid/python/qmf2/console.py create mode 100644 qpid/python/qmf2/tests/__init__.py create mode 100644 qpid/python/qmf2/tests/agent_discovery.py create mode 100644 qpid/python/qmf2/tests/agent_test.py create mode 100644 qpid/python/qmf2/tests/basic_method.py create mode 100644 qpid/python/qmf2/tests/basic_query.py create mode 100644 qpid/python/qmf2/tests/console_test.py create mode 100644 qpid/python/qmf2/tests/events.py create mode 100644 qpid/python/qmf2/tests/obj_gets.py diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py new file mode 100644 index 0000000000..c6a518ca31 --- /dev/null +++ b/qpid/python/qmf2/agent.py @@ -0,0 +1,758 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import logging +import datetime +import time +import Queue +from threading import Thread, Lock, currentThread +from qpid.messaging import Connection, Message, Empty, SendError +from uuid import uuid4 +from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + +# global flag that indicates which thread (if any) is +# running the agent notifier callback +_callback_thread=None + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class _MethodCallHandle(object): + """ + Private class used to hold context when handing off a method call to the + application. Given to the app in a WorkItem, provided to the agent when + method_response() is invoked. + """ + def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + self.correlation_id = correlation_id + self.reply_to = reply_to + self.meth_name = meth_name + self.oid = _oid + +class MethodCallParams(object): + """ + """ + def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + self._meth_name = name + self._oid = _oid + self._in_args = _in_args + self._user_id = _user_id + + def get_name(self): + return self._meth_name + + def get_object_id(self): + return self._oid + + def get_args(self): + return self._in_args + + def get_user_id(self): + return self._user_id + + + + ##============================================================================== + ## AGENT + ##============================================================================== + +class Agent(Thread): + def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, + _max_msg_size=0, _capacity=10): + Thread.__init__(self) + self._running = False + + self.name = str(name) + self._domain = _domain + self._notifier = _notifier + self._heartbeat_interval = _heartbeat_interval + self._max_msg_size = _max_msg_size + self._capacity = _capacity + + self._conn = None + self._session = None + self._direct_receiver = None + self._locate_receiver = None + self._ind_sender = None + self._event_sender = None + + self._lock = Lock() + self._packages = {} + self._schema_timestamp = long(0) + self._schema = {} + self._agent_data = {} + self._work_q = Queue.Queue() + self._work_q_put = False + + + def destroy(self, timeout=None): + """ + Must be called before the Agent is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Agent %s" % self.name) + if self._conn: + self.remove_connection(timeout) + logging.debug("Agent Destroyed") + + + def get_name(self): + return self.name + + def set_connection(self, conn): + self._conn = conn + self._session = self._conn.session() + + my_addr = QmfAddress.direct(self.name, self._domain) + self._direct_receiver = self._session.receiver(str(my_addr) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=self._capacity) + logging.debug("my direct addr=%s" % self._direct_receiver.source) + + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + self._locate_receiver = self._session.receiver(str(locate_addr) + + ";{create:always," + " node-properties:" + " {type:topic}}", + capacity=self._capacity) + logging.debug("agent.locate addr=%s" % self._locate_receiver.source) + + + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + self._ind_sender = self._session.sender(str(ind_addr) + + ";{create:always," + " node-properties:" + " {type:topic}}") + logging.debug("agent.ind addr=%s" % self._ind_sender.target) + + my_events = QmfAddress.topic(self.name, self._domain) + self._event_sender = self._session.sender(str(my_events) + + ";{create:always," + " node-properties:" + " {type:topic}}") + logging.debug("my event addr=%s" % self._event_sender.target) + + self._running = True + self.start() + + def remove_connection(self, timeout=None): + # tell connection thread to shutdown + self._running = False + if self.isAlive(): + # kick my thread to wake it up + my_addr = QmfAddress.direct(self.name, self._domain) + logging.debug("Making temp sender for [%s]" % str(my_addr)) + tmp_sender = self._session.sender(str(my_addr)) + try: + msg = Message(subject=makeSubject(OpCode.noop)) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for agent receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Agent thread '%s' is hung..." % self.name) + self._direct_receiver.close() + self._direct_receiver = None + self._locate_receiver.close() + self._locate_receiver = None + self._ind_sender.close() + self._ind_sender = None + self._event_sender.close() + self._event_sender = None + self._session.close() + self._session = None + self._conn = None + logging.debug("agent connection removal complete") + + def register_object_class(self, schema): + """ + Register an instance of a SchemaClass with this agent + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass instance expected") + + self._lock.acquire() + try: + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() + if pname not in self._packages: + self._packages[pname] = [cname] + else: + if cname not in self._packages[pname]: + self._packages[pname].append(cname) + self._schema[classId] = schema + self._schema_timestamp = long(time.time() * 1000) + finally: + self._lock.release() + + def register_event_class(self, schema): + return self.register_object_class(schema) + + def raise_event(self, qmfEvent): + """ + TBD + """ + if not self._event_sender: + raise Exception("No connection available") + + # @todo: should we validate against the schema? + _map = {"_name": self.get_name(), + "_event": qmfEvent.map_encode()} + msg = Message(subject=makeSubject(OpCode.event_ind), + properties={"method":"response"}, + content={MsgKey.event:_map}) + self._event_sender.send(msg) + + def add_object(self, data ): + """ + Register an instance of a QmfAgentData object. + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(data, QmfAgentData): + raise TypeError("QmfAgentData instance expected") + + id_ = data.get_object_id() + if not id_: + raise TypeError("No identifier assigned to QmfAgentData!") + + self._lock.acquire() + try: + self._agent_data[id_] = data + finally: + self._lock.release() + + def get_object(self, id): + self._lock.acquire() + try: + data = self._agent_data.get(id) + finally: + self._lock.release() + return data + + + def method_response(self, handle, _out_args=None, _error=None): + """ + """ + if not isinstance(handle, _MethodCallHandle): + raise TypeError("Invalid handle passed to method_response!") + + _map = {SchemaMethod.KEY_NAME:handle.meth_name} + if handle.oid is not None: + _map[QmfData.KEY_OBJECT_ID] = handle.oid + if _out_args is not None: + _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() + if _error is not None: + if not isinstance(_error, QmfData): + raise TypeError("Invalid type for error - must be QmfData") + _map[SchemaMethod.KEY_ERROR] = _error.map_encode() + + msg = Message(subject=makeSubject(OpCode.response), + properties={"method":"response"}, + content={MsgKey.method:_map}) + msg.correlation_id = handle.correlation_id + + try: + tmp_snd = self._session.sender( handle.reply_to ) + tmp_snd.send(msg) + logging.debug("method-response sent to [%s]" % handle.reply_to) + except SendError, e: + logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + def get_next_workitem(self, timeout=None): + """ + Obtains the next pending work item, or None if none available. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + def release_workitem(self, wi): + """ + Releases a WorkItem instance obtained by getNextWorkItem(). Called when + the application has finished processing the WorkItem. + """ + pass + + + def run(self): + global _callback_thread + next_heartbeat = datetime.datetime.utcnow() + batch_limit = 10 # a guess + while self._running: + + now = datetime.datetime.utcnow() + # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) + if now >= next_heartbeat: + self._ind_sender.send(self._makeAgentIndMsg()) + logging.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds + # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) + try: + self._session.next_receiver(timeout=timeout) + except Empty: + continue + + for i in range(batch_limit): + try: + msg = self._locate_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=False) + + for i in range(batch_limit): + try: + msg = self._direct_receiver.fetch(timeout=0) + except Empty: + break + if msg and msg.content_type == "amqp/map": + self._dispatch(msg, _direct=True) + + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling agent notifier.indication") + self._notifier.indication() + _callback_thread = None + + # + # Private: + # + + def _makeAgentIndMsg(self): + """ + Create an agent indication message identifying this agent + """ + _map = {"_name": self.get_name(), + "_schema_timestamp": self._schema_timestamp} + return Message( subject=makeSubject(OpCode.agent_ind), + properties={"method":"response"}, + content={MsgKey.agent_info: _map}) + + + def _dispatch(self, msg, _direct=False): + """ + Process a message from a console. + + @param _direct: True if msg directly addressed to this agent. + """ + logging.debug( "Message received from Console! [%s]" % msg ) + try: + version,opcode = parseSubject(msg.subject) + except: + logging.debug("Ignoring unrecognized message '%s'" % msg.subject) + return + + cmap = {}; props={} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_locate: + self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.get_query: + self._handleQueryMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.method_req: + self._handleMethodReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.cancel_subscription: + logging.warning("!!! CANCEL_SUB TBD !!!") + elif opcode == OpCode.create_subscription: + logging.warning("!!! CREATE_SUB TBD !!!") + elif opcode == OpCode.renew_subscription: + logging.warning("!!! RENEW_SUB TBD !!!") + elif opcode == OpCode.schema_query: + logging.warning("!!! SCHEMA_QUERY TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % opcode) + + def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ): + """ + Process a received agent-locate message + """ + logging.debug("_handleAgentLocateMsg") + + reply = True + if "method" in props and props["method"] == "request": + query = cmap.get(MsgKey.query) + if query is not None: + # fake a QmfData containing my identifier for the query compare + tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) + reply = QmfQuery(query).evaluate(tmpData) + + if reply: + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = self._makeAgentIndMsg() + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("agent-ind sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e))) + else: + logging.debug("agent-locate msg not mine - no reply sent") + + + def _handleQueryMsg(self, msg, cmap, props, version, _direct ): + """ + Handle received query message + """ + logging.debug("_handleQueryMsg") + + if "method" in props and props["method"] == "request": + qmap = cmap.get(MsgKey.query) + if qmap: + query = QmfQuery.from_map(qmap) + target = query.get_target() + if target == QmfQuery.TARGET_PACKAGES: + self._queryPackages( msg, query ) + elif target == QmfQuery.TARGET_SCHEMA_ID: + self._querySchema( msg, query, _idOnly=True ) + elif target == QmfQuery.TARGET_SCHEMA: + self._querySchema( msg, query) + elif target == QmfQuery.TARGET_AGENT: + logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + elif target == QmfQuery.TARGET_OBJECT_ID: + self._queryData(msg, query, _idOnly=True) + elif target == QmfQuery.TARGET_OBJECT: + self._queryData(msg, query) + else: + logging.warning("Unrecognized query target: '%s'" % str(target)) + + + + def _handleMethodReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Method Request + """ + if "method" in props and props["method"] == "request": + mname = cmap.get(SchemaMethod.KEY_NAME) + if not mname: + logging.warning("Invalid method call from '%s': no name" + % msg.reply_to) + return + + in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) + oid = cmap.get(QmfData.KEY_OBJECT_ID) + + print("!!! ci=%s rt=%s mn=%s oid=%s" % + (msg.correlation_id, + msg.reply_to, + mname, + oid)) + + handle = _MethodCallHandle(msg.correlation_id, + msg.reply_to, + mname, + oid) + param = MethodCallParams( mname, oid, in_args, msg.user_id) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) + self._work_q_put = True + + def _queryPackages(self, msg, query): + """ + Run a query against the list of known packages + """ + pnames = [] + self._lock.acquire() + try: + for name in self._packages.iterkeys(): + if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): + pnames.append(name) + finally: + self._lock.release() + + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content={MsgKey.package_info: pnames} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + tmp_snd.send(m) + logging.debug("package_info sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + def _querySchema( self, msg, query, _idOnly=False ): + """ + """ + schemas = [] + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._schema.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema + self._lock.acquire() + try: + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) + finally: + self._lock.release() + + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.schema_id: schemas} + else: + content = {MsgKey.schema:schemas} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("schema_id sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + def _queryData( self, msg, query, _idOnly=False ): + """ + """ + data_objs = [] + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = None + self._lock.acquire() + try: + found = self._agent_data.get(query.get_id()) + finally: + self._lock.release() + if found: + if _idOnly: + data_objs.append(query.get_id()) + else: + data_objs.append(found.map_encode()) + else: # otherwise, evaluate all data + self._lock.acquire() + try: + for oid,val in self._agent_data.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) + finally: + self._lock.release() + + tmp_snd = self._session.sender( msg.reply_to ) + + if _idOnly: + content = {MsgKey.object_id:data_objs} + else: + content = {MsgKey.data_obj:data_objs} + + m = Message( subject=makeSubject(OpCode.data_ind), + properties={"method":"response"}, + content=content ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + try: + tmp_snd.send(m) + logging.debug("data reply sent to [%s]" % msg.reply_to) + except SendError, e: + logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + + ##============================================================================== + ## DATA MODEL + ##============================================================================== + + +class QmfAgentData(QmfData): + """ + A managed data object that is owned by an agent. + """ + + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, + _tag=_tag, _ctime=ctime, + _utime=ctime, _object_id=_object_id, + _schema=_schema, _const=False) + self._agent = agent + + def destroy(self): + self._dtime = long(time.time() * 1000) + # @todo: publish change + + def is_deleted(self): + return self._dtime == 0 + + def set_value(self, _name, _value, _subType=None): + super(QmfAgentData, self).set_value(_name, _value, _subType) + # @todo: publish change + + def inc_value(self, name, delta=1): + """ add the delta to the property """ + # @todo: need to take write-lock + val = self.get_value(name) + try: + val += delta + except: + raise + self.set_value(name, val) + + def dec_value(self, name, delta=1): + """ subtract the delta from the property """ + # @todo: need to take write-lock + logging.error(" TBD!!!") + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # static test cases - no message passing, just exercise API + from common import (AgentName, SchemaProperty, qmfTypes, + SchemaMethod, SchemaEventClass) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "Create an Agent" ) + _agent_name = AgentName("redhat.com", "agent", "tross") + _agent = Agent(str(_agent_name)) + + logging.info( "Get agent name: '%s'" % _agent.get_name()) + + logging.info( "Create SchemaObjectClass" ) + + _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"]) + # add properties + _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call + _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod(_desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + print("Schema Map='%s'" % str(_schema.map_encode())) + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + logging.info( "Create QmfAgentData" ) + + _obj = QmfAgentData( _agent, _schema=_schema ) + _obj.set_value("index1", 100) + _obj.set_value("index2", "a name" ) + _obj.set_value("set_string", "UNSET") + _obj.set_value("set_int", 0) + _obj.set_value("query_count", 0) + _obj.set_value("method_call_count", 0) + + print("Obj1 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object( _obj ) + + _obj = QmfAgentData( _agent, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0}, + _schema=_schema) + + print("Obj2 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object(_obj) + + ############## + + + + logging.info( "Create SchemaEventClass" ) + + _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test data schema", + _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)}) + _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + print("Event Map='%s'" % str(_event.map_encode())) + + _agent.register_event_class(_event) diff --git a/qpid/python/qmf2/common.py b/qpid/python/qmf2/common.py new file mode 100644 index 0000000000..061c9fbe78 --- /dev/null +++ b/qpid/python/qmf2/common.py @@ -0,0 +1,1881 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import time +import logging +from threading import Lock +from threading import Condition +try: + import hashlib + _md5Obj = hashlib.md5 +except ImportError: + import md5 + _md5Obj = md5.new + + + + +## +## Constants +## + + +AMQP_QMF_AGENT_LOCATE = "agent.locate" +AMQP_QMF_AGENT_INDICATION = "agent.ind" +AMQP_QMF_AGENT_EVENT="agent.event" +# agent.ind[.] +# agent.event.. +# sev="strings" +# + +AMQP_QMF_SUBJECT = "qmf" +AMQP_QMF_VERSION = 4 +AMQP_QMF_SUBJECT_FMT = "%s%d.%s" + +class MsgKey(object): + agent_info = "agent_info" + query = "query" + package_info = "package_info" + schema_id = "schema_id" + schema = "schema" + object_id="object_id" + data_obj="object" + method="method" + event="event" + + +class OpCode(object): + noop = "noop" + + # codes sent by a console and processed by the agent + agent_locate = "agent-locate" + cancel_subscription = "cancel-subscription" + create_subscription = "create-subscription" + get_query = "get-query" + method_req = "method" + renew_subscription = "renew-subscription" + schema_query = "schema-query" # @todo: deprecate + + # codes sent by the agent to a console + agent_ind = "agent" + data_ind = "data" + event_ind = "event" + managed_object = "managed-object" + object_ind = "object" + response = "response" + schema_ind="schema" # @todo: deprecate + + + + +def makeSubject(_code): + """ + Create a message subject field value. + """ + return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code) + + +def parseSubject(_sub): + """ + Deconstruct a subject field, return version,opcode values + """ + if _sub[:3] != "qmf": + raise Exception("Non-QMF message received") + + return _sub[3:].split('.', 1) + + +##============================================================================== +## Async Event Model +##============================================================================== + + +class Notifier(object): + """ + Virtual base class that defines a call back which alerts the application that + a QMF Console notification is pending. + """ + def indication(self): + """ + Called when one or more items are ready for the application to process. + This method may be called by an internal QMF library thread. Its purpose is to + indicate that the application should process pending work items. + """ + raise Exception("The indication method must be overridden by the application!") + + + +class WorkItem(object): + """ + Describes an event that has arrived for the application to process. The + Notifier is invoked when one or more of these WorkItems become available + for processing. + """ + # Enumeration of the types of WorkItems produced on the Console + AGENT_ADDED=1 + AGENT_DELETED=2 + NEW_PACKAGE=3 + NEW_CLASS=4 + OBJECT_UPDATE=5 + EVENT_RECEIVED=7 + AGENT_HEARTBEAT=8 + # Enumeration of the types of WorkItems produced on the Agent + METHOD_CALL=1000 + QUERY=1001 + SUBSCRIBE=1002 + UNSUBSCRIBE=1003 + + def __init__(self, kind, handle, _params=None): + """ + Used by the Console to create a work item. + + @type kind: int + @param kind: work item type + """ + self._kind = kind + self._handle = handle + self._params = _params + + def get_type(self): + return self._kind + + def get_handle(self): + return self._handle + + def get_params(self): + return self._params + + + +##============================================================================== +## Addressing +##============================================================================== + +class QmfAddress(object): + """ + TBD + """ + TYPE_DIRECT = "direct" + TYPE_TOPIC = "topic" + + ADDRESS_FMT = "qmf.%s.%s/%s" + DEFAULT_DOMAIN = "default" + + + def __init__(self, name, domain, type_): + self._name = name + self._domain = domain + self._type = type_ + + def _direct(cls, name, _domain=None): + if _domain is None: + _domain = QmfAddress.DEFAULT_DOMAIN + return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT) + direct = classmethod(_direct) + + def _topic(cls, name, _domain=None): + if _domain is None: + _domain = QmfAddress.DEFAULT_DOMAIN + return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC) + topic = classmethod(_topic) + + + def get_address(self): + return str(self) + + def __repr__(self): + return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name) + + + + +class AgentName(object): + """ + Uniquely identifies a management agent within the management domain. + """ + _separator = ":" + + def __init__(self, vendor, product, name, _str=None): + """ + Note: this object must be immutable, as it is used to index into a dictionary + """ + if _str is not None: + # construct from string representation + if _str.count(AgentName._separator) < 2: + raise TypeError("AgentName string format must be 'vendor.product.name'") + self._vendor, self._product, self._name = _str.split(AgentName._separator) + else: + self._vendor = vendor + self._product = product + self._name = name + + + def _from_str(cls, str_): + return cls(None, None, None, str_=str_) + from_str = classmethod(_from_str) + + def vendor(self): + return self._vendor + + def product(self): + return self._product + + def name(self): + return self._name + + def __cmp__(self, other): + if not isinstance(other, AgentName) : + raise TypeError("Invalid types for compare") + # return 1 + me = str(self) + them = str(other) + + if me < them: + return -1 + if me > them: + return 1 + return 0 + + def __hash__(self): + return (self._vendor, self._product, self._name).__hash__() + + def __repr__(self): + return self._vendor + AgentName._separator + \ + self._product + AgentName._separator + \ + self._name + + + +##============================================================================== +## DATA MODEL +##============================================================================== + + +class _mapEncoder(object): + """ + virtual base class for all objects that support being converted to a map + """ + + def map_encode(self): + raise Exception("The map_encode method my be overridden.") + + +class QmfData(_mapEncoder): + """ + Base data class representing arbitrarily structure data. No schema or + managing agent is associated with data of this class. + + Map format: + map["_values"] = map of unordered "name"= pairs (optional) + map["_subtype"] = map of unordered "name"="subtype string" pairs (optional) + map["_tag"] = application-specific tag for this instance (optional) + """ + KEY_VALUES = "_values" + KEY_SUBTYPES = "_subtypes" + KEY_TAG="_tag" + KEY_OBJECT_ID = "_object_id" + KEY_SCHEMA_ID = "_schema_id" + KEY_UPDATE_TS = "_update_ts" + KEY_CREATE_TS = "_create_ts" + KEY_DELETE_TS = "_delete_ts" + + def __init__(self, + _values={}, _subtypes={}, _tag=None, _object_id=None, + _ctime = 0, _utime = 0, _dtime = 0, + _map=None, + _schema=None, _const=False): + """ + @type _values: dict + @param _values: dictionary of initial name=value pairs for object's + named data. + @type _subtypes: dict + @param _subtype: dictionary of subtype strings for each of the object's + named data. + @type _desc: string + @param _desc: Human-readable description of this data object. + @type _const: boolean + @param _const: if true, this object cannot be modified + """ + self._schema_id = None + if _map is not None: + # construct from map + _tag = _map.get(self.KEY_TAG, _tag) + _values = _map.get(self.KEY_VALUES, _values) + _subtypes = _map.get(self.KEY_SUBTYPES, _subtypes) + _object_id = _map.get(self.KEY_OBJECT_ID, _object_id) + sid = _map.get(self.KEY_SCHEMA_ID) + if sid: + self._schema_id = SchemaClassId(_map=sid) + _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime)) + _utime = long(_map.get(self.KEY_UPDATE_TS, _utime)) + _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime)) + + self._values = _values.copy() + self._subtypes = _subtypes.copy() + self._tag = _tag + self._ctime = _ctime + self._utime = _utime + self._dtime = _dtime + self._const = _const + + if _object_id is not None: + self._object_id = str(_object_id) + else: + self._object_id = None + + if _schema is not None: + self._set_schema(_schema) + else: + # careful: map constructor may have already set self._schema_id, do + # not override it! + self._schema = None + + def _create(cls, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None, _const=False): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + return cls(_values=values, _subtypes=_subtypes, _tag=_tag, + _ctime=ctime, _utime=ctime, + _object_id=_object_id, _schema=_schema, _const=_const) + create = classmethod(_create) + + def __from_map(cls, map_, _schema=None, _const=False): + return cls(_map=map_, _schema=_schema, _const=_const) + from_map = classmethod(__from_map) + + def is_managed(self): + return self._object_id is not None + + def is_described(self): + return self._schema_id is not None + + def get_tag(self): + return self._tag + + def get_value(self, name): + # meta-properties: + if name == SchemaClassId.KEY_PACKAGE: + if self._schema_id: + return self._schema_id.get_package_name() + return None + if name == SchemaClassId.KEY_CLASS: + if self._schema_id: + return self._schema_id.get_class_name() + return None + if name == SchemaClassId.KEY_TYPE: + if self._schema_id: + return self._schema_id.get_type() + return None + if name == SchemaClassId.KEY_HASH: + if self._schema_id: + return self._schema_id.get_hash_string() + return None + if name == self.KEY_SCHEMA_ID: + return self._schema_id + if name == self.KEY_OBJECT_ID: + return self._object_id + if name == self.KEY_TAG: + return self._tag + if name == self.KEY_UPDATE_TS: + return self._utime + if name == self.KEY_CREATE_TS: + return self._ctime + if name == self.KEY_DELETE_TS: + return self._dtime + + return self._values.get(name) + + def has_value(self, name): + + if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS, + SchemaClassId.KEY_TYPE, SchemaClassId.KEY_HASH, + self.KEY_SCHEMA_ID]: + return self._schema_id is not None + if name in [self.KEY_UPDATE_TS, self.KEY_CREATE_TS, + self.KEY_DELETE_TS]: + return True + if name == self.KEY_OBJECT_ID: + return self._object_id is not None + if name == self.KEY_TAG: + return self._tag is not None + + return name in self._values + + def set_value(self, _name, _value, _subType=None): + if self._const: + raise Exception("cannot modify constant data object") + self._values[_name] = _value + if _subType: + self._subtypes[_name] = _subType + return _value + + def get_subtype(self, _name): + return self._subtypes.get(_name) + + def get_schema_class_id(self): + """ + @rtype: class SchemaClassId + @returns: the identifier of the Schema that describes the structure of the data. + """ + return self._schema_id + + def get_object_id(self): + """ + Get the instance's identification string. + @rtype: str + @returns: the identification string, or None if not assigned and id. + """ + if self._object_id: + return self._object_id + + # if object id not assigned, see if schema defines a set of field + # values to use as an id + if not self._schema: + return None + + ids = self._schema.get_id_names() + if not ids: + return None + + if not self._validated: + self._validate() + + result = u"" + for key in ids: + try: + result += unicode(self._values[key]) + except: + logging.error("get_object_id(): cannot convert value '%s'." + % key) + return None + self._object_id = result + return result + + def map_encode(self): + _map = {} + if self._tag: + _map[self.KEY_TAG] = self._tag + + # data in the _values map may require recursive map_encode() + vmap = {} + for name,val in self._values.iteritems(): + if isinstance(val, _mapEncoder): + vmap[name] = val.map_encode() + else: + # otherwise, just toss in the native type... + vmap[name] = val + + _map[self.KEY_VALUES] = vmap + # subtypes are never complex, so safe to just copy + _map[self.KEY_SUBTYPES] = self._subtypes.copy() + if self._object_id: + _map[self.KEY_OBJECT_ID] = self._object_id + if self._schema_id: + _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode() + return _map + + def _set_schema(self, schema): + self._validated = False + self._schema = schema + if schema: + self._schema_id = schema.get_class_id() + if self._const: + self._validate() + else: + self._schema_id = None + + def _validate(self): + """ + Compares this object's data against the associated schema. Throws an + exception if the data does not conform to the schema. + """ + props = self._schema.get_properties() + for name,val in props.iteritems(): + # @todo validate: type compatible with amqp_type? + # @todo validate: primary keys have values + if name not in self._values: + if val._isOptional: + # ok not to be present, put in dummy value + # to simplify access + self._values[name] = None + else: + raise Exception("Required property '%s' not present." % name) + self._validated = True + + def __repr__(self): + return "QmfData=<<" + str(self.map_encode()) + ">>" + + + def __setattr__(self, _name, _value): + # ignore private data members + if _name[0] == '_': + return super(QmfData, self).__setattr__(_name, _value) + if _name in self._values: + return self.set_value(_name, _value) + return super(QmfData, self).__setattr__(_name, _value) + + def __getattr__(self, _name): + if _name != "_values" and _name in self._values: + return self._values[_name] + raise AttributeError("no value named '%s' in this object" % _name) + + def __getitem__(self, _name): + return self.__getattr__(_name) + + def __setitem__(self, _name, _value): + return self.__setattr__(_name, _value) + + + +class QmfEvent(QmfData): + """ + A QMF Event is a type of described data that is not managed. Events are + notifications that are sent by Agents. An event notifies a Console of a + change in some aspect of the system under managment. + """ + KEY_TIMESTAMP = "_timestamp" + KEY_SEVERITY = "_severity" + + SEV_EMERG = "emerg" + SEV_ALERT = "alert" + SEV_CRIT = "crit" + SEV_ERR = "err" + SEV_WARNING = "warning" + SEV_NOTICE = "notice" + SEV_INFO = "info" + SEV_DEBUG = "debug" + + def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={}, + _subtypes={}, _tag=None, + _map=None, + _schema=None, _const=True): + """ + @type _map: dict + @param _map: if not None, construct instance from map representation. + @type _timestamp: int + @param _timestamp: moment in time when event occurred, expressed + as milliseconds since Midnight, Jan 1, 1970 UTC. + @type _agentId: class AgentId + @param _agentId: Identifies agent issuing this event. + @type _schema: class Schema + @param _schema: + @type _schemaId: class SchemaClassId (event) + @param _schemaId: identi + """ + + if _map is not None: + # construct from map + super(QmfEvent, self).__init__(_map=_map, _schema=_schema, + _const=_const) + _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp) + _sev = _map.get(self.KEY_SEVERITY, _sev) + else: + super(QmfEvent, self).__init__(_values=_values, + _subtypes=_subtypes, _tag=_tag, + _schema=_schema, _const=_const) + if _timestamp is None: + raise TypeError("QmfEvent: a valid timestamp is required.") + + try: + self._timestamp = long(_timestamp) + except: + raise TypeError("QmfEvent: a numeric timestamp is required.") + + self._severity = _sev + + def _create(cls, timestamp, severity, values, + _subtypes={}, _tag=None, _schema=None, _const=False): + return cls(_timestamp=timestamp, _sev=severity, _values=values, + _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const) + create = classmethod(_create) + + def _from_map(cls, map_, _schema=None, _const=False): + return cls(_map=map_, _schema=_schema, _const=_const) + from_map = classmethod(_from_map) + + def get_timestamp(self): + return self._timestamp + + def get_severity(self): + return self._severity + + def map_encode(self): + _map = super(QmfEvent, self).map_encode() + _map[self.KEY_TIMESTAMP] = self._timestamp + _map[self.KEY_SEVERITY] = self._severity + return _map + + + + + +#============================================================================== +#============================================================================== +#============================================================================== + + + + +class Arguments(object): + def __init__(self, map): + pass +# self.map = map +# self._by_hash = {} +# key_count = self.map.keyCount() +# a = 0 +# while a < key_count: +# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a)) +# a += 1 + + +# def __getitem__(self, key): +# return self._by_hash[key] + + +# def __setitem__(self, key, value): +# self._by_hash[key] = value +# self.set(key, value) + + +# def __iter__(self): +# return self._by_hash.__iter__ + + +# def __getattr__(self, name): +# if name in self._by_hash: +# return self._by_hash[name] +# return super.__getattr__(self, name) + + +# def __setattr__(self, name, value): +# # +# # ignore local data members +# # +# if (name[0] == '_' or +# name == 'map'): +# return super.__setattr__(self, name, value) + +# if name in self._by_hash: +# self._by_hash[name] = value +# return self.set(name, value) + +# return super.__setattr__(self, name, value) + + +# def by_key(self, key): +# val = self.map.byKey(key) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.asUint() +# elif vType == TYPE_UINT16: return val.asUint() +# elif vType == TYPE_UINT32: return val.asUint() +# elif vType == TYPE_UINT64: return val.asUint64() +# elif vType == TYPE_SSTR: return val.asString() +# elif vType == TYPE_LSTR: return val.asString() +# elif vType == TYPE_ABSTIME: return val.asInt64() +# elif vType == TYPE_DELTATIME: return val.asUint64() +# elif vType == TYPE_REF: return ObjectId(val.asObjectId()) +# elif vType == TYPE_BOOL: return val.asBool() +# elif vType == TYPE_FLOAT: return val.asFloat() +# elif vType == TYPE_DOUBLE: return val.asDouble() +# elif vType == TYPE_UUID: return val.asUuid() +# elif vType == TYPE_INT8: return val.asInt() +# elif vType == TYPE_INT16: return val.asInt() +# elif vType == TYPE_INT32: return val.asInt() +# elif vType == TYPE_INT64: return val.asInt64() +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType())) +# return None + + +# def set(self, key, value): +# val = self.map.byKey(key) +# vType = val.getType() +# if vType == TYPE_UINT8: return val.setUint(value) +# elif vType == TYPE_UINT16: return val.setUint(value) +# elif vType == TYPE_UINT32: return val.setUint(value) +# elif vType == TYPE_UINT64: return val.setUint64(value) +# elif vType == TYPE_SSTR: +# if value: +# return val.setString(value) +# else: +# return val.setString('') +# elif vType == TYPE_LSTR: +# if value: +# return val.setString(value) +# else: +# return val.setString('') +# elif vType == TYPE_ABSTIME: return val.setInt64(value) +# elif vType == TYPE_DELTATIME: return val.setUint64(value) +# elif vType == TYPE_REF: return val.setObjectId(value.impl) +# elif vType == TYPE_BOOL: return val.setBool(value) +# elif vType == TYPE_FLOAT: return val.setFloat(value) +# elif vType == TYPE_DOUBLE: return val.setDouble(value) +# elif vType == TYPE_UUID: return val.setUuid(value) +# elif vType == TYPE_INT8: return val.setInt(value) +# elif vType == TYPE_INT16: return val.setInt(value) +# elif vType == TYPE_INT32: return val.setInt(value) +# elif vType == TYPE_INT64: return val.setInt64(value) +# else: +# # when TYPE_MAP +# # when TYPE_OBJECT +# # when TYPE_LIST +# # when TYPE_ARRAY +# logging.error("Unsupported Type for Set? '%s'" % str(val.getType())) +# return None + + + +#class MethodResponse(object): +# def __init__(self, impl): +# pass +# self.impl = qmfengine.MethodResponse(impl) + + +# def status(self): +# return self.impl.getStatus() + + +# def exception(self): +# return self.impl.getException() + + +# def text(self): +# return exception().asString() + + +# def args(self): +# return Arguments(self.impl.getArgs()) + + +# def __getattr__(self, name): +# myArgs = self.args() +# return myArgs.__getattr__(name) + + +# def __setattr__(self, name, value): +# if name == 'impl': +# return super.__setattr__(self, name, value) + +# myArgs = self.args() +# return myArgs.__setattr__(name, value) + + + +# ##============================================================================== +# ## QUERY +# ##============================================================================== + + + +# def _doQuery(predicate, params ): +# """ +# Given the predicate from a query, and a map of named parameters, apply the predicate +# to the parameters, and return True or False. +# """ +# if type(predicate) != list or len(predicate) < 1: +# return False + +# elif opr == Query._LOGIC_AND: +# logging.debug("_doQuery() AND: [%s]" % predicate ) +# rc = False +# for exp in predicate[1:]: +# rc = _doQuery( exp, params ) +# if not rc: +# break +# return rc + +# elif opr == Query._LOGIC_OR: +# logging.debug("_doQuery() OR: [%s]" % predicate ) +# rc = False +# for exp in predicate[1:]: +# rc = _doQuery( exp, params ) +# if rc: +# break +# return rc + +# elif opr == Query._LOGIC_NOT: +# logging.debug("_doQuery() NOT: [%s]" % predicate ) +# if len(predicate) != 2: +# logging.warning("Malformed query not-expression received: '%s'" % predicate) +# return False +# return not _doQuery( predicate[1:], params ) + + + +# else: +# logging.warning("Unknown query operator received: '%s'" % opr) +# return False + + + +class QmfQuery(_mapEncoder): + + KEY_TARGET="what" + KEY_PREDICATE="where" + KEY_ID="id" + + ### Query Types + ID=1 + PREDICATE=2 + + #### Query Targets #### + TARGET_PACKAGES="schema_package" + # (returns just package names) + # allowed predicate key(s): + # + # SchemaClassId.KEY_PACKAGE + + TARGET_SCHEMA_ID="schema_id" + TARGET_SCHEMA="schema" + # allowed predicate key(s): + # + # SchemaClassId.KEY_PACKAGE + # SchemaClassId.KEY_CLASS + # SchemaClassId.KEY_TYPE + # SchemaClassId.KEY_HASH + # SchemaClass.KEY_SCHEMA_ID + # name of property (exist test only) + # name of method (exist test only) + + TARGET_AGENT="agent" + # allowed predicate keys(s): + # + KEY_AGENT_NAME="_name" + + TARGET_OBJECT_ID="object_id" + TARGET_OBJECT="object" + # allowed predicate keys(s): + # + # SchemaClassId.KEY_PACKAGE + # SchemaClassId.KEY_CLASS + # SchemaClassId.KEY_TYPE + # SchemaClassId.KEY_HASH + # QmfData.KEY_SCHEMA_ID + # QmfData.KEY_OBJECT_ID + # QmfData.KEY_UPDATE_TS + # QmfData.KEY_CREATE_TS + # QmfData.KEY_DELETE_TS + # + + CMP_EQ="eq" + CMP_NE="ne" + CMP_LT="lt" + CMP_LE="le" + CMP_GT="gt" + CMP_GE="ge" + CMP_RE_MATCH="re_match" + CMP_EXISTS="exists" + CMP_TRUE="true" + CMP_FALSE="false" + + LOGIC_AND="and" + LOGIC_OR="or" + LOGIC_NOT="not" + + _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID, + TARGET_OBJECT, TARGET_AGENT] + + def __init__(self, _target=None, _target_params=None, _predicate=None, + _id=None, _map=None): + """ + """ + if _map is not None: + target_map = _map.get(self.KEY_TARGET) + if not target_map: + raise TypeError("QmfQuery requires a target map") + + _target = None + for key in target_map.iterkeys(): + if key in self._valid_targets: + _target = key + break + + _target_params = target_map.get(_target) + + _id = _map.get(self.KEY_ID) + if _id is not None: + # Convert identifier to native type if necessary + if _target == self.TARGET_SCHEMA: + _id = SchemaClassId.from_map(_id) + else: + pred = _map.get(self.KEY_PREDICATE) + if pred: + _predicate = QmfQueryPredicate(pred) + + self._target = _target + if not self._target: + raise TypeError("QmfQuery requires a target value") + self._target_params = _target_params + self._predicate = _predicate + self._id = _id + + # constructors + def _create_wildcard(cls, target, _target_params=None): + return cls(_target=target, _target_params=_target_params) + create_wildcard = classmethod(_create_wildcard) + + def _create_predicate(cls, target, predicate, _target_params=None): + return cls(_target=target, _target_params=_target_params, + _predicate=predicate) + create_predicate = classmethod(_create_predicate) + + def _create_id(cls, target, ident, _target_params=None): + return cls(_target=target, _target_params=_target_params, _id=ident) + create_id = classmethod(_create_id) + + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def get_target(self): + return self._target + + def get_target_param(self): + return self._target_params + + def get_selector(self): + if self._id: + return QmfQuery.ID + else: + return QmfQuery.PREDICATE + + def get_id(self): + return self._id + + def get_predicate(self): + """ + """ + return self._predicate + + def evaluate(self, qmfData): + """ + """ + if self._id: + if self._target == self.TARGET_SCHEMA: + return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and + qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id) + elif self._target == self.TARGET_OBJECT: + return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and + qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id) + elif self._target == self.TARGET_AGENT: + return (qmfData.has_value(self.KEY_AGENT_NAME) and + qmfData.get_value(self.KEY_AGENT_NAME) == self._id) + + raise Exception("Unsupported query target '%s'" % str(self._target)) + + if self._predicate: + return self._predicate.evaluate(qmfData) + # no predicate and no id - always match + return True + + def map_encode(self): + _map = {self.KEY_TARGET: {self._target: self._target_params}} + if self._id is not None: + if isinstance(self._id, _mapEncoder): + _map[self.KEY_ID] = self._id.map_encode() + else: + _map[self.KEY_ID] = self._id + elif self._predicate is not None: + _map[self.KEY_PREDICATE] = self._predicate.map_encode() + return _map + + def __repr__(self): + return "QmfQuery=<<" + str(self.map_encode()) + ">>" + + + +class QmfQueryPredicate(_mapEncoder): + """ + Class for Query predicates. + """ + _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, + QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE, + QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH, + QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE] + _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT] + + + def __init__( self, pmap): + """ + {"op": listOf(operands)} + """ + self._oper = None + self._operands = [] + + logic_op = False + if type(pmap) == dict: + for key in pmap.iterkeys(): + if key in self._valid_cmp_ops: + # comparison operation - may have "name" and "value" + self._oper = key + break + if key in self._valid_logic_ops: + logic_op = True + self._oper = key + break + + if not self._oper: + raise TypeError("invalid predicate expression: '%s'" % str(pmap)) + + if type(pmap[self._oper]) == list or type(pmap[self._oper]) == tuple: + if logic_op: + for exp in pmap[self._oper]: + self.append(QmfQueryPredicate(exp)) + else: + self._operands = list(pmap[self._oper]) + + else: + raise TypeError("invalid predicate: '%s'" % str(pmap)) + + + def append(self, operand): + """ + Append another operand to a predicate expression + """ + self._operands.append(operand) + + + + def evaluate( self, qmfData ): + """ + """ + if not isinstance(qmfData, QmfData): + raise TypeError("Query expects to evaluate QmfData types.") + + if self._oper == QmfQuery.CMP_TRUE: + logging.debug("query evaluate TRUE") + return True + if self._oper == QmfQuery.CMP_FALSE: + logging.debug("query evaluate FALSE") + return False + + if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, + QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE, + QmfQuery.CMP_RE_MATCH]: + if len(self._operands) != 2: + logging.warning("Malformed query compare expression received: '%s, %s'" % + (self._oper, str(self._operands))) + return False + # @todo: support regular expression match + name = self._operands[0] + logging.debug("looking for: '%s'" % str(name)) + if not qmfData.has_value(name): + logging.warning("Malformed query, attribute '%s' not present." + % name) + return False + + arg1 = qmfData.get_value(name) + arg2 = self._operands[1] + logging.debug("query evaluate %s: '%s' '%s' '%s'" % + (name, str(arg1), self._oper, str(arg2))) + try: + if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2 + if self._oper == QmfQuery.CMP_NE: return arg1 != arg2 + if self._oper == QmfQuery.CMP_LT: return arg1 < arg2 + if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2 + if self._oper == QmfQuery.CMP_GT: return arg1 > arg2 + if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2 + if self._oper == QmfQuery.CMP_RE_MATCH: + logging.error("!!! RE QUERY TBD !!!") + return False + except: + pass + logging.warning("Malformed query - %s: '%s' '%s' '%s'" % + (name, str(arg1), self._oper, str(self._operands[1]))) + return False + + + if self._oper == QmfQuery.CMP_EXISTS: + if len(self._operands) != 1: + logging.warning("Malformed query present expression received") + return False + name = self._operands[0] + logging.debug("query evaluate PRESENT: [%s]" % str(name)) + return qmfData.has_value(name) + + if self._oper == QmfQuery.LOGIC_AND: + logging.debug("query evaluate AND: '%s'" % str(self._operands)) + for exp in self._operands: + if not exp.evaluate(qmfData): + return False + return True + + if self._oper == QmfQuery.LOGIC_OR: + logging.debug("query evaluate OR: [%s]" % str(self._operands)) + for exp in self._operands: + if exp.evaluate(qmfData): + return True + return False + + if self._oper == QmfQuery.LOGIC_NOT: + logging.debug("query evaluate NOT: [%s]" % str(self._operands)) + for exp in self._operands: + if exp.evaluate(qmfData): + return False + return True + + logging.warning("Unrecognized query operator: [%s]" % str(self._oper)) + return False + + + def map_encode(self): + _map = {} + _list = [] + for exp in self._operands: + if isinstance(exp, QmfQueryPredicate): + _list.append(exp.map_encode()) + else: + _list.append(exp) + _map[self._oper] = _list + return _map + + + def __repr__(self): + return "QmfQueryPredicate=<<" + str(self.map_encode()) + ">>" + + + +##============================================================================== +## SCHEMA +##============================================================================== + + +# Argument typecodes, access, and direction qualifiers + +class qmfTypes(object): + TYPE_UINT8 = 1 + TYPE_UINT16 = 2 + TYPE_UINT32 = 3 + TYPE_UINT64 = 4 + + TYPE_SSTR = 6 + TYPE_LSTR = 7 + + TYPE_ABSTIME = 8 + TYPE_DELTATIME = 9 + + TYPE_REF = 10 + + TYPE_BOOL = 11 + + TYPE_FLOAT = 12 + TYPE_DOUBLE = 13 + + TYPE_UUID = 14 + + TYPE_MAP = 15 + + TYPE_INT8 = 16 + TYPE_INT16 = 17 + TYPE_INT32 = 18 + TYPE_INT64 = 19 + + TYPE_OBJECT = 20 + + TYPE_LIST = 21 + + TYPE_ARRAY = 22 + +# New subtypes: +# integer (for time, duration, signed/unsigned) +# double (float) +# bool +# string +# map (ref, qmfdata) +# list +# uuid + + +class qmfAccess(object): + READ_CREATE = 1 + READ_WRITE = 2 + READ_ONLY = 3 + + +class qmfDirection(object): + DIR_IN = 1 + DIR_OUT = 2 + DIR_IN_OUT = 3 + + + +def _toBool( param ): + """ + Helper routine to convert human-readable representations of + boolean values to python bool types. + """ + _false_strings = ["off", "no", "false", "0", "none"] + _true_strings = ["on", "yes", "true", "1"] + if type(param) == str: + lparam = param.lower() + if lparam in _false_strings: + return False + if lparam in _true_strings: + return True + raise TypeError("unrecognized boolean string: '%s'" % param ) + else: + return bool(param) + + + +class SchemaClassId(_mapEncoder): + """ + Unique identifier for an instance of a SchemaClass. + + Map format: + map["package_name"] = str, name of associated package + map["class_name"] = str, name of associated class + map["type"] = str, "data"|"event", default: "data" + optional: + map["hash_str"] = str, hash value in standard format or None + if hash is unknown. + """ + KEY_PACKAGE="_package_name" + KEY_CLASS="_class_name" + KEY_TYPE="_type" + KEY_HASH="_hash_str" + + TYPE_DATA = "_data" + TYPE_EVENT = "_event" + + _valid_types=[TYPE_DATA, TYPE_EVENT] + _schemaHashStrFormat = "%08x-%08x-%08x-%08x" + _schemaHashStrDefault = "00000000-00000000-00000000-00000000" + + def __init__(self, pname=None, cname=None, stype=TYPE_DATA, hstr=None, + _map=None): + """ + @type pname: str + @param pname: the name of the class's package + @type cname: str + @param cname: name of the class + @type stype: str + @param stype: schema type [data | event] + @type hstr: str + @param hstr: the hash value in '%08x-%08x-%08x-%08x' format + """ + if _map is not None: + # construct from map + pname = _map.get(self.KEY_PACKAGE, pname) + cname = _map.get(self.KEY_CLASS, cname) + stype = _map.get(self.KEY_TYPE, stype) + hstr = _map.get(self.KEY_HASH, hstr) + + self._pname = pname + self._cname = cname + if stype not in SchemaClassId._valid_types: + raise TypeError("Invalid SchemaClassId type: '%s'" % stype) + self._type = stype + self._hstr = hstr + if self._hstr: + try: + # sanity check the format of the hash string + hexValues = hstr.split("-") + h0 = int(hexValues[0], 16) + h1 = int(hexValues[1], 16) + h2 = int(hexValues[2], 16) + h3 = int(hexValues[3], 16) + except: + raise Exception("Invalid SchemaClassId format: bad hash string: '%s':" + % hstr) + # constructor + def _create(cls, pname, cname, stype=TYPE_DATA, hstr=None): + return cls(pname=pname, cname=cname, stype=stype, hstr=hstr) + create = classmethod(_create) + + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def get_package_name(self): + """ + Access the package name in the SchemaClassId. + + @rtype: str + """ + return self._pname + + + def get_class_name(self): + """ + Access the class name in the SchemaClassId + + @rtype: str + """ + return self._cname + + + def get_hash_string(self): + """ + Access the schema's hash as a string value + + @rtype: str + """ + return self._hstr + + + def get_type(self): + """ + Returns the type code associated with this Schema + + @rtype: str + """ + return self._type + + def map_encode(self): + _map = {} + _map[self.KEY_PACKAGE] = self._pname + _map[self.KEY_CLASS] = self._cname + _map[self.KEY_TYPE] = self._type + if self._hstr: _map[self.KEY_HASH] = self._hstr + return _map + + def __repr__(self): + hstr = self.get_hash_string() + if not hstr: + hstr = SchemaClassId._schemaHashStrDefault + return self._pname + ":" + self._cname + ":" + self._type + "(" + hstr + ")" + + + def __cmp__(self, other): + if isinstance(other, dict): + other = SchemaClassId.from_map(other) + if not isinstance(other, SchemaClassId): + raise TypeError("Invalid types for compare") + # return 1 + me = str(self) + them = str(other) + if me < them: + return -1 + if me > them: + return 1 + return 0 + + + def __hash__(self): + return (self._pname, self._cname, self._hstr).__hash__() + + + +class SchemaProperty(_mapEncoder): + """ + Describes the structure of a Property data object. + Map format: + map["amqp_type"] = int, AMQP type code indicating property's data type + + optional: + map["access"] = str, access allowed to this property, default "RO" + map["index"] = bool, True if this property is an index value, default False + map["optional"] = bool, True if this property is optional, default False + map["unit"] = str, describes units used + map["min"] = int, minimum allowed value + map["max"] = int, maximun allowed value + map["maxlen"] = int, if string type, this is the maximum length in bytes + required to represent the longest instance of this string. + map["desc"] = str, human-readable description of this argument + map["reference"] = str, ??? + map["parent_ref"] = bool, true if this property references an object in + which this object is in a child-parent relationship. Default False + """ + __hash__ = None + _access_strings = ["RO","RW","RC"] + _dir_strings = ["I", "O", "IO"] + def __init__(self, _type_code=None, _map=None, kwargs={}): + if _map is not None: + # construct from map + _type_code = _map.get("amqp_type", _type_code) + kwargs = _map + if not _type_code: + raise TypeError("SchemaProperty: amqp_type is a mandatory" + " parameter") + + self._type = _type_code + self._access = "RO" + self._isIndex = False + self._isOptional = False + self._unit = None + self._min = None + self._max = None + self._maxlen = None + self._desc = None + self._reference = None + self._isParentRef = False + self._dir = None + self._default = None + + for key, value in kwargs.items(): + if key == "access": + value = str(value).upper() + if value not in self._access_strings: + raise TypeError("invalid value for access parameter: '%s':" % value ) + self._access = value + elif key == "index" : self._isIndex = _toBool(value) + elif key == "optional": self._isOptional = _toBool(value) + elif key == "unit" : self._unit = value + elif key == "min" : self._min = value + elif key == "max" : self._max = value + elif key == "maxlen" : self._maxlen = value + elif key == "desc" : self._desc = value + elif key == "reference" : self._reference = value + elif key == "parent_ref" : self._isParentRef = _toBool(value) + elif key == "dir": + value = str(value).upper() + if value not in self._dir_strings: + raise TypeError("invalid value for direction parameter: '%s'" % value) + self._dir = value + elif key == "default" : self._default = value + + # constructor + def _create(cls, type_code, kwargs={}): + return cls(_type_code=type_code, kwargs=kwargs) + create = classmethod(_create) + + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def getType(self): return self._type + + def getAccess(self): return self._access + + def is_optional(self): return self._isOptional + + def isIndex(self): return self._isIndex + + def getUnit(self): return self._unit + + def getMin(self): return self._min + + def getMax(self): return self._max + + def getMaxLen(self): return self._maxlen + + def getDesc(self): return self._desc + + def getReference(self): return self._reference + + def isParentRef(self): return self._isParentRef + + def get_direction(self): return self._dir + + def get_default(self): return self._default + + def map_encode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _map["amqp_type"] = self._type + _map["access"] = self._access + _map["index"] = self._isIndex + _map["optional"] = self._isOptional + if self._unit: _map["unit"] = self._unit + if self._min: _map["min"] = self._min + if self._max: _map["max"] = self._max + if self._maxlen: _map["maxlen"] = self._maxlen + if self._desc: _map["desc"] = self._desc + if self._reference: _map["reference"] = self._reference + _map["parent_ref"] = self._isParentRef + if self._dir: _map["dir"] = self._dir + if self._default: _map["default"] = self._default + return _map + + def __repr__(self): + return "SchemaProperty=<<" + str(self.map_encode()) + ">>" + + def _updateHash(self, hasher): + """ + Update the given hash object with a hash computed over this schema. + """ + hasher.update(str(self._type)) + hasher.update(str(self._isIndex)) + hasher.update(str(self._isOptional)) + if self._access: hasher.update(self._access) + if self._unit: hasher.update(self._unit) + if self._desc: hasher.update(self._desc) + if self._dir: hasher.update(self._dir) + if self._default: hasher.update(self._default) + + + +class SchemaMethod(_mapEncoder): + """ + The SchemaMethod class describes the method's structure, and contains a + SchemaProperty class for each argument declared by the method. + + Map format: + map["arguments"] = map of "name"= pairs. + map["desc"] = str, description of the method + """ + KEY_NAME="_name" + KEY_ARGUMENTS="_arguments" + KEY_DESC="_desc" + KEY_ERROR="_error" + def __init__(self, _args={}, _desc=None, _map=None): + """ + Construct a SchemaMethod. + + @type args: map of "name"= objects + @param args: describes the arguments accepted by the method + @type _desc: str + @param _desc: Human-readable description of the schema + """ + if _map is not None: + _desc = _map.get(self.KEY_DESC) + margs = _map.get(self.KEY_ARGUMENTS) + if margs: + # margs are in map format - covert to SchemaProperty + tmp_args = {} + for name,val in margs.iteritems(): + tmp_args[name] = SchemaProperty.from_map(val) + _args=tmp_args + + self._arguments = _args.copy() + self._desc = _desc + + # map constructor + def _from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(_from_map) + + def get_desc(self): return self._desc + + def get_arg_count(self): return len(self._arguments) + + def get_arguments(self): return self._arguments.copy() + + def get_argument(self, name): return self._arguments.get(name) + + def add_argument(self, name, schema): + """ + Add an argument to the list of arguments passed to this method. + Used by an agent for dynamically creating method schema. + + @type name: string + @param name: name of new argument + @type schema: SchemaProperty + @param schema: SchemaProperty to add to this method + """ + if not isinstance(schema, SchemaProperty): + raise TypeError("argument must be a SchemaProperty class") + # "Input" argument, by default + if schema._dir is None: + schema._dir = "I" + self._arguments[name] = schema + + def map_encode(self): + """ + Return the map encoding of this schema. + """ + _map = {} + _args = {} + for name,val in self._arguments.iteritems(): + _args[name] = val.map_encode() + _map[self.KEY_ARGUMENTS] = _args + if self._desc: _map[self.KEY_DESC] = self._desc + return _map + + def __repr__(self): + result = "SchemaMethod=< objects + @param _props: all properties provided by this schema + @type _pkey: list of strings + @param _pkey: names of each property to be used for constructing the primary key + @type _methods: map of 'name': objects + @param _methods: all methods provided by this schema + """ + if _map is not None: + super(SchemaObjectClass,self).__init__(_map=_map) + else: + super(SchemaObjectClass, self).__init__(_classId=_classId, _desc=_desc) + self._object_id_names = _object_id_names + for name,value in _props.iteritems(): + self.set_value(name, value, self.SUBTYPE_PROPERTY) + for name,value in _methods.iteritems(): + self.set_value(name, value, self.SUBTYPE_METHOD) + + if self._classId.get_type() != SchemaClassId.TYPE_DATA: + raise TypeError("Invalid ClassId type for data schema: %s" % self._classId) + + # map constructor + def __from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(__from_map) + + def get_id_names(self): + return self._object_id_names[:] + + def get_method_count(self): + count = 0 + for value in self._subtypes.itervalues(): + if value == self.SUBTYPE_METHOD: + count += 1 + return count + + def get_methods(self): + meths = {} + for name,value in self._subtypes.iteritems(): + if value == self.SUBTYPE_METHOD: + meths[name] = self._values.get(name) + return meths + + def get_method(self, name): + if self._subtypes.get(name) == self.SUBTYPE_METHOD: + return self._values.get(name) + return None + + def add_method(self, name, method): + self.set_value(name, method, self.SUBTYPE_METHOD) + # need to re-generate schema hash + self._classId._hstr = None + + + + +class SchemaEventClass(SchemaClass): + """ + A schema class that describes an event. The event is composed + of zero or more properties. + + Map format: + map["schema_id"] = map, SchemaClassId map for this object. + map["desc"] = string description of this schema + map["properties"] = map of "name":SchemaProperty values. + """ + def __init__(self, _classId=None, _desc=None, _props={}, + _map=None): + if _map is not None: + super(SchemaEventClass,self).__init__(_map=_map) + else: + super(SchemaEventClass, self).__init__(_classId=_classId, + _desc=_desc) + for name,value in _props.iteritems(): + self.set_value(name, value, self.SUBTYPE_PROPERTY) + + if self._classId.get_type() != SchemaClassId.TYPE_EVENT: + raise TypeError("Invalid ClassId type for event schema: %s" % + self._classId) + + # map constructor + def __from_map(cls, map_): + return cls(_map=map_) + from_map = classmethod(__from_map) + + + + + + + + + + + diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py new file mode 100644 index 0000000000..8e8f4799f7 --- /dev/null +++ b/qpid/python/qmf2/console.py @@ -0,0 +1,1959 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import os +import logging +import platform +import time +import datetime +import Queue +from threading import Thread +from threading import Lock +from threading import currentThread +from threading import Condition + +from qpid.messaging import Connection, Message, Empty, SendError + +from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, + QmfQueryPredicate, MsgKey, QmfData, QmfAddress, + AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + SchemaClass, SchemaClassId, SchemaEventClass, + SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) + + + +# global flag that indicates which thread (if any) is +# running the console notifier callback +_callback_thread=None + + + + +##============================================================================== +## Sequence Manager +##============================================================================== + +class _Mailbox(object): + """ + Virtual base class for all Mailbox-like objects + """ + def __init__(self): + self._msgs = [] + self._cv = Condition() + self._waiting = False + + def deliver(self, obj): + self._cv.acquire() + try: + self._msgs.append(obj) + # if was empty, notify waiters + if len(self._msgs) == 1: + self._cv.notify() + finally: + self._cv.release() + + def fetch(self, timeout=None): + self._cv.acquire() + try: + if len(self._msgs) == 0: + self._cv.wait(timeout) + if len(self._msgs): + return self._msgs.pop() + return None + finally: + self._cv.release() + + + +class SequencedWaiter(object): + """ + Manage sequence numbers for asynchronous method calls. + Allows the caller to associate a generic piece of data with a unique sequence + number.""" + + def __init__(self): + self.lock = Lock() + self.sequence = long(time.time()) # pseudo-randomize seq start + self.pending = {} + + + def allocate(self): + """ + Reserve a sequence number. + + @rtype: long + @return: a unique nonzero sequence number. + """ + self.lock.acquire() + try: + seq = self.sequence + self.sequence = self.sequence + 1 + self.pending[seq] = _Mailbox() + finally: + self.lock.release() + logging.debug( "sequence %d allocated" % seq) + return seq + + + def put_data(self, seq, new_data): + seq = long(seq) + logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) + self.lock.acquire() + try: + if seq in self.pending: + # logging.error("Putting seq %d @ %s" % (seq,time.time())) + self.pending[seq].deliver(new_data) + else: + logging.error( "seq %d not found!" % seq ) + finally: + self.lock.release() + + + + def get_data(self, seq, timeout=None): + """ + Release a sequence number reserved using the reserve method. This must + be called when the sequence is no longer needed. + + @type seq: int + @param seq: a sequence previously allocated by calling reserve(). + @rtype: any + @return: the data originally associated with the reserved sequence number. + """ + seq = long(seq) + logging.debug( "getting data for seq=%d" % seq) + mbox = None + self.lock.acquire() + try: + if seq in self.pending: + mbox = self.pending[seq] + finally: + self.lock.release() + + # Note well: pending list is unlocked, so we can wait. + # we reference mbox locally, so it will not be released + # until we are done. + + if mbox: + d = mbox.fetch(timeout) + logging.debug( "seq %d fetched %r!" % (seq, d) ) + return d + + logging.debug( "seq %d not found!" % seq ) + return None + + + def release(self, seq): + """ + Release the sequence, and its mailbox + """ + seq = long(seq) + logging.debug( "releasing seq %d" % seq ) + self.lock.acquire() + try: + if seq in self.pending: + del self.pending[seq] + finally: + self.lock.release() + + + def isValid(self, seq): + """ + True if seq is in use, else False (seq is unknown) + """ + seq = long(seq) + self.lock.acquire() + try: + return seq in self.pending + finally: + self.lock.release() + return False + + +##============================================================================== +## DATA MODEL +##============================================================================== + + +class QmfConsoleData(QmfData): + """ + Console's representation of an managed QmfData instance. + """ + def __init__(self, map_, agent, _schema=None): + super(QmfConsoleData, self).__init__(_map=map_, + _schema=_schema, + _const=True) + self._agent = agent + + def get_timestamps(self): + """ + Returns a list of timestamps describing the lifecycle of + the object. All timestamps are represented by the AMQP + timestamp type. [0] = time of last update from Agent, + [1] = creation timestamp + [2] = deletion timestamp, or zero if not + deleted. + """ + return [self._utime, self._ctime, self._dtime] + + def get_create_time(self): + """ + returns the creation timestamp + """ + return self._ctime + + def get_update_time(self): + """ + returns the update timestamp + """ + return self._utime + + def get_delete_time(self): + """ + returns the deletion timestamp, or zero if not yet deleted. + """ + return self._dtime + + def is_deleted(self): + """ + True if deletion timestamp not zero. + """ + return self._dtime != long(0) + + def refresh(self, _reply_handle=None, _timeout=None): + """ + request that the Agent update the value of this object's + contents. + """ + if _reply_handle is not None: + logging.error(" ASYNC REFRESH TBD!!!") + return None + + assert self._agent + assert self._agent._console + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + + # create query to agent using this objects ID + oid = self.get_object_id() + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + self.get_object_id()) + obj_list = self._agent._console.doQuery(self._agent, query, + timeout=_timeout) + if obj_list is None or len(obj_list) != 1: + return None + + self._update(obj_list[0]) + return self + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + invoke the named method. + """ + assert self._agent + assert self._agent._console + + oid = self.get_object_id() + if oid is None: + raise ValueError("Cannot invoke methods on unmanaged objects.") + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + if self._schema: + # validate + ms = self._schema.get_method(name) + if ms is None: + raise ValueError("Method '%s' is undefined." % name) + + for aname,prop in ms.get_arguments().iteritems(): + if aname not in _in_args: + if prop.get_default(): + _in_args[aname] = prop.get_default() + elif not prop.is_optional(): + raise ValueError("Method '%s' requires argument '%s'" + % (name, aname)) + for aname in _in_args.iterkeys(): + prop = ms.get_argument(aname) + if prop is None: + raise ValueError("Method '%s' does not define argument" + " '%s'" % (name, aname)) + if "I" not in prop.get_direction(): + raise ValueError("Method '%s' argument '%s' is not an" + " input." % (name, aname)) + + # @todo check if value is correct (type, range, etc) + + handle = self._agent._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {self.KEY_OBJECT_ID:str(oid), + SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._agent._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._agent._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) + self._agent._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + return MethodResult(_error=QmfData.from_map(error)) + else: + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + def _update(self, newer): + super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, + _tag=newer._tag, _object_id=newer._object_id, + _ctime=newer._ctime, _utime=newer._utime, + _dtime=newer._dtime, + _schema=newer._schema, _const=True) + +class QmfLocalData(QmfData): + """ + Console's representation of an unmanaged QmfData instance. There + is no remote agent associated with this instance. The Console has + full control over this instance. + """ + def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfLocalData, self).__init__(_values=values, + _subtypes=_subtypes, _tag=_tag, + _object_id=_object_id, + _schema=_schema, _ctime=ctime, + _utime=ctime, _const=False) + + +class Agent(object): + """ + A local representation of a remote agent managed by this console. + """ + def __init__(self, name, console): + """ + @type name: AgentId + @param name: uniquely identifies this agent in the AMQP domain. + """ + + if not isinstance(console, Console): + raise TypeError("parameter must be an instance of class Console") + + self._name = name + self._address = QmfAddress.direct(name, console._domain) + self._console = console + self._sender = None + self._packages = {} # map of {package-name:[list of class-names], } for this agent + self._subscriptions = [] # list of active standing subscriptions for this agent + self._announce_timestamp = None # datetime when last announce received + logging.debug( "Created Agent with address: [%s]" % self._address ) + + + def get_name(self): + return self._name + + def isActive(self): + return self._announce_timestamp != None + + def _sendMsg(self, msg, correlation_id=None): + """ + Low-level routine to asynchronously send a message to this agent. + """ + msg.reply_to = str(self._console._address) + # handle = self._console._req_correlation.allocate() + # if handle == 0: + # raise Exception("Can not allocate a correlation id!") + # msg.correlation_id = str(handle) + if correlation_id: + msg.correlation_id = str(correlation_id) + self._sender.send(msg) + # return handle + + def get_packages(self): + """ + Return a list of the names of all packages known to this agent. + """ + return self._packages.keys() + + def get_classes(self): + """ + Return a dictionary [key:class] of classes known to this agent. + """ + return self._packages.copy() + + def get_objects(self, query, kwargs={}): + """ + Return a list of objects that satisfy the given query. + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: list + @return: list of matching objects, or None. + """ + pass + + def get_object(self, query, kwargs={}): + """ + Get one object - query is expected to match only one object. + ??? Recommended: explicit timeout param, default None ??? + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: qmfConsole.ObjectProxy + @return: one matching object, or none + """ + pass + + + def create_subscription(self, query): + """ + Factory for creating standing subscriptions based on a given query. + + @type query: common.Query object + @param query: determines the list of objects for which this subscription applies + @rtype: qmfConsole.Subscription + @returns: an object representing the standing subscription. + """ + pass + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + """ + assert self._console + + if _timeout is None: + _timeout = self._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + handle = self._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._console._req_correlation.get_data(handle, _timeout) + self._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), + _error=_map.get(SchemaMethod.KEY_ERROR)) + + def enable_events(self): + raise Exception("enable_events tbd") + + def disable_events(self): + raise Exception("disable_events tbd") + + def destroy(self): + raise Exception("destroy tbd") + + def __repr__(self): + return str(self._address) + + def __str__(self): + return self.__repr__() + + def _sendQuery(self, query, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.get_query), + properties={"method":"request"}, + content={MsgKey.query: query.map_encode()}) + self._sendMsg( msg, correlation_id ) + + + def _sendMethodReq(self, mr_map, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.method_req), + properties={"method":"request"}, + content=mr_map) + self._sendMsg( msg, correlation_id ) + + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class MethodResult(object): + def __init__(self, _out_args=None, _error=None): + self._error = _error + self._out_args = _out_args + + def succeeded(self): + return self._error is None + + def get_exception(self): + return self._error + + def get_arguments(self): + return self._out_args + + def get_argument(self, name): + arg = None + if self._out_args: + arg = self._out_args.get(name) + return arg + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + + + + +class Console(Thread): + """ + A Console manages communications to a collection of agents on behalf of an application. + """ + def __init__(self, name=None, _domain=None, notifier=None, + reply_timeout = 60, + # agent_timeout = 120, + agent_timeout = 60, + kwargs={}): + """ + @type name: str + @param name: identifier for this console. Must be unique. + @type notifier: qmfConsole.Notifier + @param notifier: invoked when events arrive for processing. + @type kwargs: dict + @param kwargs: ??? Unused + """ + Thread.__init__(self) + if not name: + self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) + else: + self._name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self._name, self._domain) + self._notifier = notifier + self._lock = Lock() + self._conn = None + self._session = None + # dict of "agent-direct-address":class Agent entries + self._agent_map = {} + self._direct_recvr = None + self._announce_recvr = None + self._locate_sender = None + self._schema_cache = {} + self._req_correlation = SequencedWaiter() + self._operational = False + self._agent_discovery_filter = None + self._reply_timeout = reply_timeout + self._agent_timeout = agent_timeout + self._next_agent_expire = None + # lock out run() thread + self._cv = Condition() + # for passing WorkItems to the application + self._work_q = Queue.Queue() + self._work_q_put = False + ## Old stuff below??? + #self._broker_list = [] + #self.impl = qmfengine.Console() + #self._event = qmfengine.ConsoleEvent() + ##self._cv = Condition() + ##self._sync_count = 0 + ##self._sync_result = None + ##self._select = {} + ##self._cb_cond = Condition() + + + + def destroy(self, timeout=None): + """ + Must be called before the Console is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Console...") + if self._conn: + self.removeConnection(self._conn, timeout) + logging.debug("Console Destroyed") + + + + def addConnection(self, conn): + """ + Add a AMQP connection to the console. The console will setup a session over the + connection. The console will then broadcast an Agent Locate Indication over + the session in order to discover present agents. + + @type conn: qpid.messaging.Connection + @param conn: the connection to the AMQP messaging infrastructure. + """ + if self._conn: + raise Exception( "Multiple connections per Console not supported." ); + self._conn = conn + self._session = conn.session(name=self._name) + self._direct_recvr = self._session.receiver(str(self._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=1) + logging.debug("my direct addr=%s" % self._direct_recvr.source) + + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + self._announce_recvr = self._session.receiver(str(ind_addr) + + ";{create:always," + " node-properties:{type:topic}}", + capacity=1) + logging.debug("agent.ind addr=%s" % self._announce_recvr.source) + + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + self._locate_sender = self._session.sender(str(locate_addr) + + ";{create:always," + " node-properties:{type:topic}}") + logging.debug("agent.locate addr=%s" % self._locate_sender.target) + + # + # Now that receivers are created, fire off the receive thread... + # + self._operational = True + self.start() + + + + def removeConnection(self, conn, timeout=None): + """ + Remove an AMQP connection from the console. Un-does the add_connection() operation, + and releases any agents and sessions associated with the connection. + + @type conn: qpid.messaging.Connection + @param conn: connection previously added by add_connection() + """ + if self._conn and conn and conn != self._conn: + logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + + # tell connection thread to shutdown + self._operational = False + if self.isAlive(): + # kick my thread to wake it up + logging.debug("Making temp sender for [%s]" % self._address) + tmp_sender = self._session.sender(str(self._address)) + try: + msg = Message(subject=makeSubject(OpCode.noop)) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for console receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + self._direct_recvr.close() + self._announce_recvr.close() + self._locate_sender.close() + self._session.close() + self._session = None + self._conn = None + logging.debug("console connection removal complete") + + + def getAddress(self): + """ + The AMQP address this Console is listening to. + """ + return self._address + + + def destroyAgent( self, agent ): + """ + Undoes create. + """ + if not isinstance(agent, Agent): + raise TypeError("agent must be an instance of class Agent") + + self._lock.acquire() + try: + if agent._id in self._agent_map: + del self._agent_map[agent._id] + finally: + self._lock.release() + + def find_agent(self, name, timeout=None ): + """ + Given the name of a particular agent, return an instance of class Agent + representing that agent. Return None if the agent does not exist. + """ + + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + finally: + self._lock.release() + + # agent not present yet - ping it with an agent_locate + + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + tmp_sender = self._session.sender(str(QmfAddress.direct(name, + self._domain)) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) + msg = Message(subject=makeSubject(OpCode.agent_locate), + properties={"method":"request"}, + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) + msg.correlation_id = str(handle) + logging.debug("Sending Agent Locate (%s)" % time.time()) + tmp_sender.send( msg ) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if timeout is None: + timeout = self._reply_timeout + + new_agent = None + logging.debug("Waiting for response to Agent Locate (%s)" % timeout) + self._req_correlation.get_data( handle, timeout ) + self._req_correlation.release(handle) + logging.debug("Agent Locate wait ended (%s)" % time.time()) + self._lock.acquire() + try: + new_agent = self._agent_map.get(name) + finally: + self._lock.release() + return new_agent + + + def doQuery(self, agent, query, timeout=None ): + """ + """ + + target = query.get_target() + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._sendQuery(query, handle) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if not timeout: + timeout = self._reply_timeout + + logging.debug("Waiting for response to Query (%s)" % timeout) + reply = self._req_correlation.get_data(handle, timeout) + self._req_correlation.release(handle) + if not reply: + logging.debug("Agent Query wait timed-out.") + return None + + if target == QmfQuery.TARGET_PACKAGES: + # simply pass back the list of package names + logging.debug("Response to Packet Query received") + return reply.content.get(MsgKey.package_info) + elif target == QmfQuery.TARGET_OBJECT_ID: + # simply pass back the list of object_id's + logging.debug("Response to Object Id Query received") + return reply.content.get(MsgKey.object_id) + elif target == QmfQuery.TARGET_SCHEMA_ID: + logging.debug("Response to Schema Id Query received") + id_list = [] + for sid_map in reply.content.get(MsgKey.schema_id): + id_list.append(SchemaClassId.from_map(sid_map)) + return id_list + elif target == QmfQuery.TARGET_SCHEMA: + logging.debug("Response to Schema Query received") + schema_list = [] + for schema_map in reply.content.get(MsgKey.schema): + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + schema_list.append(schema) + self._add_schema(schema) + return schema_list + elif target == QmfQuery.TARGET_OBJECT: + logging.debug("Response to Object Query received") + obj_list = [] + for obj_map in reply.content.get(MsgKey.data_obj): + # if the object references a schema, fetch it + sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + schema = self._fetch_schema(sid, _agent=agent, + _timeout=timeout) + if not schema: + logging.warning("Unknown schema, id=%s" % sid) + continue + obj = QmfConsoleData(map_=obj_map, agent=agent, + _schema=schema) + else: + # no schema needed + obj = QmfConsoleData(map_=obj_map, agent=agent) + obj_list.append(obj) + return obj_list + else: + logging.warning("Unexpected Target for a Query: '%s'" % target) + return None + + def run(self): + global _callback_thread + # + # @todo KAG Rewrite when api supports waiting on multiple receivers + # + while self._operational: + + # qLen = self._work_q.qsize() + + while True: + try: + msg = self._announce_recvr.fetch(timeout=0) + except Empty: + break + self._dispatch(msg, _direct=False) + + while True: + try: + msg = self._direct_recvr.fetch(timeout = 0) + except Empty: + break + self._dispatch(msg, _direct=True) + + for agent in self._agent_map.itervalues(): + try: + msg = agent._event_recvr.fetch(timeout = 0) + except Empty: + continue + self._dispatch(msg, _direct=False) + + + self._expireAgents() # check for expired agents + + #if qLen == 0 and self._work_q.qsize() and self._notifier: + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling console notifier.indication") + self._notifier.indication() + _callback_thread = None + + if self._operational: + # wait for a message to arrive or an agent + # to expire + now = datetime.datetime.utcnow() + if self._next_agent_expire > now: + timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds + try: + logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) + xxx = self._session.next_receiver(timeout = timeout) + except Empty: + pass + + + logging.debug("Shutting down Console thread") + + def get_objects(self, + _schema_id=None, + _pname=None, _cname=None, + _object_id=None, + _agents=None, + _timeout=None): + """ + @todo + """ + if _object_id is not None: + # query by object id + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id) + elif _schema_id is not None: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfData.KEY_SCHEMA_ID, + _schema_id.map_encode()]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + elif _pname is not None: + # query by package name (and maybe class name) + if _cname is not None: + pred = QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}, + {QmfQuery.CMP_EQ: + [SchemaClassId.KEY_CLASS, + _cname]}]}) + else: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + + else: + raise Exception("invalid arguments") + + if _agents is None: + # use copy of current agent list + self._lock.acquire() + try: + agent_list = self._agent_map.values() + finally: + self._lock.release() + elif isinstance(_agents, Agent): + agent_list = [_agents] + else: + agent_list = _agents + # @todo validate this list! + + # @todo: fix when async doQuery done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + + if _timeout is None: + _timeout = self._reply_timeout + + obj_list = [] + expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) + for agent in agent_list: + if not agent.isActive(): + continue + now = datetime.datetime.utcnow() + if now >= expired: + break + timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds + reply = self.doQuery(agent, query, timeout) + if reply: + obj_list = obj_list + reply + + if obj_list: + return obj_list + return None + + + + # called by run() thread ONLY + # + def _dispatch(self, msg, _direct=True): + """ + PRIVATE: Process a message received from an Agent + """ + logging.debug( "Message received from Agent! [%s]" % msg ) + try: + version,opcode = parseSubject(msg.subject) + # @todo: deal with version mismatch!!! + except: + logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject) + return + + cmap = {}; props = {} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_ind: + self._handleAgentIndMsg( msg, cmap, version, _direct ) + elif opcode == OpCode.data_ind: + self._handleDataIndMsg(msg, cmap, version, _direct) + elif opcode == OpCode.event_ind: + self._handleEventIndMsg(msg, cmap, version, _direct) + elif opcode == OpCode.managed_object: + logging.warning("!!! managed_object TBD !!!") + elif opcode == OpCode.object_ind: + logging.warning("!!! object_ind TBD !!!") + elif opcode == OpCode.response: + self._handleResponseMsg(msg, cmap, version, _direct) + elif opcode == OpCode.schema_ind: + logging.warning("!!! schema_ind TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) + + + def _handleAgentIndMsg(self, msg, cmap, version, direct): + """ + Process a received agent-ind message. This message may be a response to a + agent-locate, or it can be an unsolicited agent announce. + """ + logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time())) + + ai_map = cmap.get(MsgKey.agent_info) + if not ai_map or not isinstance(ai_map, type({})): + logging.warning("Bad agent-ind message received: '%s'" % msg) + return + name = ai_map.get("_name") + if not name: + logging.warning("Bad agent-ind message received: agent name missing" + " '%s'" % msg) + return + + ignore = True + matched = False + correlated = False + agent_query = self._agent_discovery_filter + + if msg.correlation_id: + correlated = self._req_correlation.isValid(msg.correlation_id) + + if direct and correlated: + ignore = False + elif agent_query: + matched = agent_query.evaluate(QmfData.create(values=ai_map)) + ignore = not matched + + if not ignore: + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(name) + finally: + self._lock.release() + + if not agent: + # need to create and add a new agent + agent = self._createAgent(name) + if not agent: + return # failed to add agent + + # lock out expiration scanning code + self._lock.acquire() + try: + old_timestamp = agent._announce_timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() + + if old_timestamp == None and matched: + logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) + self._work_q.put(wi) + self._work_q_put = True + + if correlated: + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + + + + def _handleDataIndMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.debug("Data indicate received with unknown correlation_id" + " msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + + def _handleResponseMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + # @todo code replication - clean me. + logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.debug("Response msg received with unknown correlation_id" + " msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + def _handleEventIndMsg(self, msg, cmap, version, _direct): + ei_map = cmap.get(MsgKey.event) + if not ei_map or not isinstance(ei_map, type({})): + logging.warning("Bad event indication message received: '%s'" % msg) + return + + aname = ei_map.get("_name") + emap = ei_map.get("_event") + if not aname: + logging.debug("No '_name' field in event indication message.") + return + if not emap: + logging.debug("No '_event' field in event indication message.") + return + # @todo: do I need to lock this??? + agent = self._agent_map.get(aname) + if not agent: + logging.debug("Agent '%s' not known." % aname) + return + try: + # @todo: schema??? + event = QmfEvent.from_map(emap) + except TypeError: + logging.debug("Invalid QmfEvent map received: %s" % str(emap)) + return + + # @todo: schema? Need to fetch it, but not from this thread! + # This thread can not pend on a request. + logging.debug("Publishing event received from agent %s" % aname) + wi = WorkItem(WorkItem.EVENT_RECEIVED, None, + {"agent":agent, + "event":event}) + self._work_q.put(wi) + self._work_q_put = True + + + def _expireAgents(self): + """ + Check for expired agents and issue notifications when they expire. + """ + now = datetime.datetime.utcnow() + if self._next_agent_expire and now < self._next_agent_expire: + return + lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) + next_expire_delta = lifetime_delta + self._lock.acquire() + try: + logging.debug("!!! expiring agents '%s'" % now) + for agent in self._agent_map.itervalues(): + if agent._announce_timestamp: + agent_deathtime = agent._announce_timestamp + lifetime_delta + if agent_deathtime <= now: + logging.debug("AGENT_DELETED for %s" % agent) + agent._announce_timestamp = None + wi = WorkItem(WorkItem.AGENT_DELETED, None, + {"agent":agent}) + # @todo: remove agent from self._agent_map + self._work_q.put(wi) + self._work_q_put = True + else: + if (agent_deathtime - now) < next_expire_delta: + next_expire_delta = agent_deathtime - now + + self._next_agent_expire = now + next_expire_delta + logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + finally: + self._lock.release() + + + + def _createAgent( self, name ): + """ + Factory to create/retrieve an agent for this console + """ + logging.debug("creating agent %s" % name) + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + + agent = Agent(name, self) + try: + agent._sender = self._session.sender(str(agent._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + except: + logging.warning("Unable to create sender for %s" % name) + return None + logging.debug("created agent sender %s" % agent._sender.target) + + events_addr = QmfAddress.topic(name, self._domain) + try: + agent._event_recvr = self._session.receiver(str(events_addr) + + ";{create:always," + " node-properties:{type:topic}}", + capacity=1) + except: + logging.warning("Unable to create event receiver for %s" % name) + return None + logging.debug("created agent event receiver %s" % agent._event_recvr.source) + + self._agent_map[name] = agent + finally: + self._lock.release() + + # new agent - query for its schema database for + # seeding the schema cache (@todo) + # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) + # agent._sendQuery( query ) + + return agent + + + + def enable_agent_discovery(self, _query=None): + """ + Called to enable the asynchronous Agent Discovery process. + Once enabled, AGENT_ADD work items can arrive on the WorkQueue. + """ + # @todo: fix - take predicate only, not entire query! + if _query is not None: + if (not isinstance(_query, QmfQuery) or + _query.get_target() != QmfQuery.TARGET_AGENT): + raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") + self._agent_discovery_filter = _query + else: + # create a match-all agent query (no predicate) + self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) + + def disable_agent_discovery(self): + """ + Called to disable the async Agent Discovery process enabled by + calling enableAgentDiscovery() + """ + self._agent_discovery_filter = None + + + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + + + def get_next_workitem(self, timeout=None): + """ + Returns the next pending work item, or None if none available. + @todo: subclass and return an Empty event instead. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + + def release_workitem(self, wi): + """ + Return a WorkItem to the Console when it is no longer needed. + @todo: call Queue.task_done() - only 2.5+ + + @type wi: class qmfConsole.WorkItem + @param wi: work item object to return. + """ + pass + + def _add_schema(self, schema): + """ + @todo + """ + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass type expected") + + self._lock.acquire() + try: + sid = schema.get_class_id() + if not self._schema_cache.has_key(sid): + self._schema_cache[sid] = schema + finally: + self._lock.release() + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): + """ + Find the schema identified by schema_id. If not in the cache, ask the + agent for it. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("SchemaClassId type expected") + + self._lock.acquire() + try: + schema = self._schema_cache.get(schema_id) + if schema: + return schema + finally: + self._lock.release() + + if _agent is None: + return None + + # note: doQuery will add the new schema to the cache automatically. + slist = self.doQuery(_agent, + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout) + if slist: + return slist[0] + else: + return None + + + + # def get_packages(self): + # plist = [] + # for i in range(self.impl.packageCount()): + # plist.append(self.impl.getPackageName(i)) + # return plist + + + # def get_classes(self, package, kind=CLASS_OBJECT): + # clist = [] + # for i in range(self.impl.classCount(package)): + # key = self.impl.getClass(package, i) + # class_kind = self.impl.getClassKind(key) + # if class_kind == kind: + # if kind == CLASS_OBJECT: + # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + # elif kind == CLASS_EVENT: + # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + # return clist + + + # def bind_package(self, package): + # return self.impl.bindPackage(package) + + + # def bind_class(self, kwargs = {}): + # if "key" in kwargs: + # self.impl.bindClass(kwargs["key"]) + # elif "package" in kwargs: + # package = kwargs["package"] + # if "class" in kwargs: + # self.impl.bindClass(package, kwargs["class"]) + # else: + # self.impl.bindClass(package) + # else: + # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + # def get_agents(self, broker=None): + # blist = [] + # if broker: + # blist.append(broker) + # else: + # self._cv.acquire() + # try: + # # copy while holding lock + # blist = self._broker_list[:] + # finally: + # self._cv.release() + + # agents = [] + # for b in blist: + # for idx in range(b.impl.agentCount()): + # agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + # return agents + + + # def get_objects(self, query, kwargs = {}): + # timeout = 30 + # agent = None + # temp_args = kwargs.copy() + # if type(query) == type({}): + # temp_args.update(query) + + # if "_timeout" in temp_args: + # timeout = temp_args["_timeout"] + # temp_args.pop("_timeout") + + # if "_agent" in temp_args: + # agent = temp_args["_agent"] + # temp_args.pop("_agent") + + # if type(query) == type({}): + # query = Query(temp_args) + + # self._select = {} + # for k in temp_args.iterkeys(): + # if type(k) == str: + # self._select[k] = temp_args[k] + + # self._cv.acquire() + # try: + # self._sync_count = 1 + # self._sync_result = [] + # broker = self._broker_list[0] + # broker.send_query(query.impl, None, agent) + # self._cv.wait(timeout) + # if self._sync_count == 1: + # raise Exception("Timed out: waiting for query response") + # finally: + # self._cv.release() + + # return self._sync_result + + + # def get_object(self, query, kwargs = {}): + # ''' + # Return one and only one object or None. + # ''' + # objs = objects(query, kwargs) + # if len(objs) == 1: + # return objs[0] + # else: + # return None + + + # def first_object(self, query, kwargs = {}): + # ''' + # Return the first of potentially many objects. + # ''' + # objs = objects(query, kwargs) + # if objs: + # return objs[0] + # else: + # return None + + + # # Check the object against select to check for a match + # def _select_match(self, object): + # schema_props = object.properties() + # for key in self._select.iterkeys(): + # for prop in schema_props: + # if key == p[0].name() and self._select[key] != p[1]: + # return False + # return True + + + # def _get_result(self, list, context): + # ''' + # Called by Broker proxy to return the result of a query. + # ''' + # self._cv.acquire() + # try: + # for item in list: + # if self._select_match(item): + # self._sync_result.append(item) + # self._sync_count -= 1 + # self._cv.notify() + # finally: + # self._cv.release() + + + # def start_sync(self, query): pass + + + # def touch_sync(self, sync): pass + + + # def end_sync(self, sync): pass + + + + +# def start_console_events(self): +# self._cb_cond.acquire() +# try: +# self._cb_cond.notify() +# finally: +# self._cb_cond.release() + + +# def _do_console_events(self): +# ''' +# Called by the Console thread to poll for events. Passes the events +# onto the ConsoleHandler associated with this Console. Is called +# periodically, but can also be kicked by Console.start_console_events(). +# ''' +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# try: +# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: +# logging.debug("Console Event AGENT_ADDED received") +# if self._handler: +# self._handler.agent_added(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: +# logging.debug("Console Event AGENT_DELETED received") +# if self._handler: +# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: +# logging.debug("Console Event NEW_PACKAGE received") +# if self._handler: +# self._handler.new_package(self._event.name) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: +# logging.debug("Console Event NEW_CLASS received") +# if self._handler: +# self._handler.new_class(SchemaClassKey(self._event.classKey)) +# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: +# logging.debug("Console Event OBJECT_UPDATE received") +# if self._handler: +# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), +# self._event.hasProps, self._event.hasStats) +# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: +# logging.debug("Console Event EVENT_RECEIVED received") +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: +# logging.debug("Console Event AGENT_HEARTBEAT received") +# if self._handler: +# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) +# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: +# logging.debug("Console Event METHOD_RESPONSE received") +# else: +# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# except e: +# print "Exception caught in callback thread:", e +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) +# return count + + + + + +# class Broker(ConnectionHandler): +# # attr_reader :impl :conn, :console, :broker_bank +# def __init__(self, console, conn): +# self.broker_bank = 1 +# self.console = console +# self.conn = conn +# self._session = None +# self._cv = Condition() +# self._stable = None +# self._event = qmfengine.BrokerEvent() +# self._xmtMessage = qmfengine.Message() +# self.impl = qmfengine.BrokerProxy(self.console.impl) +# self.console.impl.addConnection(self.impl, self) +# self.conn.add_conn_handler(self) +# self._operational = True + + +# def shutdown(self): +# logging.debug("broker.shutdown() called.") +# self.console.impl.delConnection(self.impl) +# self.conn.del_conn_handler(self) +# if self._session: +# self.impl.sessionClosed() +# logging.debug("broker.shutdown() sessionClosed done.") +# self._session.destroy() +# logging.debug("broker.shutdown() session destroy done.") +# self._session = None +# self._operational = False +# logging.debug("broker.shutdown() done.") + + +# def wait_for_stable(self, timeout = None): +# self._cv.acquire() +# try: +# if self._stable: +# return +# if timeout: +# self._cv.wait(timeout) +# if not self._stable: +# raise Exception("Timed out: waiting for broker connection to become stable") +# else: +# while not self._stable: +# self._cv.wait() +# finally: +# self._cv.release() + + +# def send_query(self, query, ctx, agent): +# agent_impl = None +# if agent: +# agent_impl = agent.impl +# self.impl.sendQuery(query, ctx, agent_impl) +# self.conn.kick() + + +# def _do_broker_events(self): +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: +# logging.debug("Broker Event BROKER_INFO received"); +# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: +# logging.debug("Broker Event DECLARE_QUEUE received"); +# self.conn.impl.declareQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: +# logging.debug("Broker Event DELETE_QUEUE received"); +# self.conn.impl.deleteQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.BIND: +# logging.debug("Broker Event BIND received"); +# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: +# logging.debug("Broker Event UNBIND received"); +# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: +# logging.debug("Broker Event SETUP_COMPLETE received"); +# self.impl.startProtocol() +# elif self._event.kind == qmfengine.BrokerEvent.STABLE: +# logging.debug("Broker Event STABLE received"); +# self._cv.acquire() +# try: +# self._stable = True +# self._cv.notify() +# finally: +# self._cv.release() +# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: +# result = [] +# for idx in range(self._event.queryResponse.getObjectCount()): +# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) +# self.console._get_result(result, self._event.context) +# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: +# obj = self._event.context +# obj._method_result(MethodResponse(self._event.methodResponse())) + +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) + +# return count + + +# def _do_broker_messages(self): +# count = 0 +# valid = self.impl.getXmtMessage(self._xmtMessage) +# while valid: +# count += 1 +# logging.debug("Broker: sending msg on connection") +# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) +# self.impl.popXmt() +# valid = self.impl.getXmtMessage(self._xmtMessage) + +# return count + + +# def _do_events(self): +# while True: +# self.console.start_console_events() +# bcnt = self._do_broker_events() +# mcnt = self._do_broker_messages() +# if bcnt == 0 and mcnt == 0: +# break; + + +# def conn_event_connected(self): +# logging.debug("Broker: Connection event CONNECTED") +# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) +# self.impl.sessionOpened(self._session.handle) +# self._do_events() + + +# def conn_event_disconnected(self, error): +# logging.debug("Broker: Connection event DISCONNECTED") +# pass + + +# def conn_event_visit(self): +# self._do_events() + + +# def sess_event_session_closed(self, context, error): +# logging.debug("Broker: Session event CLOSED") +# self.impl.sessionClosed() + + +# def sess_event_recv(self, context, message): +# logging.debug("Broker: Session event MSG_RECV") +# if not self._operational: +# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# self.impl.handleRcvMessage(message) +# self._do_events() + + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ +# TEMPORARY TEST CODE - TO BE DELETED +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # temp test code + from common import (qmfTypes, QmfEvent, SchemaProperty) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "************* Creating Async Console **************" ) + + class MyNotifier(Notifier): + def __init__(self, context): + self._myContext = context + self.WorkAvailable = False + + def indication(self): + print("Indication received! context=%d" % self._myContext) + self.WorkAvailable = True + + _noteMe = MyNotifier( 666 ) + + _myConsole = Console(notifier=_noteMe) + + _myConsole.enable_agent_discovery() + logging.info("Waiting...") + + + logging.info( "Destroying console:" ) + _myConsole.destroy( 10 ) + + logging.info( "******** Messing around with Schema ********" ) + + _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A typical event schema", + _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, + kwargs = {"min":0, + "max":100, + "unit":"seconds", + "desc":"sleep value"}), + "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, + kwargs={"maxlen":100, + "desc":"a string argument"})}) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) + try: + print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) + except: + pass + print("_sec.getProperties()='%s'" % _sec.get_properties()) + + print("Adding another argument") + _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, + kwargs={"dir":"IO", + "desc":"a boolean argument"}) + _sec.add_property('Argument-3', _arg3) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) + + print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) + + _secmap = _sec.map_encode() + print("_sec.mapEncode()='%s'" % _secmap ) + + _sec2 = SchemaEventClass( _map=_secmap ) + + print("_sec=%s" % _sec.get_class_id()) + print("_sec2=%s" % _sec2.get_class_id()) + + _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", + "_class_name": "myOtherClass", + "_type": "_data"}, + "_desc": "A test data object", + "_values": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}, + "meth1": {"_desc": "A test method", + "_arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"_desc": "A test method", + "_arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": + "I"}}}}, + "_subtypes": + {"prop1":"qmfProperty", + "prop2":"qmfProperty", + "statistics":"qmfProperty", + "meth1":"qmfMethod", + "meth2":"qmfMethod"}, + "_primary_key_names": ["prop2", "prop1"]}) + + print("_soc='%s'" % _soc) + + print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) + + print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) + print("_soc.getProperties='%s'" % _soc.get_properties()) + print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) + + print("_soc.getMethodCount='%d'" % _soc.get_method_count()) + print("_soc.getMethods='%s'" % _soc.get_methods()) + print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) + + _socmap = _soc.map_encode() + print("_socmap='%s'" % _socmap) + _soc2 = SchemaObjectClass( _map=_socmap ) + print("_soc='%s'" % _soc) + print("_soc2='%s'" % _soc2) + + if _soc2.get_class_id() == _soc.get_class_id(): + print("soc and soc2 are the same schema") + + + logging.info( "******** Messing around with ObjectIds ********" ) + + + qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + print("qd='%s':" % qd) + + print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) + + print("qd map='%s'" % qd.map_encode()) + print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) + qd.set_value("prop4", 4, "A test property called 4") + print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) + qd.prop4 = 9 + print("qd.prop4 = 9 ='%s'" % qd.prop4) + qd["prop4"] = 11 + print("qd[prop4] = 11 ='%s'" % qd["prop4"]) + + print("qd.mapEncode()='%s'" % qd.map_encode()) + _qd2 = QmfData( _map = qd.map_encode() ) + print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) + + _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, + "prop2": 0}}, + agent="some agent name?", + _schema = _soc) + + print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) + + _qmfDesc1._set_schema( _soc ) + + print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) + + + _qmfDescMap = _qmfDesc1.map_encode() + print("_qmfDescMap='%s'" % _qmfDescMap) + + _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) + + print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) + print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) + + + logging.info( "******** Messing around with QmfEvents ********" ) + + + _qmfevent1 = QmfEvent( _timestamp = 1111, + _schema = _sec, + _values = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) + + _qmfevent1Map = _qmfevent1.map_encode() + + _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) + + + logging.info( "******** Messing around with Queries ********" ) + + _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: ["vendor", "AVendor"]}, + {QmfQuery.CMP_EQ: ["product", "SomeProduct"]}, + {QmfQuery.CMP_EQ: ["name", "Thingy"]}, + {QmfQuery.LOGIC_OR: + [{QmfQuery.CMP_LE: ["temperature", -10]}, + {QmfQuery.CMP_FALSE: None}, + {QmfQuery.CMP_EXISTS: ["namey"]}]}]})) + + print("_q1.mapEncode() = [%s]" % _q1.map_encode()) diff --git a/qpid/python/qmf2/tests/__init__.py b/qpid/python/qmf2/tests/__init__.py new file mode 100644 index 0000000000..2e742b79be --- /dev/null +++ b/qpid/python/qmf2/tests/__init__.py @@ -0,0 +1,22 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import agent_discovery, basic_query, basic_method, obj_gets, events diff --git a/qpid/python/qmf2/tests/agent_discovery.py b/qpid/python/qmf2/tests/agent_discovery.py new file mode 100644 index 0000000000..19ed79cbc2 --- /dev/null +++ b/qpid/python/qmf2/tests/agent_discovery.py @@ -0,0 +1,320 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +import qmf2.common +import qmf2.console +import qmf2.agent + + +class _testNotifier(qmf.qmfCommon.Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = qmf.qmfAgent.Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + # No database needed for this test + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_discover_all(self): + # create console + # enable agent discovery + # wait + # expect agent add for agent1 and agent2 + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=3) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + self.console.destroy(10) + + + def test_discover_one(self): + # create console + # enable agent discovery, filter for agent1 only + # wait until timeout + # expect agent add for agent1 only + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + query = qmf.qmfCommon.QmfQuery.create_predicate( + qmf.qmfCommon.QmfQuery.TARGET_AGENT, + qmf.qmfCommon.QmfQueryPredicate({qmf.qmfCommon.QmfQuery.CMP_EQ: + [qmf.qmfCommon.QmfQuery.KEY_AGENT_NAME, "agent1"]})) + self.console.enable_agent_discovery(query) + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi: + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + + wi = self.console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") + + self.console.destroy(10) + + + def test_heartbeat(self): + # create console with 2 sec agent timeout + # enable agent discovery, find all agents + # stop agent1, expect timeout notification + # stop agent2, expect timeout notification + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=2) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=4) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.shutdown_agent(10) + agent1.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent1_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.shutdown_agent(10) + agent2.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + self.console.destroy(10) + + + def test_find_agent(self): + # create console + # do not enable agent discovery + # find agent1, expect success + # find agent-none, expect failure + # find agent2, expect success + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent1 = self.console.find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + no_agent = self.console.find_agent("agent-none", timeout=3) + self.assertTrue(no_agent == None) + + agent2 = self.console.find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + self.console.removeConnection(self.conn, 10) + self.console.destroy(10) + + diff --git a/qpid/python/qmf2/tests/agent_test.py b/qpid/python/qmf2/tests/agent_test.py new file mode 100644 index 0000000000..14d8ada197 --- /dev/null +++ b/qpid/python/qmf2/tests/agent_test.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import time +import unittest +from threading import Semaphore + + +from qpid.messaging import * +from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, + QmfEvent, SchemaMethod, Notifier, SchemaClassId, + WorkItem) +from qmf2.agent import (Agent, QmfAgentData) + + + +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + print("Waiting for event...") + self._sema4.acquire() + print("...event present") + + + + +class QmfTest(unittest.TestCase): + def test_begin(self): + print("!!! being test") + + def test_end(self): + print("!!! end test") + + +# +# An example agent application +# + + +if __name__ == '__main__': + _notifier = ExampleNotifier() + _agent = Agent( "qmf.testAgent", _notifier=_notifier ) + + # Dynamically construct a class schema + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( _agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + _agent.add_object( _obj1 ) + + _agent.add_object( QmfAgentData( _agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(_agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _agent.add_object(_obj2) + + + ## Now connect to the broker + + _c = Connection("localhost") + _c.connect() + _agent.setConnection(_c) + + _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) + + _done = False + while not _done: + # try: + _notifier.waitForWork() + + _wi = _agent.get_next_workitem(timeout=0) + while _wi: + + if _wi.get_type() == WorkItem.METHOD_CALL: + mc = _wi.get_params() + + if mc.get_name() == "set_meth": + print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) + print("!!! args='%s'" % str(mc.get_args())) + print("!!! userid=%s" % str(mc.get_user_id())) + print("!!! handle=%s" % _wi.get_handle()) + _agent.method_response(_wi.get_handle(), + {"rc1": 100, "rc2": "Success"}) + else: + print("!!! Unknown Method name = %s" % mc.get_name()) + _agent.method_response(_wi.get_handle(), _error=_error_data) + else: + print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) + + _agent.release_workitem(_wi) + _wi = _agent.get_next_workitem(timeout=0) + # except: + # print( "shutting down...") + # _done = True + + print( "Removing connection... TBD!!!" ) + #_myConsole.remove_connection( _c, 10 ) + + print( "Destroying agent... TBD!!!" ) + #_myConsole.destroy( 10 ) + + print( "******** agent test done ********" ) + + + diff --git a/qpid/python/qmf2/tests/basic_method.py b/qpid/python/qmf2/tests/basic_method.py new file mode 100644 index 0000000000..c5098b5d72 --- /dev/null +++ b/qpid/python/qmf2/tests/basic_method.py @@ -0,0 +1,348 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent, MethodCallParams) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # Agent application main processing loop + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + if wi.get_type() == WorkItem.METHOD_CALL: + mc = wi.get_params() + if not isinstance(mc, MethodCallParams): + raise Exception("Unexpected method call parameters") + + if mc.get_name() == "set_meth": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -2, + "description": + "Bad Object Id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + obj.inc_value("method_call_count") + if "arg_int" in mc.get_args(): + obj.set_value("set_int", mc.get_args()["arg_int"]) + if "arg_str" in mc.get_args(): + obj.set_value("set_string", mc.get_args()["arg_str"]) + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + elif mc.get_name() == "a_method": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -3, + "description": + "Unknown object id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + elif obj.get_object_id() != "01545": + error_info = QmfData.create({"code": -4, + "description": + "Unexpected id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + args = mc.get_args() + if ("arg1" in args and args["arg1"] == 1 and + "arg2" in args and args["arg2"] == "Now set!" + and "arg3" in args and args["arg3"] == 1966): + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + else: + error_info = QmfData.create({"code": -5, + "description": + "Bad Args."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + error_info = QmfData.create({"code": -1, + "description": + "Unknown method call."}) + self.agent.method_response(wi.get_handle(), _error=error_info) + + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_described_obj(self): + # create console + # find agents + # synchronous query for all objects in schema + # method call on each object + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + mr = obj.invoke_method( "set_meth", {"arg_int": -99, + "arg_str": "Now set!"}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + + self.assertTrue(obj.get_value("method_call_count") == 0) + self.assertTrue(obj.get_value("set_string") == "UNSET") + self.assertTrue(obj.get_value("set_int") == 0) + + obj.refresh() + + self.assertTrue(obj.get_value("method_call_count") == 1) + self.assertTrue(obj.get_value("set_string") == "Now set!") + self.assertTrue(obj.get_value("set_int") == -99) + + self.console.destroy(10) + + + def test_bad_method(self): + # create console + # find agents + # synchronous query for all objects in schema + # invalid method call on each object + # - should throw a ValueError + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + self.failUnlessRaises(ValueError, + obj.invoke_method, + "unknown_meth", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) + self.console.destroy(10) + + + def test_managed_obj(self): + # create console + # find agents + # synchronous query for a managed object + # method call on each object + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") + obj_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(obj_list, type([]))) + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + + mr = obj.invoke_method("a_method", + {"arg1": 1, + "arg2": "Now set!", + "arg3": 1966}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + # @todo refresh and verify changes + + self.console.destroy(10) diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py new file mode 100644 index 0000000000..46dc87f4a1 --- /dev/null +++ b/qpid/python/qmf2/tests/basic_query.py @@ -0,0 +1,336 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_all_oids(self): + # create console + # find agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) + oid_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(oid_list, type([])), + "Unexpected return type") + self.assertTrue(len(oid_list) == 3, "Wrong count") + self.assertTrue('100a name' in oid_list) + self.assertTrue('99another name' in oid_list) + self.assertTrue('01545' in oid_list) + + self.console.destroy(10) + + + def test_direct_oids(self): + # create console + # find agents + # synchronous query for each objects + # verify objects and schemas are correct + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + for oid in ['100a name', '99another name', '01545']: + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid) + obj_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(obj_list, type([])), + "Unexpected return type") + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == oid) + + if obj.is_described(): + self.assertTrue(oid in ['100a name', '99another name']) + schema_id = obj.get_schema_class_id() + self.assertTrue(isinstance(schema_id, SchemaClassId)) + else: + self.assertTrue(oid == "01545") + + + + self.console.destroy(10) + + + + def test_packages(self): + # create console + # find agents + # synchronous query all package names + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) + package_list = self.console.doQuery(agent, query) + self.assertTrue(len(package_list) == 1) + self.assertTrue('MyPackage' in package_list) + + + self.console.destroy(10) + + + + def test_predicate_schema_id(self): + # create console + # find agents + # synchronous query for all schema by package name + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + QmfQueryPredicate( + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]})) + + schema_list = self.console.doQuery(agent, query) + self.assertTrue(len(schema_list)) + for schema in schema_list: + self.assertTrue(schema.get_class_id().get_package_name() == + "MyPackage") + + + self.console.destroy(10) + + + + def test_predicate_no_match(self): + # create console + # find agents + # synchronous query for all schema by package name + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + QmfQueryPredicate( + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "No-Such-Package"]})) + + schema_list = self.console.doQuery(agent, query) + self.assertTrue(len(schema_list) == 0) + + self.console.destroy(10) + + diff --git a/qpid/python/qmf2/tests/console_test.py b/qpid/python/qmf2/tests/console_test.py new file mode 100644 index 0000000000..ac0e064f20 --- /dev/null +++ b/qpid/python/qmf2/tests/console_test.py @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import time +from threading import Semaphore + + +from qpid.messaging import * +from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, + SchemaClassId, SchemaClass, QmfData) +from qmf2.console import Console + + +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + print("Waiting for event...") + self._sema4.acquire() + print("...event present") + + +logging.getLogger().setLevel(logging.INFO) + +print( "Starting Connection" ) +_c = Connection("localhost") +_c.connect() + +print( "Starting Console" ) + +_notifier = ExampleNotifier() +_myConsole = Console(notifier=_notifier) +_myConsole.addConnection( _c ) + +# Allow discovery only for the agent named "qmf.testAgent" +# @todo: replace "manual" query construction with +# a formal class-based Query API +_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfQuery.KEY_AGENT_NAME, + "qmf.testAgent"]})) +_myConsole.enable_agent_discovery(_query) + +_done = False +while not _done: +# try: + _notifier.waitForWork() + + _wi = _myConsole.get_next_workitem(timeout=0) + while _wi: + print("!!! work item received %d:%s" % (_wi.get_type(), + str(_wi.get_params()))) + + + if _wi.get_type() == _wi.AGENT_ADDED: + _agent = _wi.get_params().get("agent") + if not _agent: + print("!!!! AGENT IN REPLY IS NULL !!! ") + + _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) + oid_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % oid_list) + + for oid in oid_list: + _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + oid) + obj_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % obj_list) + + if obj_list is None: + obj_list={} + + for obj in obj_list: + resp = obj.invoke_method( "set_meth", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + if not obj.is_described(): + resp = obj.invoke_method( "bad method", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + + #--------------------------------- + #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name") + + #obj_list = _myConsole.doQuery(_agent, _query) + + #--------------------------------- + + # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) + + # package_list = _myConsole.doQuery(_agent, _query) + + # for pname in package_list: + # print("!!! Querying for schema from package: %s" % pname) + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]})) + + # schema_id_list = _myConsole.doQuery(_agent, _query) + # for sid in schema_id_list: + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # schema_list = _myConsole.doQuery(_agent, _query) + # for schema in schema_list: + # sid = schema.get_class_id() + # _query = QmfQuery.create_predicate( + # QmfQuery.TARGET_OBJECT_ID, + # QmfQueryPredicate({QmfQuery.CMP_EQ: + # [QmfData.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # oid_list = _myConsole.doQuery(_agent, _query) + # for oid in oid_list: + # _query = QmfQuery.create_id( + # QmfQuery.TARGET_OBJECT, oid) + # _reply = _myConsole.doQuery(_agent, _query) + + # print("!!!************************** REPLY=%s" % _reply) + + + _myConsole.release_workitem(_wi) + _wi = _myConsole.get_next_workitem(timeout=0) +# except: +# logging.info( "shutting down..." ) +# _done = True + +print( "Removing connection" ) +_myConsole.removeConnection( _c, 10 ) + +print( "Destroying console:" ) +_myConsole.destroy( 10 ) + +print( "******** console test done ********" ) diff --git a/qpid/python/qmf2/tests/events.py b/qpid/python/qmf2/tests/events.py new file mode 100644 index 0000000000..8ce534ce3a --- /dev/null +++ b/qpid/python/qmf2/tests/events.py @@ -0,0 +1,193 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import time +import datetime +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate, SchemaEventClass, + QmfEvent) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage", + "MyClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test event schema") + # add properties + _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # Add schema to Agent + self.schema = _schema + self.agent.register_object_class(_schema) + + self.running = False + + def start_app(self): + self.running = True + self.start() + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(self.timeout) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + conn.connect() + self.agent.set_connection(conn) + + counter = 1 + while self.running: + # post an event every second + event = QmfEvent.create(long(time.time() * 1000), + QmfEvent.SEV_WARNING, + {"prop-1": counter, + "prop-2": str(datetime.datetime.utcnow())}, + _schema=self.schema) + counter += 1 + self.agent.raise_event(event) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + self.notifier.wait_for_work(1) + + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", self.broker, 1) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, 1) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_get_events(self): + # create console + # find agents + + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + # find the agents + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now wait for events + agent1_events = agent2_events = 0 + wi = self.console.get_next_workitem(timeout=4) + while wi: + if wi.get_type() == wi.EVENT_RECEIVED: + event = wi.get_params().get("event") + self.assertTrue(isinstance(event, QmfEvent)) + self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING) + self.assertTrue(event.get_value("prop-1") > 0) + + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_events += 1 + elif agent.get_name() == "agent2": + agent2_events += 1 + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_events and agent2_events: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_events > 0 and agent2_events > 0) + + self.console.destroy(10) + + + + diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py new file mode 100644 index 0000000000..e58575440d --- /dev/null +++ b/qpid/python/qmf2/tests/obj_gets.py @@ -0,0 +1,399 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Management Database + # - two different schema packages, + # - two classes within one schema package + # - multiple objects per schema package+class + # - two "undescribed" objects + + # "package1/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"), + _desc="A test data schema - one", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p1c1_key1") + _obj.set_value("count1", 0) + _obj.set_value("count2", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p1c1_key2") + _obj.set_value("count1", 9) + _obj.set_value("count2", 10) + self.agent.add_object( _obj ) + + # "package1/class2" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"), + _desc="A test data schema - two", + _object_id_names=["name"] ) + # add properties + _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("name", "p1c2_name1") + _obj.set_value("string1", "a data string") + self.agent.add_object( _obj ) + + + # "package2/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"), + _desc="A test data schema - second package", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p2c1_key1") + _obj.set_value("counter", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p2c1_key2") + _obj.set_value("counter", 2112) + self.agent.add_object( _obj ) + + + # add two "unstructured" objects to the Agent + + _obj = QmfAgentData(self.agent, _object_id="undesc-1") + _obj.set_value("field1", "a value") + _obj.set_value("field2", 2) + _obj.set_value("field3", {"a":1, "map":2, "value":3}) + _obj.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj) + + + _obj = QmfAgentData(self.agent, _object_id="undesc-2") + _obj.set_value("key-1", "a value") + _obj.set_value("key-2", 2) + self.agent.add_object(_obj) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + agent_count = 5 + + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + self.agents = [] + for i in range(self.agent_count): + agent = _agentApp("agent-" + str(i), 1) + agent.connect_agent(self.broker) + self.agents.append(agent) + + def tearDown(self): + for agent in self.agents: + if agent is not None: + agent.shutdown_agent(10) + agent.stop() + + + def test_all_agents(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 3)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + + + + def test_agent_subset(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent_list = [] + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + agent_list.append(agent) + + # Only use a subset of the agents: + agent_list = agent_list[:len(agent_list)/2] + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", + _agents=agent_list, _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == (len(agent_list) * 3)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == (len(agent_list) * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + + + + def test_single_agent(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent_list = [] + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + agent_list.append(agent) + + # Only use one agetn + agent = agent_list[0] + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", + _agents=agent, _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 3) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 2) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + -- cgit v1.2.1