diff options
Diffstat (limited to 'qpid/python/qmf2/console.py')
-rw-r--r-- | qpid/python/qmf2/console.py | 1959 |
1 files changed, 1959 insertions, 0 deletions
diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py new file mode 100644 index 0000000000..8e8f4799f7 --- /dev/null +++ b/qpid/python/qmf2/console.py @@ -0,0 +1,1959 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import os +import logging +import platform +import time +import datetime +import Queue +from threading import Thread +from threading import Lock +from threading import currentThread +from threading import Condition + +from qpid.messaging import Connection, Message, Empty, SendError + +from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, + QmfQueryPredicate, MsgKey, QmfData, QmfAddress, + AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, + SchemaClass, SchemaClassId, SchemaEventClass, + SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) + + + +# global flag that indicates which thread (if any) is +# running the console notifier callback +_callback_thread=None + + + + +##============================================================================== +## Sequence Manager +##============================================================================== + +class _Mailbox(object): + """ + Virtual base class for all Mailbox-like objects + """ + def __init__(self): + self._msgs = [] + self._cv = Condition() + self._waiting = False + + def deliver(self, obj): + self._cv.acquire() + try: + self._msgs.append(obj) + # if was empty, notify waiters + if len(self._msgs) == 1: + self._cv.notify() + finally: + self._cv.release() + + def fetch(self, timeout=None): + self._cv.acquire() + try: + if len(self._msgs) == 0: + self._cv.wait(timeout) + if len(self._msgs): + return self._msgs.pop() + return None + finally: + self._cv.release() + + + +class SequencedWaiter(object): + """ + Manage sequence numbers for asynchronous method calls. + Allows the caller to associate a generic piece of data with a unique sequence + number.""" + + def __init__(self): + self.lock = Lock() + self.sequence = long(time.time()) # pseudo-randomize seq start + self.pending = {} + + + def allocate(self): + """ + Reserve a sequence number. + + @rtype: long + @return: a unique nonzero sequence number. + """ + self.lock.acquire() + try: + seq = self.sequence + self.sequence = self.sequence + 1 + self.pending[seq] = _Mailbox() + finally: + self.lock.release() + logging.debug( "sequence %d allocated" % seq) + return seq + + + def put_data(self, seq, new_data): + seq = long(seq) + logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) + self.lock.acquire() + try: + if seq in self.pending: + # logging.error("Putting seq %d @ %s" % (seq,time.time())) + self.pending[seq].deliver(new_data) + else: + logging.error( "seq %d not found!" % seq ) + finally: + self.lock.release() + + + + def get_data(self, seq, timeout=None): + """ + Release a sequence number reserved using the reserve method. This must + be called when the sequence is no longer needed. + + @type seq: int + @param seq: a sequence previously allocated by calling reserve(). + @rtype: any + @return: the data originally associated with the reserved sequence number. + """ + seq = long(seq) + logging.debug( "getting data for seq=%d" % seq) + mbox = None + self.lock.acquire() + try: + if seq in self.pending: + mbox = self.pending[seq] + finally: + self.lock.release() + + # Note well: pending list is unlocked, so we can wait. + # we reference mbox locally, so it will not be released + # until we are done. + + if mbox: + d = mbox.fetch(timeout) + logging.debug( "seq %d fetched %r!" % (seq, d) ) + return d + + logging.debug( "seq %d not found!" % seq ) + return None + + + def release(self, seq): + """ + Release the sequence, and its mailbox + """ + seq = long(seq) + logging.debug( "releasing seq %d" % seq ) + self.lock.acquire() + try: + if seq in self.pending: + del self.pending[seq] + finally: + self.lock.release() + + + def isValid(self, seq): + """ + True if seq is in use, else False (seq is unknown) + """ + seq = long(seq) + self.lock.acquire() + try: + return seq in self.pending + finally: + self.lock.release() + return False + + +##============================================================================== +## DATA MODEL +##============================================================================== + + +class QmfConsoleData(QmfData): + """ + Console's representation of an managed QmfData instance. + """ + def __init__(self, map_, agent, _schema=None): + super(QmfConsoleData, self).__init__(_map=map_, + _schema=_schema, + _const=True) + self._agent = agent + + def get_timestamps(self): + """ + Returns a list of timestamps describing the lifecycle of + the object. All timestamps are represented by the AMQP + timestamp type. [0] = time of last update from Agent, + [1] = creation timestamp + [2] = deletion timestamp, or zero if not + deleted. + """ + return [self._utime, self._ctime, self._dtime] + + def get_create_time(self): + """ + returns the creation timestamp + """ + return self._ctime + + def get_update_time(self): + """ + returns the update timestamp + """ + return self._utime + + def get_delete_time(self): + """ + returns the deletion timestamp, or zero if not yet deleted. + """ + return self._dtime + + def is_deleted(self): + """ + True if deletion timestamp not zero. + """ + return self._dtime != long(0) + + def refresh(self, _reply_handle=None, _timeout=None): + """ + request that the Agent update the value of this object's + contents. + """ + if _reply_handle is not None: + logging.error(" ASYNC REFRESH TBD!!!") + return None + + assert self._agent + assert self._agent._console + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + + # create query to agent using this objects ID + oid = self.get_object_id() + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + self.get_object_id()) + obj_list = self._agent._console.doQuery(self._agent, query, + timeout=_timeout) + if obj_list is None or len(obj_list) != 1: + return None + + self._update(obj_list[0]) + return self + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + invoke the named method. + """ + assert self._agent + assert self._agent._console + + oid = self.get_object_id() + if oid is None: + raise ValueError("Cannot invoke methods on unmanaged objects.") + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + if self._schema: + # validate + ms = self._schema.get_method(name) + if ms is None: + raise ValueError("Method '%s' is undefined." % name) + + for aname,prop in ms.get_arguments().iteritems(): + if aname not in _in_args: + if prop.get_default(): + _in_args[aname] = prop.get_default() + elif not prop.is_optional(): + raise ValueError("Method '%s' requires argument '%s'" + % (name, aname)) + for aname in _in_args.iterkeys(): + prop = ms.get_argument(aname) + if prop is None: + raise ValueError("Method '%s' does not define argument" + " '%s'" % (name, aname)) + if "I" not in prop.get_direction(): + raise ValueError("Method '%s' argument '%s' is not an" + " input." % (name, aname)) + + # @todo check if value is correct (type, range, etc) + + handle = self._agent._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {self.KEY_OBJECT_ID:str(oid), + SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._agent._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._agent._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) + self._agent._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + return MethodResult(_error=QmfData.from_map(error)) + else: + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + def _update(self, newer): + super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, + _tag=newer._tag, _object_id=newer._object_id, + _ctime=newer._ctime, _utime=newer._utime, + _dtime=newer._dtime, + _schema=newer._schema, _const=True) + +class QmfLocalData(QmfData): + """ + Console's representation of an unmanaged QmfData instance. There + is no remote agent associated with this instance. The Console has + full control over this instance. + """ + def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfLocalData, self).__init__(_values=values, + _subtypes=_subtypes, _tag=_tag, + _object_id=_object_id, + _schema=_schema, _ctime=ctime, + _utime=ctime, _const=False) + + +class Agent(object): + """ + A local representation of a remote agent managed by this console. + """ + def __init__(self, name, console): + """ + @type name: AgentId + @param name: uniquely identifies this agent in the AMQP domain. + """ + + if not isinstance(console, Console): + raise TypeError("parameter must be an instance of class Console") + + self._name = name + self._address = QmfAddress.direct(name, console._domain) + self._console = console + self._sender = None + self._packages = {} # map of {package-name:[list of class-names], } for this agent + self._subscriptions = [] # list of active standing subscriptions for this agent + self._announce_timestamp = None # datetime when last announce received + logging.debug( "Created Agent with address: [%s]" % self._address ) + + + def get_name(self): + return self._name + + def isActive(self): + return self._announce_timestamp != None + + def _sendMsg(self, msg, correlation_id=None): + """ + Low-level routine to asynchronously send a message to this agent. + """ + msg.reply_to = str(self._console._address) + # handle = self._console._req_correlation.allocate() + # if handle == 0: + # raise Exception("Can not allocate a correlation id!") + # msg.correlation_id = str(handle) + if correlation_id: + msg.correlation_id = str(correlation_id) + self._sender.send(msg) + # return handle + + def get_packages(self): + """ + Return a list of the names of all packages known to this agent. + """ + return self._packages.keys() + + def get_classes(self): + """ + Return a dictionary [key:class] of classes known to this agent. + """ + return self._packages.copy() + + def get_objects(self, query, kwargs={}): + """ + Return a list of objects that satisfy the given query. + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: list + @return: list of matching objects, or None. + """ + pass + + def get_object(self, query, kwargs={}): + """ + Get one object - query is expected to match only one object. + ??? Recommended: explicit timeout param, default None ??? + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: qmfConsole.ObjectProxy + @return: one matching object, or none + """ + pass + + + def create_subscription(self, query): + """ + Factory for creating standing subscriptions based on a given query. + + @type query: common.Query object + @param query: determines the list of objects for which this subscription applies + @rtype: qmfConsole.Subscription + @returns: an object representing the standing subscription. + """ + pass + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + """ + assert self._console + + if _timeout is None: + _timeout = self._console._reply_timeout + + if _in_args: + _in_args = _in_args.copy() + + handle = self._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + + _map = {SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + logging.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._sendMethodReq(_map, handle) + except SendError, e: + logging.error(str(e)) + self._console._req_correlation.release(handle) + return None + + # @todo async method calls!!! + if _reply_handle is not None: + print("ASYNC TBD") + + logging.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = self._console._req_correlation.get_data(handle, _timeout) + self._console._req_correlation.release(handle) + if not replyMsg: + logging.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + return None + + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), + _error=_map.get(SchemaMethod.KEY_ERROR)) + + def enable_events(self): + raise Exception("enable_events tbd") + + def disable_events(self): + raise Exception("disable_events tbd") + + def destroy(self): + raise Exception("destroy tbd") + + def __repr__(self): + return str(self._address) + + def __str__(self): + return self.__repr__() + + def _sendQuery(self, query, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.get_query), + properties={"method":"request"}, + content={MsgKey.query: query.map_encode()}) + self._sendMsg( msg, correlation_id ) + + + def _sendMethodReq(self, mr_map, correlation_id=None): + """ + """ + msg = Message(subject=makeSubject(OpCode.method_req), + properties={"method":"request"}, + content=mr_map) + self._sendMsg( msg, correlation_id ) + + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class MethodResult(object): + def __init__(self, _out_args=None, _error=None): + self._error = _error + self._out_args = _out_args + + def succeeded(self): + return self._error is None + + def get_exception(self): + return self._error + + def get_arguments(self): + return self._out_args + + def get_argument(self, name): + arg = None + if self._out_args: + arg = self._out_args.get(name) + return arg + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + + + + +class Console(Thread): + """ + A Console manages communications to a collection of agents on behalf of an application. + """ + def __init__(self, name=None, _domain=None, notifier=None, + reply_timeout = 60, + # agent_timeout = 120, + agent_timeout = 60, + kwargs={}): + """ + @type name: str + @param name: identifier for this console. Must be unique. + @type notifier: qmfConsole.Notifier + @param notifier: invoked when events arrive for processing. + @type kwargs: dict + @param kwargs: ??? Unused + """ + Thread.__init__(self) + if not name: + self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) + else: + self._name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self._name, self._domain) + self._notifier = notifier + self._lock = Lock() + self._conn = None + self._session = None + # dict of "agent-direct-address":class Agent entries + self._agent_map = {} + self._direct_recvr = None + self._announce_recvr = None + self._locate_sender = None + self._schema_cache = {} + self._req_correlation = SequencedWaiter() + self._operational = False + self._agent_discovery_filter = None + self._reply_timeout = reply_timeout + self._agent_timeout = agent_timeout + self._next_agent_expire = None + # lock out run() thread + self._cv = Condition() + # for passing WorkItems to the application + self._work_q = Queue.Queue() + self._work_q_put = False + ## Old stuff below??? + #self._broker_list = [] + #self.impl = qmfengine.Console() + #self._event = qmfengine.ConsoleEvent() + ##self._cv = Condition() + ##self._sync_count = 0 + ##self._sync_result = None + ##self._select = {} + ##self._cb_cond = Condition() + + + + def destroy(self, timeout=None): + """ + Must be called before the Console is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Console...") + if self._conn: + self.removeConnection(self._conn, timeout) + logging.debug("Console Destroyed") + + + + def addConnection(self, conn): + """ + Add a AMQP connection to the console. The console will setup a session over the + connection. The console will then broadcast an Agent Locate Indication over + the session in order to discover present agents. + + @type conn: qpid.messaging.Connection + @param conn: the connection to the AMQP messaging infrastructure. + """ + if self._conn: + raise Exception( "Multiple connections per Console not supported." ); + self._conn = conn + self._session = conn.session(name=self._name) + self._direct_recvr = self._session.receiver(str(self._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=1) + logging.debug("my direct addr=%s" % self._direct_recvr.source) + + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + self._announce_recvr = self._session.receiver(str(ind_addr) + + ";{create:always," + " node-properties:{type:topic}}", + capacity=1) + logging.debug("agent.ind addr=%s" % self._announce_recvr.source) + + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + self._locate_sender = self._session.sender(str(locate_addr) + + ";{create:always," + " node-properties:{type:topic}}") + logging.debug("agent.locate addr=%s" % self._locate_sender.target) + + # + # Now that receivers are created, fire off the receive thread... + # + self._operational = True + self.start() + + + + def removeConnection(self, conn, timeout=None): + """ + Remove an AMQP connection from the console. Un-does the add_connection() operation, + and releases any agents and sessions associated with the connection. + + @type conn: qpid.messaging.Connection + @param conn: connection previously added by add_connection() + """ + if self._conn and conn and conn != self._conn: + logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + + # tell connection thread to shutdown + self._operational = False + if self.isAlive(): + # kick my thread to wake it up + logging.debug("Making temp sender for [%s]" % self._address) + tmp_sender = self._session.sender(str(self._address)) + try: + msg = Message(subject=makeSubject(OpCode.noop)) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for console receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + self._direct_recvr.close() + self._announce_recvr.close() + self._locate_sender.close() + self._session.close() + self._session = None + self._conn = None + logging.debug("console connection removal complete") + + + def getAddress(self): + """ + The AMQP address this Console is listening to. + """ + return self._address + + + def destroyAgent( self, agent ): + """ + Undoes create. + """ + if not isinstance(agent, Agent): + raise TypeError("agent must be an instance of class Agent") + + self._lock.acquire() + try: + if agent._id in self._agent_map: + del self._agent_map[agent._id] + finally: + self._lock.release() + + def find_agent(self, name, timeout=None ): + """ + Given the name of a particular agent, return an instance of class Agent + representing that agent. Return None if the agent does not exist. + """ + + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + finally: + self._lock.release() + + # agent not present yet - ping it with an agent_locate + + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + tmp_sender = self._session.sender(str(QmfAddress.direct(name, + self._domain)) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) + msg = Message(subject=makeSubject(OpCode.agent_locate), + properties={"method":"request"}, + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) + msg.correlation_id = str(handle) + logging.debug("Sending Agent Locate (%s)" % time.time()) + tmp_sender.send( msg ) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if timeout is None: + timeout = self._reply_timeout + + new_agent = None + logging.debug("Waiting for response to Agent Locate (%s)" % timeout) + self._req_correlation.get_data( handle, timeout ) + self._req_correlation.release(handle) + logging.debug("Agent Locate wait ended (%s)" % time.time()) + self._lock.acquire() + try: + new_agent = self._agent_map.get(name) + finally: + self._lock.release() + return new_agent + + + def doQuery(self, agent, query, timeout=None ): + """ + """ + + target = query.get_target() + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._sendQuery(query, handle) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if not timeout: + timeout = self._reply_timeout + + logging.debug("Waiting for response to Query (%s)" % timeout) + reply = self._req_correlation.get_data(handle, timeout) + self._req_correlation.release(handle) + if not reply: + logging.debug("Agent Query wait timed-out.") + return None + + if target == QmfQuery.TARGET_PACKAGES: + # simply pass back the list of package names + logging.debug("Response to Packet Query received") + return reply.content.get(MsgKey.package_info) + elif target == QmfQuery.TARGET_OBJECT_ID: + # simply pass back the list of object_id's + logging.debug("Response to Object Id Query received") + return reply.content.get(MsgKey.object_id) + elif target == QmfQuery.TARGET_SCHEMA_ID: + logging.debug("Response to Schema Id Query received") + id_list = [] + for sid_map in reply.content.get(MsgKey.schema_id): + id_list.append(SchemaClassId.from_map(sid_map)) + return id_list + elif target == QmfQuery.TARGET_SCHEMA: + logging.debug("Response to Schema Query received") + schema_list = [] + for schema_map in reply.content.get(MsgKey.schema): + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + schema_list.append(schema) + self._add_schema(schema) + return schema_list + elif target == QmfQuery.TARGET_OBJECT: + logging.debug("Response to Object Query received") + obj_list = [] + for obj_map in reply.content.get(MsgKey.data_obj): + # if the object references a schema, fetch it + sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + schema = self._fetch_schema(sid, _agent=agent, + _timeout=timeout) + if not schema: + logging.warning("Unknown schema, id=%s" % sid) + continue + obj = QmfConsoleData(map_=obj_map, agent=agent, + _schema=schema) + else: + # no schema needed + obj = QmfConsoleData(map_=obj_map, agent=agent) + obj_list.append(obj) + return obj_list + else: + logging.warning("Unexpected Target for a Query: '%s'" % target) + return None + + def run(self): + global _callback_thread + # + # @todo KAG Rewrite when api supports waiting on multiple receivers + # + while self._operational: + + # qLen = self._work_q.qsize() + + while True: + try: + msg = self._announce_recvr.fetch(timeout=0) + except Empty: + break + self._dispatch(msg, _direct=False) + + while True: + try: + msg = self._direct_recvr.fetch(timeout = 0) + except Empty: + break + self._dispatch(msg, _direct=True) + + for agent in self._agent_map.itervalues(): + try: + msg = agent._event_recvr.fetch(timeout = 0) + except Empty: + continue + self._dispatch(msg, _direct=False) + + + self._expireAgents() # check for expired agents + + #if qLen == 0 and self._work_q.qsize() and self._notifier: + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + logging.info("Calling console notifier.indication") + self._notifier.indication() + _callback_thread = None + + if self._operational: + # wait for a message to arrive or an agent + # to expire + now = datetime.datetime.utcnow() + if self._next_agent_expire > now: + timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds + try: + logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) + xxx = self._session.next_receiver(timeout = timeout) + except Empty: + pass + + + logging.debug("Shutting down Console thread") + + def get_objects(self, + _schema_id=None, + _pname=None, _cname=None, + _object_id=None, + _agents=None, + _timeout=None): + """ + @todo + """ + if _object_id is not None: + # query by object id + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id) + elif _schema_id is not None: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfData.KEY_SCHEMA_ID, + _schema_id.map_encode()]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + elif _pname is not None: + # query by package name (and maybe class name) + if _cname is not None: + pred = QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}, + {QmfQuery.CMP_EQ: + [SchemaClassId.KEY_CLASS, + _cname]}]}) + else: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + + else: + raise Exception("invalid arguments") + + if _agents is None: + # use copy of current agent list + self._lock.acquire() + try: + agent_list = self._agent_map.values() + finally: + self._lock.release() + elif isinstance(_agents, Agent): + agent_list = [_agents] + else: + agent_list = _agents + # @todo validate this list! + + # @todo: fix when async doQuery done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + + if _timeout is None: + _timeout = self._reply_timeout + + obj_list = [] + expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) + for agent in agent_list: + if not agent.isActive(): + continue + now = datetime.datetime.utcnow() + if now >= expired: + break + timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds + reply = self.doQuery(agent, query, timeout) + if reply: + obj_list = obj_list + reply + + if obj_list: + return obj_list + return None + + + + # called by run() thread ONLY + # + def _dispatch(self, msg, _direct=True): + """ + PRIVATE: Process a message received from an Agent + """ + logging.debug( "Message received from Agent! [%s]" % msg ) + try: + version,opcode = parseSubject(msg.subject) + # @todo: deal with version mismatch!!! + except: + logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject) + return + + cmap = {}; props = {} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_ind: + self._handleAgentIndMsg( msg, cmap, version, _direct ) + elif opcode == OpCode.data_ind: + self._handleDataIndMsg(msg, cmap, version, _direct) + elif opcode == OpCode.event_ind: + self._handleEventIndMsg(msg, cmap, version, _direct) + elif opcode == OpCode.managed_object: + logging.warning("!!! managed_object TBD !!!") + elif opcode == OpCode.object_ind: + logging.warning("!!! object_ind TBD !!!") + elif opcode == OpCode.response: + self._handleResponseMsg(msg, cmap, version, _direct) + elif opcode == OpCode.schema_ind: + logging.warning("!!! schema_ind TBD !!!") + elif opcode == OpCode.noop: + logging.debug("No-op msg received.") + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) + + + def _handleAgentIndMsg(self, msg, cmap, version, direct): + """ + Process a received agent-ind message. This message may be a response to a + agent-locate, or it can be an unsolicited agent announce. + """ + logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time())) + + ai_map = cmap.get(MsgKey.agent_info) + if not ai_map or not isinstance(ai_map, type({})): + logging.warning("Bad agent-ind message received: '%s'" % msg) + return + name = ai_map.get("_name") + if not name: + logging.warning("Bad agent-ind message received: agent name missing" + " '%s'" % msg) + return + + ignore = True + matched = False + correlated = False + agent_query = self._agent_discovery_filter + + if msg.correlation_id: + correlated = self._req_correlation.isValid(msg.correlation_id) + + if direct and correlated: + ignore = False + elif agent_query: + matched = agent_query.evaluate(QmfData.create(values=ai_map)) + ignore = not matched + + if not ignore: + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(name) + finally: + self._lock.release() + + if not agent: + # need to create and add a new agent + agent = self._createAgent(name) + if not agent: + return # failed to add agent + + # lock out expiration scanning code + self._lock.acquire() + try: + old_timestamp = agent._announce_timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() + + if old_timestamp == None and matched: + logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) + self._work_q.put(wi) + self._work_q_put = True + + if correlated: + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + + + + def _handleDataIndMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.debug("Data indicate received with unknown correlation_id" + " msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + + def _handleResponseMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + # @todo code replication - clean me. + logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.debug("Response msg received with unknown correlation_id" + " msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + def _handleEventIndMsg(self, msg, cmap, version, _direct): + ei_map = cmap.get(MsgKey.event) + if not ei_map or not isinstance(ei_map, type({})): + logging.warning("Bad event indication message received: '%s'" % msg) + return + + aname = ei_map.get("_name") + emap = ei_map.get("_event") + if not aname: + logging.debug("No '_name' field in event indication message.") + return + if not emap: + logging.debug("No '_event' field in event indication message.") + return + # @todo: do I need to lock this??? + agent = self._agent_map.get(aname) + if not agent: + logging.debug("Agent '%s' not known." % aname) + return + try: + # @todo: schema??? + event = QmfEvent.from_map(emap) + except TypeError: + logging.debug("Invalid QmfEvent map received: %s" % str(emap)) + return + + # @todo: schema? Need to fetch it, but not from this thread! + # This thread can not pend on a request. + logging.debug("Publishing event received from agent %s" % aname) + wi = WorkItem(WorkItem.EVENT_RECEIVED, None, + {"agent":agent, + "event":event}) + self._work_q.put(wi) + self._work_q_put = True + + + def _expireAgents(self): + """ + Check for expired agents and issue notifications when they expire. + """ + now = datetime.datetime.utcnow() + if self._next_agent_expire and now < self._next_agent_expire: + return + lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) + next_expire_delta = lifetime_delta + self._lock.acquire() + try: + logging.debug("!!! expiring agents '%s'" % now) + for agent in self._agent_map.itervalues(): + if agent._announce_timestamp: + agent_deathtime = agent._announce_timestamp + lifetime_delta + if agent_deathtime <= now: + logging.debug("AGENT_DELETED for %s" % agent) + agent._announce_timestamp = None + wi = WorkItem(WorkItem.AGENT_DELETED, None, + {"agent":agent}) + # @todo: remove agent from self._agent_map + self._work_q.put(wi) + self._work_q_put = True + else: + if (agent_deathtime - now) < next_expire_delta: + next_expire_delta = agent_deathtime - now + + self._next_agent_expire = now + next_expire_delta + logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + finally: + self._lock.release() + + + + def _createAgent( self, name ): + """ + Factory to create/retrieve an agent for this console + """ + logging.debug("creating agent %s" % name) + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + + agent = Agent(name, self) + try: + agent._sender = self._session.sender(str(agent._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + except: + logging.warning("Unable to create sender for %s" % name) + return None + logging.debug("created agent sender %s" % agent._sender.target) + + events_addr = QmfAddress.topic(name, self._domain) + try: + agent._event_recvr = self._session.receiver(str(events_addr) + + ";{create:always," + " node-properties:{type:topic}}", + capacity=1) + except: + logging.warning("Unable to create event receiver for %s" % name) + return None + logging.debug("created agent event receiver %s" % agent._event_recvr.source) + + self._agent_map[name] = agent + finally: + self._lock.release() + + # new agent - query for its schema database for + # seeding the schema cache (@todo) + # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) + # agent._sendQuery( query ) + + return agent + + + + def enable_agent_discovery(self, _query=None): + """ + Called to enable the asynchronous Agent Discovery process. + Once enabled, AGENT_ADD work items can arrive on the WorkQueue. + """ + # @todo: fix - take predicate only, not entire query! + if _query is not None: + if (not isinstance(_query, QmfQuery) or + _query.get_target() != QmfQuery.TARGET_AGENT): + raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") + self._agent_discovery_filter = _query + else: + # create a match-all agent query (no predicate) + self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) + + def disable_agent_discovery(self): + """ + Called to disable the async Agent Discovery process enabled by + calling enableAgentDiscovery() + """ + self._agent_discovery_filter = None + + + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + + + def get_next_workitem(self, timeout=None): + """ + Returns the next pending work item, or None if none available. + @todo: subclass and return an Empty event instead. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + + def release_workitem(self, wi): + """ + Return a WorkItem to the Console when it is no longer needed. + @todo: call Queue.task_done() - only 2.5+ + + @type wi: class qmfConsole.WorkItem + @param wi: work item object to return. + """ + pass + + def _add_schema(self, schema): + """ + @todo + """ + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass type expected") + + self._lock.acquire() + try: + sid = schema.get_class_id() + if not self._schema_cache.has_key(sid): + self._schema_cache[sid] = schema + finally: + self._lock.release() + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): + """ + Find the schema identified by schema_id. If not in the cache, ask the + agent for it. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("SchemaClassId type expected") + + self._lock.acquire() + try: + schema = self._schema_cache.get(schema_id) + if schema: + return schema + finally: + self._lock.release() + + if _agent is None: + return None + + # note: doQuery will add the new schema to the cache automatically. + slist = self.doQuery(_agent, + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout) + if slist: + return slist[0] + else: + return None + + + + # def get_packages(self): + # plist = [] + # for i in range(self.impl.packageCount()): + # plist.append(self.impl.getPackageName(i)) + # return plist + + + # def get_classes(self, package, kind=CLASS_OBJECT): + # clist = [] + # for i in range(self.impl.classCount(package)): + # key = self.impl.getClass(package, i) + # class_kind = self.impl.getClassKind(key) + # if class_kind == kind: + # if kind == CLASS_OBJECT: + # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + # elif kind == CLASS_EVENT: + # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + # return clist + + + # def bind_package(self, package): + # return self.impl.bindPackage(package) + + + # def bind_class(self, kwargs = {}): + # if "key" in kwargs: + # self.impl.bindClass(kwargs["key"]) + # elif "package" in kwargs: + # package = kwargs["package"] + # if "class" in kwargs: + # self.impl.bindClass(package, kwargs["class"]) + # else: + # self.impl.bindClass(package) + # else: + # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + # def get_agents(self, broker=None): + # blist = [] + # if broker: + # blist.append(broker) + # else: + # self._cv.acquire() + # try: + # # copy while holding lock + # blist = self._broker_list[:] + # finally: + # self._cv.release() + + # agents = [] + # for b in blist: + # for idx in range(b.impl.agentCount()): + # agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + # return agents + + + # def get_objects(self, query, kwargs = {}): + # timeout = 30 + # agent = None + # temp_args = kwargs.copy() + # if type(query) == type({}): + # temp_args.update(query) + + # if "_timeout" in temp_args: + # timeout = temp_args["_timeout"] + # temp_args.pop("_timeout") + + # if "_agent" in temp_args: + # agent = temp_args["_agent"] + # temp_args.pop("_agent") + + # if type(query) == type({}): + # query = Query(temp_args) + + # self._select = {} + # for k in temp_args.iterkeys(): + # if type(k) == str: + # self._select[k] = temp_args[k] + + # self._cv.acquire() + # try: + # self._sync_count = 1 + # self._sync_result = [] + # broker = self._broker_list[0] + # broker.send_query(query.impl, None, agent) + # self._cv.wait(timeout) + # if self._sync_count == 1: + # raise Exception("Timed out: waiting for query response") + # finally: + # self._cv.release() + + # return self._sync_result + + + # def get_object(self, query, kwargs = {}): + # ''' + # Return one and only one object or None. + # ''' + # objs = objects(query, kwargs) + # if len(objs) == 1: + # return objs[0] + # else: + # return None + + + # def first_object(self, query, kwargs = {}): + # ''' + # Return the first of potentially many objects. + # ''' + # objs = objects(query, kwargs) + # if objs: + # return objs[0] + # else: + # return None + + + # # Check the object against select to check for a match + # def _select_match(self, object): + # schema_props = object.properties() + # for key in self._select.iterkeys(): + # for prop in schema_props: + # if key == p[0].name() and self._select[key] != p[1]: + # return False + # return True + + + # def _get_result(self, list, context): + # ''' + # Called by Broker proxy to return the result of a query. + # ''' + # self._cv.acquire() + # try: + # for item in list: + # if self._select_match(item): + # self._sync_result.append(item) + # self._sync_count -= 1 + # self._cv.notify() + # finally: + # self._cv.release() + + + # def start_sync(self, query): pass + + + # def touch_sync(self, sync): pass + + + # def end_sync(self, sync): pass + + + + +# def start_console_events(self): +# self._cb_cond.acquire() +# try: +# self._cb_cond.notify() +# finally: +# self._cb_cond.release() + + +# def _do_console_events(self): +# ''' +# Called by the Console thread to poll for events. Passes the events +# onto the ConsoleHandler associated with this Console. Is called +# periodically, but can also be kicked by Console.start_console_events(). +# ''' +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# try: +# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: +# logging.debug("Console Event AGENT_ADDED received") +# if self._handler: +# self._handler.agent_added(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: +# logging.debug("Console Event AGENT_DELETED received") +# if self._handler: +# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: +# logging.debug("Console Event NEW_PACKAGE received") +# if self._handler: +# self._handler.new_package(self._event.name) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: +# logging.debug("Console Event NEW_CLASS received") +# if self._handler: +# self._handler.new_class(SchemaClassKey(self._event.classKey)) +# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: +# logging.debug("Console Event OBJECT_UPDATE received") +# if self._handler: +# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), +# self._event.hasProps, self._event.hasStats) +# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: +# logging.debug("Console Event EVENT_RECEIVED received") +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: +# logging.debug("Console Event AGENT_HEARTBEAT received") +# if self._handler: +# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) +# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: +# logging.debug("Console Event METHOD_RESPONSE received") +# else: +# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# except e: +# print "Exception caught in callback thread:", e +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) +# return count + + + + + +# class Broker(ConnectionHandler): +# # attr_reader :impl :conn, :console, :broker_bank +# def __init__(self, console, conn): +# self.broker_bank = 1 +# self.console = console +# self.conn = conn +# self._session = None +# self._cv = Condition() +# self._stable = None +# self._event = qmfengine.BrokerEvent() +# self._xmtMessage = qmfengine.Message() +# self.impl = qmfengine.BrokerProxy(self.console.impl) +# self.console.impl.addConnection(self.impl, self) +# self.conn.add_conn_handler(self) +# self._operational = True + + +# def shutdown(self): +# logging.debug("broker.shutdown() called.") +# self.console.impl.delConnection(self.impl) +# self.conn.del_conn_handler(self) +# if self._session: +# self.impl.sessionClosed() +# logging.debug("broker.shutdown() sessionClosed done.") +# self._session.destroy() +# logging.debug("broker.shutdown() session destroy done.") +# self._session = None +# self._operational = False +# logging.debug("broker.shutdown() done.") + + +# def wait_for_stable(self, timeout = None): +# self._cv.acquire() +# try: +# if self._stable: +# return +# if timeout: +# self._cv.wait(timeout) +# if not self._stable: +# raise Exception("Timed out: waiting for broker connection to become stable") +# else: +# while not self._stable: +# self._cv.wait() +# finally: +# self._cv.release() + + +# def send_query(self, query, ctx, agent): +# agent_impl = None +# if agent: +# agent_impl = agent.impl +# self.impl.sendQuery(query, ctx, agent_impl) +# self.conn.kick() + + +# def _do_broker_events(self): +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: +# logging.debug("Broker Event BROKER_INFO received"); +# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: +# logging.debug("Broker Event DECLARE_QUEUE received"); +# self.conn.impl.declareQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: +# logging.debug("Broker Event DELETE_QUEUE received"); +# self.conn.impl.deleteQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.BIND: +# logging.debug("Broker Event BIND received"); +# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: +# logging.debug("Broker Event UNBIND received"); +# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: +# logging.debug("Broker Event SETUP_COMPLETE received"); +# self.impl.startProtocol() +# elif self._event.kind == qmfengine.BrokerEvent.STABLE: +# logging.debug("Broker Event STABLE received"); +# self._cv.acquire() +# try: +# self._stable = True +# self._cv.notify() +# finally: +# self._cv.release() +# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: +# result = [] +# for idx in range(self._event.queryResponse.getObjectCount()): +# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) +# self.console._get_result(result, self._event.context) +# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: +# obj = self._event.context +# obj._method_result(MethodResponse(self._event.methodResponse())) + +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) + +# return count + + +# def _do_broker_messages(self): +# count = 0 +# valid = self.impl.getXmtMessage(self._xmtMessage) +# while valid: +# count += 1 +# logging.debug("Broker: sending msg on connection") +# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) +# self.impl.popXmt() +# valid = self.impl.getXmtMessage(self._xmtMessage) + +# return count + + +# def _do_events(self): +# while True: +# self.console.start_console_events() +# bcnt = self._do_broker_events() +# mcnt = self._do_broker_messages() +# if bcnt == 0 and mcnt == 0: +# break; + + +# def conn_event_connected(self): +# logging.debug("Broker: Connection event CONNECTED") +# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) +# self.impl.sessionOpened(self._session.handle) +# self._do_events() + + +# def conn_event_disconnected(self, error): +# logging.debug("Broker: Connection event DISCONNECTED") +# pass + + +# def conn_event_visit(self): +# self._do_events() + + +# def sess_event_session_closed(self, context, error): +# logging.debug("Broker: Session event CLOSED") +# self.impl.sessionClosed() + + +# def sess_event_recv(self, context, message): +# logging.debug("Broker: Session event MSG_RECV") +# if not self._operational: +# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# self.impl.handleRcvMessage(message) +# self._do_events() + + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ +# TEMPORARY TEST CODE - TO BE DELETED +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # temp test code + from common import (qmfTypes, QmfEvent, SchemaProperty) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "************* Creating Async Console **************" ) + + class MyNotifier(Notifier): + def __init__(self, context): + self._myContext = context + self.WorkAvailable = False + + def indication(self): + print("Indication received! context=%d" % self._myContext) + self.WorkAvailable = True + + _noteMe = MyNotifier( 666 ) + + _myConsole = Console(notifier=_noteMe) + + _myConsole.enable_agent_discovery() + logging.info("Waiting...") + + + logging.info( "Destroying console:" ) + _myConsole.destroy( 10 ) + + logging.info( "******** Messing around with Schema ********" ) + + _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A typical event schema", + _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, + kwargs = {"min":0, + "max":100, + "unit":"seconds", + "desc":"sleep value"}), + "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, + kwargs={"maxlen":100, + "desc":"a string argument"})}) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) + try: + print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) + except: + pass + print("_sec.getProperties()='%s'" % _sec.get_properties()) + + print("Adding another argument") + _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, + kwargs={"dir":"IO", + "desc":"a boolean argument"}) + _sec.add_property('Argument-3', _arg3) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) + + print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) + + _secmap = _sec.map_encode() + print("_sec.mapEncode()='%s'" % _secmap ) + + _sec2 = SchemaEventClass( _map=_secmap ) + + print("_sec=%s" % _sec.get_class_id()) + print("_sec2=%s" % _sec2.get_class_id()) + + _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", + "_class_name": "myOtherClass", + "_type": "_data"}, + "_desc": "A test data object", + "_values": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}, + "meth1": {"_desc": "A test method", + "_arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"_desc": "A test method", + "_arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": + "I"}}}}, + "_subtypes": + {"prop1":"qmfProperty", + "prop2":"qmfProperty", + "statistics":"qmfProperty", + "meth1":"qmfMethod", + "meth2":"qmfMethod"}, + "_primary_key_names": ["prop2", "prop1"]}) + + print("_soc='%s'" % _soc) + + print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) + + print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) + print("_soc.getProperties='%s'" % _soc.get_properties()) + print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) + + print("_soc.getMethodCount='%d'" % _soc.get_method_count()) + print("_soc.getMethods='%s'" % _soc.get_methods()) + print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) + + _socmap = _soc.map_encode() + print("_socmap='%s'" % _socmap) + _soc2 = SchemaObjectClass( _map=_socmap ) + print("_soc='%s'" % _soc) + print("_soc2='%s'" % _soc2) + + if _soc2.get_class_id() == _soc.get_class_id(): + print("soc and soc2 are the same schema") + + + logging.info( "******** Messing around with ObjectIds ********" ) + + + qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + print("qd='%s':" % qd) + + print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) + + print("qd map='%s'" % qd.map_encode()) + print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) + qd.set_value("prop4", 4, "A test property called 4") + print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) + qd.prop4 = 9 + print("qd.prop4 = 9 ='%s'" % qd.prop4) + qd["prop4"] = 11 + print("qd[prop4] = 11 ='%s'" % qd["prop4"]) + + print("qd.mapEncode()='%s'" % qd.map_encode()) + _qd2 = QmfData( _map = qd.map_encode() ) + print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) + + _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, + "prop2": 0}}, + agent="some agent name?", + _schema = _soc) + + print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) + + _qmfDesc1._set_schema( _soc ) + + print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) + + + _qmfDescMap = _qmfDesc1.map_encode() + print("_qmfDescMap='%s'" % _qmfDescMap) + + _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) + + print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) + print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) + + + logging.info( "******** Messing around with QmfEvents ********" ) + + + _qmfevent1 = QmfEvent( _timestamp = 1111, + _schema = _sec, + _values = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) + + _qmfevent1Map = _qmfevent1.map_encode() + + _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) + + + logging.info( "******** Messing around with Queries ********" ) + + _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: ["vendor", "AVendor"]}, + {QmfQuery.CMP_EQ: ["product", "SomeProduct"]}, + {QmfQuery.CMP_EQ: ["name", "Thingy"]}, + {QmfQuery.LOGIC_OR: + [{QmfQuery.CMP_LE: ["temperature", -10]}, + {QmfQuery.CMP_FALSE: None}, + {QmfQuery.CMP_EXISTS: ["namey"]}]}]})) + + print("_q1.mapEncode() = [%s]" % _q1.map_encode()) |