# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys import os import logging import platform import time import datetime import Queue from threading import Thread from threading import Lock from threading import currentThread from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent, timedelta_to_secs) # global flag that indicates which thread (if any) is # running the console notifier callback _callback_thread=None ##============================================================================== ## Sequence Manager ##============================================================================== class _Mailbox(object): """ Virtual base class for all Mailbox-like objects """ def __init__(self): self._msgs = [] self._cv = Condition() self._waiting = False def deliver(self, obj): self._cv.acquire() try: self._msgs.append(obj) # if was empty, notify waiters if len(self._msgs) == 1: self._cv.notify() finally: self._cv.release() def fetch(self, timeout=None): self._cv.acquire() try: if len(self._msgs) == 
0: self._cv.wait(timeout) if len(self._msgs): return self._msgs.pop() return None finally: self._cv.release() class SequencedWaiter(object): """ Manage sequence numbers for asynchronous method calls. Allows the caller to associate a generic piece of data with a unique sequence number.""" def __init__(self): self.lock = Lock() self.sequence = long(time.time()) # pseudo-randomize seq start self.pending = {} def allocate(self): """ Reserve a sequence number. @rtype: long @return: a unique nonzero sequence number. """ self.lock.acquire() try: seq = self.sequence self.sequence = self.sequence + 1 self.pending[seq] = _Mailbox() finally: self.lock.release() logging.debug( "sequence %d allocated" % seq) return seq def put_data(self, seq, new_data): seq = long(seq) logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) self.lock.acquire() try: if seq in self.pending: # logging.error("Putting seq %d @ %s" % (seq,time.time())) self.pending[seq].deliver(new_data) else: logging.error( "seq %d not found!" % seq ) finally: self.lock.release() def get_data(self, seq, timeout=None): """ Release a sequence number reserved using the reserve method. This must be called when the sequence is no longer needed. @type seq: int @param seq: a sequence previously allocated by calling reserve(). @rtype: any @return: the data originally associated with the reserved sequence number. """ seq = long(seq) logging.debug( "getting data for seq=%d" % seq) mbox = None self.lock.acquire() try: if seq in self.pending: mbox = self.pending[seq] finally: self.lock.release() # Note well: pending list is unlocked, so we can wait. # we reference mbox locally, so it will not be released # until we are done. if mbox: d = mbox.fetch(timeout) logging.debug( "seq %d fetched %r!" % (seq, d) ) return d logging.debug( "seq %d not found!" 
% seq ) return None def release(self, seq): """ Release the sequence, and its mailbox """ seq = long(seq) logging.debug( "releasing seq %d" % seq ) self.lock.acquire() try: if seq in self.pending: del self.pending[seq] finally: self.lock.release() def is_valid(self, seq): """ True if seq is in use, else False (seq is unknown) """ seq = long(seq) self.lock.acquire() try: return seq in self.pending finally: self.lock.release() return False ##============================================================================== ## DATA MODEL ##============================================================================== class QmfConsoleData(QmfData): """ Console's representation of an managed QmfData instance. """ def __init__(self, map_, agent, _schema=None): super(QmfConsoleData, self).__init__(_map=map_, _schema=_schema, _const=True) self._agent = agent def get_timestamps(self): """ Returns a list of timestamps describing the lifecycle of the object. All timestamps are represented by the AMQP timestamp type. [0] = time of last update from Agent, [1] = creation timestamp [2] = deletion timestamp, or zero if not deleted. """ return [self._utime, self._ctime, self._dtime] def get_create_time(self): """ returns the creation timestamp """ return self._ctime def get_update_time(self): """ returns the update timestamp """ return self._utime def get_delete_time(self): """ returns the deletion timestamp, or zero if not yet deleted. """ return self._dtime def is_deleted(self): """ True if deletion timestamp not zero. """ return self._dtime != long(0) def refresh(self, _reply_handle=None, _timeout=None): """ request that the Agent update the value of this object's contents. 
""" if _reply_handle is not None: logging.error(" ASYNC REFRESH TBD!!!") return None assert self._agent assert self._agent._console if _timeout is None: _timeout = self._agent._console._reply_timeout # create query to agent using this objects ID query = QmfQuery.create_id_object(self.get_object_id(), self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, timeout=_timeout) if obj_list is None or len(obj_list) != 1: return None self._update(obj_list[0]) return self def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ invoke the named method. """ assert self._agent assert self._agent._console oid = self.get_object_id() if oid is None: raise ValueError("Cannot invoke methods on unmanaged objects.") if _timeout is None: _timeout = self._agent._console._reply_timeout if self._schema: # validate _in_args = _in_args.copy() ms = self._schema.get_method(name) if ms is None: raise ValueError("Method '%s' is undefined." % name) for aname,prop in ms.get_arguments().iteritems(): if aname not in _in_args: if prop.get_default(): _in_args[aname] = prop.get_default() elif not prop.is_optional(): raise ValueError("Method '%s' requires argument '%s'" % (name, aname)) for aname in _in_args.iterkeys(): prop = ms.get_argument(aname) if prop is None: raise ValueError("Method '%s' does not define argument" " '%s'" % (name, aname)) if "I" not in prop.get_direction(): raise ValueError("Method '%s' argument '%s' is not an" " input." 
% (name, aname)) # @todo check if value is correct (type, range, etc) handle = self._agent._console._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} sid = self.get_schema_class_id() if sid: _map[self.KEY_SCHEMA_ID] = sid.map_encode() if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args logging.debug("Sending method req to Agent (%s)" % time.time()) try: self._agent._send_method_req(_map, handle) except SendError, e: logging.error(str(e)) self._agent._console._req_correlation.release(handle) return None # @todo async method calls!!! if _reply_handle is not None: print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) self._agent._console._req_correlation.release(handle) if not replyMsg: logging.debug("Agent method req wait timed-out.") return None _map = replyMsg.content.get(MsgKey.method) if not _map: logging.error("Invalid method call reply message") return None error=_map.get(SchemaMethod.KEY_ERROR) if error: return MethodResult(_error=QmfData.from_map(error)) else: return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) def _update(self, newer): super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, _tag=newer._tag, _object_id=newer._object_id, _ctime=newer._ctime, _utime=newer._utime, _dtime=newer._dtime, _schema=newer._schema, _const=True) class QmfLocalData(QmfData): """ Console's representation of an unmanaged QmfData instance. There is no remote agent associated with this instance. The Console has full control over this instance. 
""" def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, _schema=None): # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) super(QmfLocalData, self).__init__(_values=values, _subtypes=_subtypes, _tag=_tag, _object_id=_object_id, _schema=_schema, _ctime=ctime, _utime=ctime, _const=False) class Agent(object): """ A local representation of a remote agent managed by this console. """ def __init__(self, name, console): """ @type name: AgentId @param name: uniquely identifies this agent in the AMQP domain. """ if not isinstance(console, Console): raise TypeError("parameter must be an instance of class Console") self._name = name self._address = QmfAddress.direct(name, console._domain) self._console = console self._sender = None self._packages = {} # map of {package-name:[list of class-names], } for this agent self._subscriptions = [] # list of active standing subscriptions for this agent self._announce_timestamp = None # datetime when last announce received logging.debug( "Created Agent with address: [%s]" % self._address ) def get_name(self): return self._name def is_active(self): return self._announce_timestamp != None def _send_msg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. """ msg.reply_to = str(self._console._address) # handle = self._console._req_correlation.allocate() # if handle == 0: # raise Exception("Can not allocate a correlation id!") # msg.correlation_id = str(handle) if correlation_id: msg.correlation_id = str(correlation_id) # TRACE #logging.error("!!! Console %s sending to agent %s (%s)" % # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle def get_packages(self): """ Return a list of the names of all packages known to this agent. """ return self._packages.keys() def get_classes(self): """ Return a dictionary [key:class] of classes known to this agent. 
""" return self._packages.copy() def get_objects(self, query, kwargs={}): """ Return a list of objects that satisfy the given query. @type query: dict, or common.Query @param query: filter for requested objects @type kwargs: dict @param kwargs: ??? used to build match selector and query ??? @rtype: list @return: list of matching objects, or None. """ pass def get_object(self, query, kwargs={}): """ Get one object - query is expected to match only one object. ??? Recommended: explicit timeout param, default None ??? @type query: dict, or common.Query @param query: filter for requested objects @type kwargs: dict @param kwargs: ??? used to build match selector and query ??? @rtype: qmfConsole.ObjectProxy @return: one matching object, or none """ pass def create_subscription(self, query): """ Factory for creating standing subscriptions based on a given query. @type query: common.Query object @param query: determines the list of objects for which this subscription applies @rtype: qmfConsole.Subscription @returns: an object representing the standing subscription. """ pass def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ """ assert self._console if _timeout is None: _timeout = self._console._reply_timeout if _in_args: _in_args = _in_args.copy() handle = self._console._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") _map = {SchemaMethod.KEY_NAME:name} if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args logging.debug("Sending method req to Agent (%s)" % time.time()) try: self._send_method_req(_map, handle) except SendError, e: logging.error(str(e)) self._console._req_correlation.release(handle) return None # @todo async method calls!!! 
if _reply_handle is not None: print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = self._console._req_correlation.get_data(handle, _timeout) self._console._req_correlation.release(handle) if not replyMsg: logging.debug("Agent method req wait timed-out.") return None _map = replyMsg.content.get(MsgKey.method) if not _map: logging.error("Invalid method call reply message") return None return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), _error=_map.get(SchemaMethod.KEY_ERROR)) def enable_events(self): raise Exception("enable_events tbd") def disable_events(self): raise Exception("disable_events tbd") def destroy(self): raise Exception("destroy tbd") def __repr__(self): return str(self._address) def __str__(self): return self.__repr__() def _send_query(self, query, correlation_id=None): """ """ msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.get_query)}, content={MsgKey.query: query.map_encode()}) self._send_msg( msg, correlation_id ) def _send_method_req(self, mr_map, correlation_id=None): """ """ msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.method_req)}, content=mr_map) self._send_msg( msg, correlation_id ) ##============================================================================== ## METHOD CALL ##============================================================================== class MethodResult(object): def __init__(self, _out_args=None, _error=None): self._error = _error self._out_args = _out_args def succeeded(self): return self._error is None def get_exception(self): return self._error def get_arguments(self): return self._out_args def get_argument(self, name): arg = None if self._out_args: arg = self._out_args.get(name) return arg ##============================================================================== ## CONSOLE ##============================================================================== class Console(Thread): """ A 
Console manages communications to a collection of agents on behalf of an application. """ def __init__(self, name=None, _domain=None, notifier=None, reply_timeout = 60, # agent_timeout = 120, agent_timeout = 60, kwargs={}): """ @type name: str @param name: identifier for this console. Must be unique. @type notifier: qmfConsole.Notifier @param notifier: invoked when events arrive for processing. @type kwargs: dict @param kwargs: ??? Unused """ Thread.__init__(self) if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) else: self._name = str(name) self._domain = _domain self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier self._lock = Lock() self._conn = None self._session = None # dict of "agent-direct-address":class Agent entries self._agent_map = {} self._direct_recvr = None self._announce_recvr = None self._locate_sender = None self._schema_cache = {} self._req_correlation = SequencedWaiter() self._operational = False self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout self._next_agent_expire = None # lock out run() thread self._cv = Condition() # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() #self._event = qmfengine.ConsoleEvent() ##self._cv = Condition() ##self._sync_count = 0 ##self._sync_result = None ##self._select = {} ##self._cb_cond = Condition() def destroy(self, timeout=None): """ Must be called before the Console is deleted. Frees up all resources and shuts down all background threads. @type timeout: float @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. """ logging.debug("Destroying Console...") if self._conn: self.remove_connection(self._conn, timeout) logging.debug("Console Destroyed") def add_connection(self, conn): """ Add a AMQP connection to the console. 
The console will setup a session over the connection. The console will then broadcast an Agent Locate Indication over the session in order to discover present agents. @type conn: qpid.messaging.Connection @param conn: the connection to the AMQP messaging infrastructure. """ if self._conn: raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) # for messages directly addressed to me self._direct_recvr = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}", capacity=1) logging.debug("my direct addr=%s" % self._direct_recvr.source) self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}") logging.debug("my direct sender=%s" % self._direct_sender.target) # for receiving "broadcast" messages from agents default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", self._domain) self._topic_recvr = self._session.receiver(str(default_addr) + ";{create:always," " node-properties:{type:topic}}", capacity=1) logging.debug("default topic recv addr=%s" % self._topic_recvr.source) # for sending to topic subscribers topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) self._topic_sender = self._session.sender(str(topic_addr) + ";{create:always," " node-properties:{type:topic}}") logging.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... # self._operational = True self.start() def remove_connection(self, conn, timeout=None): """ Remove an AMQP connection from the console. Un-does the add_connection() operation, and releases any agents and sessions associated with the connection. 
@type conn: qpid.messaging.Connection @param conn: connection previously added by add_connection() """ if self._conn and conn and conn != self._conn: logging.error( "Attempt to delete unknown connection: %s" % str(conn)) # tell connection thread to shutdown self._operational = False if self.isAlive(): # kick my thread to wake it up logging.debug("Sending noop to wake up [%s]" % self._address) try: msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.noop)}, subject=self._name, content={"noop":"noop"}) self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) logging.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): logging.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() self._direct_sender.close() self._topic_recvr.close() self._topic_sender.close() self._session.close() self._session = None self._conn = None logging.debug("console connection removal complete") def get_address(self): """ The AMQP address this Console is listening to. """ return self._address def destroy_agent( self, agent ): """ Undoes create. """ if not isinstance(agent, Agent): raise TypeError("agent must be an instance of class Agent") self._lock.acquire() try: if agent._id in self._agent_map: del self._agent_map[agent._id] finally: self._lock.release() def find_agent(self, name, timeout=None ): """ Given the name of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ self._lock.acquire() try: agent = self._agent_map.get(name) if agent: return agent finally: self._lock.release() # agent not present yet - ping it with an agent_locate handle = self._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." 
+ name, properties={"method":"request", "qmf.subject":make_subject(OpCode.agent_locate)}, content={MsgKey.query: query.map_encode()}) msg.reply_to = str(self._address) msg.correlation_id = str(handle) logging.debug("Sending Agent Locate (%s)" % time.time()) # TRACE #logging.error("!!! Console %s sending agent locate (%s)" % # (self._name, str(msg))) try: self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) self._req_correlation.release(handle) return None if timeout is None: timeout = self._reply_timeout new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) self._req_correlation.get_data( handle, timeout ) self._req_correlation.release(handle) logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) finally: self._lock.release() return new_agent def get_agents(self): """ Return the list of known agents. """ self._lock.acquire() try: agents = self._agent_map.values() finally: self._lock.release() return agents def get_agent(self, name): """ Return the named agent, else None if not currently available. 
""" self._lock.acquire() try: agent = self._agent_map.get(name) finally: self._lock.release() return agent def do_query(self, agent, query, timeout=None ): """ """ target = query.get_target() handle = self._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") try: logging.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, handle) except SendError, e: logging.error(str(e)) self._req_correlation.release(handle) return None if not timeout: timeout = self._reply_timeout logging.debug("Waiting for response to Query (%s)" % timeout) reply = self._req_correlation.get_data(handle, timeout) self._req_correlation.release(handle) if not reply: logging.debug("Agent Query wait timed-out.") return None if target == QmfQuery.TARGET_PACKAGES: # simply pass back the list of package names logging.debug("Response to Packet Query received") return reply.content.get(MsgKey.package_info) elif target == QmfQuery.TARGET_OBJECT_ID: # simply pass back the list of object_id's logging.debug("Response to Object Id Query received") return reply.content.get(MsgKey.object_id) elif target == QmfQuery.TARGET_SCHEMA_ID: logging.debug("Response to Schema Id Query received") id_list = [] for sid_map in reply.content.get(MsgKey.schema_id): id_list.append(SchemaClassId.from_map(sid_map)) return id_list elif target == QmfQuery.TARGET_SCHEMA: logging.debug("Response to Schema Query received") schema_list = [] for schema_map in reply.content.get(MsgKey.schema): # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) if sid_map: sid = SchemaClassId.from_map(sid_map) if sid: if sid.get_type() == SchemaClassId.TYPE_DATA: schema = SchemaObjectClass.from_map(schema_map) else: schema = SchemaEventClass.from_map(schema_map) schema_list.append(schema) self._add_schema(schema) return schema_list elif target == QmfQuery.TARGET_OBJECT: logging.debug("Response to Object Query received") obj_list = [] for 
obj_map in reply.content.get(MsgKey.data_obj): # if the object references a schema, fetch it sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) if sid_map: sid = SchemaClassId.from_map(sid_map) schema = self._fetch_schema(sid, _agent=agent, _timeout=timeout) if not schema: logging.warning("Unknown schema, id=%s" % sid) continue obj = QmfConsoleData(map_=obj_map, agent=agent, _schema=schema) else: # no schema needed obj = QmfConsoleData(map_=obj_map, agent=agent) obj_list.append(obj) return obj_list else: logging.warning("Unexpected Target for a Query: '%s'" % target) return None def run(self): global _callback_thread # # @todo KAG Rewrite when api supports waiting on multiple receivers # while self._operational: # qLen = self._work_q.qsize() while True: try: msg = self._topic_recvr.fetch(timeout=0) except Empty: break # TRACE: # logging.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) while True: try: msg = self._direct_recvr.fetch(timeout = 0) except Empty: break # TRACE #logging.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) self._expire_agents() # check for expired agents #if qLen == 0 and self._work_q.qsize() and self._notifier: if self._work_q_put and self._notifier: # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() logging.info("Calling console notifier.indication") self._notifier.indication() _callback_thread = None if self._operational: # wait for a message to arrive or an agent # to expire now = datetime.datetime.utcnow() if self._next_agent_expire > now: timeout = timedelta_to_secs(self._next_agent_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." 
% timeout) xxx = self._session.next_receiver(timeout = timeout) except Empty: pass logging.debug("Shutting down Console thread") def get_objects(self, _object_id=None, _schema_id=None, _pname=None, _cname=None, _agents=None, _timeout=None): """ Retrieve objects by id or schema. By object_id: must specify schema_id or pname & cname if object defined by a schema. Undescribed objects: only object_id needed. By schema: must specify schema_id or pname & cname - all instances of objects defined by that schema are returned. """ if _agents is None: # use copy of current agent list self._lock.acquire() try: agent_list = self._agent_map.values() finally: self._lock.release() elif isinstance(_agents, Agent): agent_list = [_agents] else: agent_list = _agents # @todo validate this list! if _timeout is None: _timeout = self._reply_timeout # @todo: fix when async do_query done - query all agents at once, then # wait for replies, instead of per-agent querying.... obj_list = [] expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) for agent in agent_list: if not agent.is_active(): continue now = datetime.datetime.utcnow() if now >= expired: break if _pname is None: if _object_id: query = QmfQuery.create_id_object(_object_id, _schema_id) else: if _schema_id is not None: t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} else: t_params = None query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) reply = self.do_query(agent, query, timeout) if reply: obj_list = obj_list + reply else: # looking up by package name (and maybe class name), need to # find all schema_ids in that package, then lookup object by # schema_id if _cname is not None: pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, _pname]], [QmfQuery.EQ, SchemaClassId.KEY_CLASS, [QmfQuery.QUOTE, _cname]]] else: pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, _pname]] query = 
QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) timeout = timedelta_to_secs(expired - now) sid_list = self.do_query(agent, query, timeout) if sid_list: for sid in sid_list: now = datetime.datetime.utcnow() if now >= expired: break if _object_id is not None: query = QmfQuery.create_id_object(_object_id, sid) else: t_params = {QmfData.KEY_SCHEMA_ID: sid} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) reply = self.do_query(agent, query, timeout) if reply: obj_list = obj_list + reply if obj_list: return obj_list return None # called by run() thread ONLY # def _dispatch(self, msg, _direct=True): """ PRIVATE: Process a message received from an Agent """ logging.debug( "Message received from Agent! [%s]" % msg ) try: version,opcode = parse_subject(msg.properties.get("qmf.subject")) # @todo: deal with version mismatch!!! except: logging.error("Ignoring unrecognized message '%s'" % msg) return cmap = {}; props = {} if msg.content_type == "amqp/map": cmap = msg.content if msg.properties: props = msg.properties if opcode == OpCode.agent_ind: self._handle_agent_ind_msg( msg, cmap, version, _direct ) elif opcode == OpCode.data_ind: self._handle_data_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.event_ind: self._handle_event_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.managed_object: logging.warning("!!! managed_object TBD !!!") elif opcode == OpCode.object_ind: logging.warning("!!! object_ind TBD !!!") elif opcode == OpCode.response: self._handle_response_msg(msg, cmap, version, _direct) elif opcode == OpCode.schema_ind: logging.warning("!!! schema_ind TBD !!!") elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) def _handle_agent_ind_msg(self, msg, cmap, version, direct): """ Process a received agent-ind message. 
This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) ai_map = cmap.get(MsgKey.agent_info) if not ai_map or not isinstance(ai_map, type({})): logging.warning("Bad agent-ind message received: '%s'" % msg) return name = ai_map.get("_name") if not name: logging.warning("Bad agent-ind message received: agent name missing" " '%s'" % msg) return correlated = False if msg.correlation_id: correlated = self._req_correlation.is_valid(msg.correlation_id) agent = None self._lock.acquire() try: agent = self._agent_map.get(name) if agent: # agent already known, just update timestamp agent._announce_timestamp = datetime.datetime.utcnow() finally: self._lock.release() if not agent: # need to create and add a new agent? matched = False if self._agent_discovery_filter: tmp = QmfData.create(values=ai_map) matched = self._agent_discovery_filter.evaluate(tmp) if (correlated or matched): agent = self._create_agent(name) if not agent: return # failed to add agent agent._announce_timestamp = datetime.datetime.utcnow() if matched: # unsolicited, but newly discovered logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) self._work_q_put = True if correlated: # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. 
""" logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) if not self._req_correlation.is_valid(msg.correlation_id): logging.debug("Data indicate received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) if not self._req_correlation.is_valid(msg.correlation_id): logging.debug("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) def _handle_event_ind_msg(self, msg, cmap, version, _direct): ei_map = cmap.get(MsgKey.event) if not ei_map or not isinstance(ei_map, type({})): logging.warning("Bad event indication message received: '%s'" % msg) return aname = ei_map.get("_name") emap = ei_map.get("_event") if not aname: logging.debug("No '_name' field in event indication message.") return if not emap: logging.debug("No '_event' field in event indication message.") return agent = None self._lock.acquire() try: agent = self._agent_map.get(aname) finally: self._lock.release() if not agent: logging.debug("Agent '%s' not known." % aname) return try: # @todo: schema??? event = QmfEvent.from_map(emap) except TypeError: logging.debug("Invalid QmfEvent map received: %s" % str(emap)) return # @todo: schema? Need to fetch it, but not from this thread! # This thread can not pend on a request. 
logging.debug("Publishing event received from agent %s" % aname) wi = WorkItem(WorkItem.EVENT_RECEIVED, None, {"agent":agent, "event":event}) self._work_q.put(wi) self._work_q_put = True def _expire_agents(self): """ Check for expired agents and issue notifications when they expire. """ now = datetime.datetime.utcnow() if self._next_agent_expire and now < self._next_agent_expire: return lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) next_expire_delta = lifetime_delta self._lock.acquire() try: logging.debug("!!! expiring agents '%s'" % now) for agent in self._agent_map.itervalues(): if agent._announce_timestamp: agent_deathtime = agent._announce_timestamp + lifetime_delta if agent_deathtime <= now: logging.debug("AGENT_DELETED for %s" % agent) agent._announce_timestamp = None wi = WorkItem(WorkItem.AGENT_DELETED, None, {"agent":agent}) # @todo: remove agent from self._agent_map self._work_q.put(wi) self._work_q_put = True else: if (agent_deathtime - now) < next_expire_delta: next_expire_delta = agent_deathtime - now self._next_agent_expire = now + next_expire_delta logging.debug("!!! 
next expire cycle = '%s'" % self._next_agent_expire) finally: self._lock.release() def _create_agent( self, name ): """ Factory to create/retrieve an agent for this console """ logging.debug("creating agent %s" % name) self._lock.acquire() try: agent = self._agent_map.get(name) if agent: return agent agent = Agent(name, self) try: agent._sender = self._session.sender(str(agent._address) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}") except: logging.warning("Unable to create sender for %s" % name) return None logging.debug("created agent sender %s" % agent._sender.target) self._agent_map[name] = agent finally: self._lock.release() # new agent - query for its schema database for # seeding the schema cache (@todo) # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) # agent._sendQuery( query ) return agent def enable_agent_discovery(self, _query=None): """ Called to enable the asynchronous Agent Discovery process. Once enabled, AGENT_ADD work items can arrive on the WorkQueue. """ # @todo: fix - take predicate only, not entire query! if _query is not None: if (not isinstance(_query, QmfQuery) or _query.get_target() != QmfQuery.TARGET_AGENT): raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") self._agent_discovery_filter = _query else: # create a match-all agent query (no predicate) self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) def disable_agent_discovery(self): """ Called to disable the async Agent Discovery process enabled by calling enableAgentDiscovery() """ self._agent_discovery_filter = None def get_workitem_count(self): """ Returns the count of pending WorkItems that can be retrieved. """ return self._work_q.qsize() def get_next_workitem(self, timeout=None): """ Returns the next pending work item, or None if none available. @todo: subclass and return an Empty event instead. 
""" try: wi = self._work_q.get(True, timeout) except Queue.Empty: return None return wi def release_workitem(self, wi): """ Return a WorkItem to the Console when it is no longer needed. @todo: call Queue.task_done() - only 2.5+ @type wi: class qmfConsole.WorkItem @param wi: work item object to return. """ pass def _add_schema(self, schema): """ @todo """ if not isinstance(schema, SchemaClass): raise TypeError("SchemaClass type expected") self._lock.acquire() try: sid = schema.get_class_id() if not self._schema_cache.has_key(sid): self._schema_cache[sid] = schema finally: self._lock.release() def _fetch_schema(self, schema_id, _agent=None, _timeout=None): """ Find the schema identified by schema_id. If not in the cache, ask the agent for it. """ if not isinstance(schema_id, SchemaClassId): raise TypeError("SchemaClassId type expected") self._lock.acquire() try: schema = self._schema_cache.get(schema_id) if schema: return schema finally: self._lock.release() if _agent is None: return None # note: do_query will add the new schema to the cache automatically. 
slist = self.do_query(_agent, QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), _timeout) if slist: return slist[0] else: return None def __repr__(self): return str(self._address) # def get_packages(self): # plist = [] # for i in range(self.impl.packageCount()): # plist.append(self.impl.getPackageName(i)) # return plist # def get_classes(self, package, kind=CLASS_OBJECT): # clist = [] # for i in range(self.impl.classCount(package)): # key = self.impl.getClass(package, i) # class_kind = self.impl.getClassKind(key) # if class_kind == kind: # if kind == CLASS_OBJECT: # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) # elif kind == CLASS_EVENT: # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) # return clist # def bind_package(self, package): # return self.impl.bindPackage(package) # def bind_class(self, kwargs = {}): # if "key" in kwargs: # self.impl.bindClass(kwargs["key"]) # elif "package" in kwargs: # package = kwargs["package"] # if "class" in kwargs: # self.impl.bindClass(package, kwargs["class"]) # else: # self.impl.bindClass(package) # else: # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") # def get_agents(self, broker=None): # blist = [] # if broker: # blist.append(broker) # else: # self._cv.acquire() # try: # # copy while holding lock # blist = self._broker_list[:] # finally: # self._cv.release() # agents = [] # for b in blist: # for idx in range(b.impl.agentCount()): # agents.append(AgentProxy(b.impl.getAgent(idx), b)) # return agents # def get_objects(self, query, kwargs = {}): # timeout = 30 # agent = None # temp_args = kwargs.copy() # if type(query) == type({}): # temp_args.update(query) # if "_timeout" in temp_args: # timeout = temp_args["_timeout"] # temp_args.pop("_timeout") # if "_agent" in temp_args: # agent = temp_args["_agent"] # temp_args.pop("_agent") # if type(query) == type({}): # query = Query(temp_args) # self._select = {} # for 
k in temp_args.iterkeys(): # if type(k) == str: # self._select[k] = temp_args[k] # self._cv.acquire() # try: # self._sync_count = 1 # self._sync_result = [] # broker = self._broker_list[0] # broker.send_query(query.impl, None, agent) # self._cv.wait(timeout) # if self._sync_count == 1: # raise Exception("Timed out: waiting for query response") # finally: # self._cv.release() # return self._sync_result # def get_object(self, query, kwargs = {}): # ''' # Return one and only one object or None. # ''' # objs = objects(query, kwargs) # if len(objs) == 1: # return objs[0] # else: # return None # def first_object(self, query, kwargs = {}): # ''' # Return the first of potentially many objects. # ''' # objs = objects(query, kwargs) # if objs: # return objs[0] # else: # return None # # Check the object against select to check for a match # def _select_match(self, object): # schema_props = object.properties() # for key in self._select.iterkeys(): # for prop in schema_props: # if key == p[0].name() and self._select[key] != p[1]: # return False # return True # def _get_result(self, list, context): # ''' # Called by Broker proxy to return the result of a query. # ''' # self._cv.acquire() # try: # for item in list: # if self._select_match(item): # self._sync_result.append(item) # self._sync_count -= 1 # self._cv.notify() # finally: # self._cv.release() # def start_sync(self, query): pass # def touch_sync(self, sync): pass # def end_sync(self, sync): pass # def start_console_events(self): # self._cb_cond.acquire() # try: # self._cb_cond.notify() # finally: # self._cb_cond.release() # def _do_console_events(self): # ''' # Called by the Console thread to poll for events. Passes the events # onto the ConsoleHandler associated with this Console. Is called # periodically, but can also be kicked by Console.start_console_events(). 
# ''' # count = 0 # valid = self.impl.getEvent(self._event) # while valid: # count += 1 # try: # if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: # logging.debug("Console Event AGENT_ADDED received") # if self._handler: # self._handler.agent_added(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: # logging.debug("Console Event AGENT_DELETED received") # if self._handler: # self._handler.agent_deleted(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: # logging.debug("Console Event NEW_PACKAGE received") # if self._handler: # self._handler.new_package(self._event.name) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: # logging.debug("Console Event NEW_CLASS received") # if self._handler: # self._handler.new_class(SchemaClassKey(self._event.classKey)) # elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: # logging.debug("Console Event OBJECT_UPDATE received") # if self._handler: # self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), # self._event.hasProps, self._event.hasStats) # elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: # logging.debug("Console Event EVENT_RECEIVED received") # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: # logging.debug("Console Event AGENT_HEARTBEAT received") # if self._handler: # self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) # elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: # logging.debug("Console Event METHOD_RESPONSE received") # else: # logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) # except e: # print "Exception caught in callback thread:", e # self.impl.popEvent() # valid = self.impl.getEvent(self._event) # return count # class Broker(ConnectionHandler): # # attr_reader :impl :conn, :console, :broker_bank # def __init__(self, console, conn): # 
self.broker_bank = 1 # self.console = console # self.conn = conn # self._session = None # self._cv = Condition() # self._stable = None # self._event = qmfengine.BrokerEvent() # self._xmtMessage = qmfengine.Message() # self.impl = qmfengine.BrokerProxy(self.console.impl) # self.console.impl.addConnection(self.impl, self) # self.conn.add_conn_handler(self) # self._operational = True # def shutdown(self): # logging.debug("broker.shutdown() called.") # self.console.impl.delConnection(self.impl) # self.conn.del_conn_handler(self) # if self._session: # self.impl.sessionClosed() # logging.debug("broker.shutdown() sessionClosed done.") # self._session.destroy() # logging.debug("broker.shutdown() session destroy done.") # self._session = None # self._operational = False # logging.debug("broker.shutdown() done.") # def wait_for_stable(self, timeout = None): # self._cv.acquire() # try: # if self._stable: # return # if timeout: # self._cv.wait(timeout) # if not self._stable: # raise Exception("Timed out: waiting for broker connection to become stable") # else: # while not self._stable: # self._cv.wait() # finally: # self._cv.release() # def send_query(self, query, ctx, agent): # agent_impl = None # if agent: # agent_impl = agent.impl # self.impl.sendQuery(query, ctx, agent_impl) # self.conn.kick() # def _do_broker_events(self): # count = 0 # valid = self.impl.getEvent(self._event) # while valid: # count += 1 # if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: # logging.debug("Broker Event BROKER_INFO received"); # elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: # logging.debug("Broker Event DECLARE_QUEUE received"); # self.conn.impl.declareQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: # logging.debug("Broker Event DELETE_QUEUE received"); # self.conn.impl.deleteQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.BIND: # logging.debug("Broker Event 
BIND received"); # self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.UNBIND: # logging.debug("Broker Event UNBIND received"); # self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: # logging.debug("Broker Event SETUP_COMPLETE received"); # self.impl.startProtocol() # elif self._event.kind == qmfengine.BrokerEvent.STABLE: # logging.debug("Broker Event STABLE received"); # self._cv.acquire() # try: # self._stable = True # self._cv.notify() # finally: # self._cv.release() # elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: # result = [] # for idx in range(self._event.queryResponse.getObjectCount()): # result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) # self.console._get_result(result, self._event.context) # elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: # obj = self._event.context # obj._method_result(MethodResponse(self._event.methodResponse())) # self.impl.popEvent() # valid = self.impl.getEvent(self._event) # return count # def _do_broker_messages(self): # count = 0 # valid = self.impl.getXmtMessage(self._xmtMessage) # while valid: # count += 1 # logging.debug("Broker: sending msg on connection") # self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) # self.impl.popXmt() # valid = self.impl.getXmtMessage(self._xmtMessage) # return count # def _do_events(self): # while True: # self.console.start_console_events() # bcnt = self._do_broker_events() # mcnt = self._do_broker_messages() # if bcnt == 0 and mcnt == 0: # break; # def conn_event_connected(self): # logging.debug("Broker: Connection event CONNECTED") # self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) # self.impl.sessionOpened(self._session.handle) # 
#         self._do_events()

#     def conn_event_disconnected(self, error):
#         logging.debug("Broker: Connection event DISCONNECTED")
#         pass

#     def conn_event_visit(self):
#         self._do_events()

#     def sess_event_session_closed(self, context, error):
#         logging.debug("Broker: Session event CLOSED")
#         self.impl.sessionClosed()

#     def sess_event_recv(self, context, message):
#         logging.debug("Broker: Session event MSG_RECV")
#         if not self._operational:
#             logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
#         self.impl.handleRcvMessage(message)
#         self._do_events()


################################################################################
################################################################################
################################################################################
################################################################################
#                 TEMPORARY TEST CODE - TO BE DELETED
################################################################################
################################################################################
################################################################################
################################################################################

if __name__ == '__main__':
    # temp test code
    #
    # Manual smoke test: creates and destroys a Console, then exercises the
    # schema, data, event and query classes by round-tripping them through
    # their map encodings and printing the results for visual inspection.
    from common import (qmfTypes, SchemaProperty)

    logging.getLogger().setLevel(logging.INFO)

    logging.info( "************* Creating Async Console **************" )

    class MyNotifier(Notifier):
        """ Notifier that records that an indication arrived and prints it. """
        def __init__(self, context):
            self._myContext = context
            self.WorkAvailable = False

        def indication(self):
            print("Indication received! context=%d" % self._myContext)
            self.WorkAvailable = True

    _noteMe = MyNotifier( 666 )

    _myConsole = Console(notifier=_noteMe)

    _myConsole.enable_agent_discovery()
    logging.info("Waiting...")

    logging.info( "Destroying console:" )
    _myConsole.destroy( 10 )

    logging.info( "******** Messing around with Schema ********" )

    # -- event schema: build, inspect, extend, and round-trip through a map
    _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
                                                    stype=SchemaClassId.TYPE_EVENT),
                             _desc="A typical event schema",
                             _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
                                                                  kwargs = {"min":0,
                                                                            "max":100,
                                                                            "unit":"seconds",
                                                                            "desc":"sleep value"}),
                                     "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
                                                                  kwargs={"maxlen":100,
                                                                          "desc":"a string argument"})})
    print("_sec=%s" % _sec.get_class_id())
    print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
    print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
    print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
    try:
        print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
    except Exception:
        # Looking up an unknown property may raise; keep the script running
        # but report the failure instead of silently discarding it.
        logging.info("get_property('not-found') raised: %s" % sys.exc_info()[1])

    print("_sec.getProperties()='%s'" % _sec.get_properties())

    print("Adding another argument")
    _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
                            kwargs={"dir":"IO",
                                    "desc":"a boolean argument"})
    _sec.add_property('Argument-3', _arg3)
    print("_sec=%s" % _sec.get_class_id())
    print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
    print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
    print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
    print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )

    print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )

    _secmap = _sec.map_encode()
    print("_sec.mapEncode()='%s'" % _secmap )

    _sec2 = SchemaEventClass( _map=_secmap )

    print("_sec=%s" % _sec.get_class_id())
    print("_sec2=%s" % _sec2.get_class_id())

    # -- object schema: build directly from a map, inspect, and round-trip
    _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
                                                     "_class_name":   "myOtherClass",
                                                     "_type":         "_data"},
                                      "_desc": "A test data object",
                                      "_values":
                                          {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
                                                     "access": "RO",
                                                     "index": True,
                                                     "unit": "degrees"},
                                           "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
                                                     "access": "RW",
                                                     "index": True,
                                                     "desc": "The Second Property(tm)",
                                                     "unit": "radians"},
                                           "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
                                                           "unit": "seconds",
                                                           "desc": "time until I retire"},
                                           "meth1": {"_desc": "A test method",
                                                     "_arguments":
                                                         {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
                                                                   "desc": "an argument 1",
                                                                   "dir":  "I"},
                                                          "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
                                                                   "dir":  "IO",
                                                                   "desc": "some weird boolean"}}},
                                           "meth2": {"_desc": "A test method",
                                                     "_arguments":
                                                         {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
                                                                     "desc": "an 'nuther argument",
                                                                     "dir":  "I"}}}},
                                      "_subtypes":
                                          {"prop1":"qmfProperty",
                                           "prop2":"qmfProperty",
                                           "statistics":"qmfProperty",
                                           "meth1":"qmfMethod",
                                           "meth2":"qmfMethod"},
                                      "_primary_key_names": ["prop2", "prop1"]})

    print("_soc='%s'" % _soc)

    print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())

    print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
    print("_soc.getProperties='%s'" % _soc.get_properties())
    print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))

    print("_soc.getMethodCount='%d'" % _soc.get_method_count())
    print("_soc.getMethods='%s'" % _soc.get_methods())
    print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))

    _socmap = _soc.map_encode()
    print("_socmap='%s'" % _socmap)
    _soc2 = SchemaObjectClass( _map=_socmap )
    print("_soc='%s'" % _soc)
    print("_soc2='%s'" % _soc2)

    if _soc2.get_class_id() == _soc.get_class_id():
        print("soc and soc2 are the same schema")

    logging.info( "******** Messing around with ObjectIds ********" )

    # -- schema-less data: attribute, method and item style value access
    qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
    print("qd='%s':" % qd)

    print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))

    print("qd map='%s'" % qd.map_encode())
    print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
    qd.set_value("prop4", 4, "A test property called 4")
    print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
    qd.prop4 = 9
    print("qd.prop4 = 9 ='%s'" % qd.prop4)
    qd["prop4"] = 11
    print("qd[prop4] = 11 ='%s'" % qd["prop4"])

    print("qd.mapEncode()='%s'" % qd.map_encode())
    _qd2 = QmfData( _map = qd.map_encode() )
    print("_qd2.mapEncode()='%s'" % _qd2.map_encode())

    # -- data described by the object schema built above
    _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
                                              "prop2": 0}},
                                agent="some agent name?",
                                _schema = _soc)

    print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())

    _qmfDesc1._set_schema( _soc )

    print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
    print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
    print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())

    _qmfDescMap = _qmfDesc1.map_encode()
    print("_qmfDescMap='%s'" % _qmfDescMap)

    _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )

    print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
    print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
    print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())

    logging.info( "******** Messing around with QmfEvents ********" )

    # -- event instance described by the event schema built above
    _qmfevent1 = QmfEvent( _timestamp = 1111,
                           _schema = _sec,
                           _values = {"Argument-1": 77,
                                      "Argument-3": True,
                                      "Argument-2": "a string"})
    print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
    print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())

    _qmfevent1Map = _qmfevent1.map_encode()

    _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
    print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())

    logging.info( "******** Messing around with Queries ********" )

    # -- agent query with a nested AND/OR predicate
    _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
                                    [QmfQuery.AND,
                                     [QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]],
                                     [QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"],
                                     [QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]],
                                     [QmfQuery.OR,
                                      [QmfQuery.LE, "temperature", -10],
                                      [QmfQuery.FALSE],
                                      [QmfQuery.EXISTS, "namey"]]])

    print("_q1.mapEncode() = [%s]" % _q1.map_encode())