diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/console.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2-prototype/console.py | 2626 |
1 files changed, 2626 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/console.py b/qpid/extras/qmf/src/py/qmf2-prototype/console.py new file mode 100644 index 0000000000..9227835b3f --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2-prototype/console.py @@ -0,0 +1,2626 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import os +import platform +import time +import datetime +import Queue +from logging import getLogger +from threading import Thread, Event +from threading import RLock +from threading import currentThread +from threading import Condition + +from qpid.messaging import Connection, Message, Empty, SendError + +from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType, + QmfData, QmfAddress, SchemaClass, SchemaClassId, + SchemaEventClass, SchemaObjectClass, WorkItem, + SchemaMethod, QmfEvent, timedelta_to_secs) + + +# global flag that indicates which thread (if any) is +# running the console notifier callback +_callback_thread=None + + +log = getLogger("qmf") +trace = getLogger("qmf.console") + + +##============================================================================== +## Console Transaction Management +## +## At any given time, a console application may have multiple outstanding +## message transactions with agents. The following objects allow the console +## to track these outstanding transactions. +##============================================================================== + + +class _Mailbox(object): + """ + Virtual base class for all Mailbox-like objects. + """ + def __init__(self, console): + self.console = console + self.cid = 0 + self.console._add_mailbox(self) + + def get_address(self): + return self.cid + + def deliver(self, data): + """ + Invoked by Console Management thread when a message arrives for + this mailbox. + """ + raise Exception("_Mailbox deliver() method must be provided") + + def destroy(self): + """ + Release the mailbox. Once called, the mailbox should no longer be + referenced. + """ + self.console._remove_mailbox(self.cid) + + +class _SyncMailbox(_Mailbox): + """ + A simple mailbox that allows a consumer to wait for delivery of data. + """ + def __init__(self, console): + """ + Invoked by application thread. + """ + super(_SyncMailbox, self).__init__(console) + self._cv = Condition() + self._data = [] + self._waiting = False + + def deliver(self, data): + """ + Drop data into the mailbox, waking any waiters if necessary. + Invoked by Console Management thread only. + """ + self._cv.acquire() + try: + self._data.append(data) + # if was empty, notify waiters + if len(self._data) == 1: + self._cv.notify() + finally: + self._cv.release() + + def fetch(self, timeout=None): + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ + self._cv.acquire() + try: + if len(self._data) == 0: + self._cv.wait(timeout) + if len(self._data): + return self._data.pop(0) + return None + finally: + self._cv.release() + + +class _AsyncMailbox(_Mailbox): + """ + A Mailbox for asynchronous delivery, with a timeout value. + """ + def __init__(self, console, + _timeout=None): + """ + Invoked by application thread. + """ + super(_AsyncMailbox, self).__init__(console) + self.console = console + + if _timeout is None: + _timeout = console._reply_timeout + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + console._lock.acquire() + try: + console._async_mboxes[self.cid] = self + console._next_mbox_expire = None + finally: + console._lock.release() + + # now that an async mbox has been created, wake the + # console mgmt thread so it will know about the mbox expiration + # date (and adjust its idle sleep period correctly) + + console._wake_thread() + + def reset_timeout(self, _timeout=None): + """ Reset the expiration date for this mailbox. + """ + if _timeout is None: + _timeout = self.console._reply_timeout + self.console._lock.acquire() + try: + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + self.console._next_mbox_expire = None + finally: + self.console._lock.release() + + # wake the console mgmt thread so it will learn about the mbox + # expiration date (and adjust its idle sleep period correctly) + + self.console._wake_thread() + + def deliver(self, msg): + """ + """ + raise Exception("deliver() method must be provided") + + def expire(self): + raise Exception("expire() method must be provided") + + + def destroy(self): + self.console._lock.acquire() + try: + if self.cid in self.console._async_mboxes: + del self.console._async_mboxes[self.cid] + finally: + self.console._lock.release() + super(_AsyncMailbox, self).destroy() + + + +class _QueryMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous query requests. + """ + def __init__(self, console, + agent_name, + context, + target, + _timeout=None): + """ + Invoked by application thread. + """ + super(_QueryMailbox, self).__init__(console, + _timeout) + self.agent_name = agent_name + self.target = target + self.context = context + self.result = [] + + def deliver(self, reply): + """ + Process query response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name) + objects = reply.content + if isinstance(objects, type([])): + # convert from map to native types if needed + if self.target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + self.result.append(SchemaClassId.from_map(sid_map)) + + elif self.target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.result.append(schema) + + elif self.target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + # @todo: need the agent name - ideally from the + # reply message iself. + agent = self.console.get_agent(self.agent_name) + if agent: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + + else: + # no conversion needed. + self.result += objects + + if not "partial" in reply.properties: + # log.error("QUERY COMPLETE for %s" % str(self.context)) + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name) + # send along whatever (possibly none) has been received so far + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + +class _SchemaPrefetchMailbox(_AsyncMailbox): + """ + Handles responses to schema fetches made by the console. + """ + def __init__(self, console, + schema_id, + _timeout=None): + """ + Invoked by application thread. + """ + super(_SchemaPrefetchMailbox, self).__init__(console, + _timeout) + self.schema_id = schema_id + + def deliver(self, reply): + """ + Process schema response messages. + """ + trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id) + done = False + schemas = reply.content + if schemas and isinstance(schemas, type([])): + for schema_map in schemas: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.destroy() + + + def expire(self): + trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id) + self.destroy() + + + +class _MethodMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous method requests. + """ + def __init__(self, console, + context, + _timeout=None): + """ + Invoked by application thread. + """ + super(_MethodMailbox, self).__init__(console, + _timeout) + self.context = context + + def deliver(self, reply): + """ + Process method response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + trace.debug("Delivering to method mailbox.") + _map = reply.content + if not _map or not isinstance(_map, type({})): + log.error("Invalid method call reply message") + result = None + else: + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + error = QmfData.from_map(error) + result = MethodResult(_error=error) + else: + result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + # create workitem + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + """ + The mailbox expired without receiving a reply. + Invoked by the Console Management thread only. + """ + trace.debug("Expiring method mailbox.") + # send along an empty response + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + +class _SubscriptionMailbox(_AsyncMailbox): + """ + A Mailbox for a single subscription. Allows only sychronous "subscribe" + and "refresh" requests. + """ + def __init__(self, console, context, agent, duration, interval): + """ + Invoked by application thread. + """ + super(_SubscriptionMailbox, self).__init__(console, duration) + self.cv = Condition() + self.data = [] + self.result = [] + self.context = context + self.duration = duration + self.interval = interval + self.agent_name = agent.get_name() + self.agent_subscription_id = None # from agent + + def subscribe(self, query): + agent = self.console.get_agent(self.agent_name) + if not agent: + log.warning("subscribed failed - unknown agent '%s'" % + self.agent_name) + return False + try: + trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name) + agent._send_subscribe_req(query, self.get_address(), self.interval, + self.duration) + except SendError, e: + log.error(str(e)) + return False + return True + + def resubscribe(self): + agent = self.console.get_agent(self.agent_name) + if not agent: + log.warning("resubscribed failed - unknown agent '%s'", + self.agent_name) + return False + try: + trace.debug("Sending resubscribe to Agent %s", self.agent_name) + agent._send_resubscribe_req(self.get_address(), + self.agent_subscription_id) + except SendError, e: + log.error(str(e)) + return False + return True + + def deliver(self, msg): + """ + """ + opcode = msg.properties.get("qmf.opcode") + if (opcode == OpCode.subscribe_rsp): + + error = msg.content.get("_error") + if error: + try: + e_map = QmfData.from_map(error) + except TypeError: + log.warning("Invalid QmfData map received: '%s'" + % str(error)) + e_map = QmfData.create({"error":"Unknown error"}) + sp = SubscribeParams(None, None, None, e_map) + else: + self.agent_subscription_id = msg.content.get("_subscription_id") + self.duration = msg.content.get("_duration", self.duration) + self.interval = msg.content.get("_interval", self.interval) + self.reset_timeout(self.duration) + sp = SubscribeParams(self.get_address(), + self.interval, + self.duration, + None) + self.cv.acquire() + try: + self.data.append(sp) + # if was empty, notify waiters + if len(self.data) == 1: + self.cv.notify() + finally: + self.cv.release() + return + + # else: data indication + agent_name = msg.properties.get("qmf.agent") + if not agent_name: + log.warning("Ignoring data_ind - no agent name given: %s" % + msg) + return + agent = self.console.get_agent(agent_name) + if not agent: + log.warning("Ignoring data_ind - unknown agent '%s'" % + agent_name) + return + + objects = msg.content + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + if not "partial" in msg.properties: + wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result) + self.result = [] + self.console._work_q.put(wi) + self.console._work_q_put = True + + def fetch(self, timeout=None): + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ + self.cv.acquire() + try: + if len(self.data) == 0: + self.cv.wait(timeout) + if len(self.data): + return self.data.pop(0) + return None + finally: + self.cv.release() + + def expire(self): + """ The subscription expired. + """ + self.destroy() + + + + +class _AsyncSubscriptionMailbox(_SubscriptionMailbox): + """ + A Mailbox for a single subscription. Allows only asychronous "subscribe" + and "refresh" requests. + """ + def __init__(self, console, context, agent, duration, interval): + """ + Invoked by application thread. + """ + super(_AsyncSubscriptionMailbox, self).__init__(console, context, + agent, duration, + interval) + self.subscribe_pending = False + + def subscribe(self, query, reply_timeout): + if super(_AsyncSubscriptionMailbox, self).subscribe(query): + self.subscribe_pending = True + self.reset_timeout(reply_timeout) + return True + return False + + def deliver(self, msg): + """ + """ + super(_AsyncSubscriptionMailbox, self).deliver(msg) + sp = self.fetch(0) + if sp and self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, sp) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.subscribe_pending = False + + if not sp.succeeded(): + self.destroy() + + + def expire(self): + """ Either the subscription expired, or a request timedout. + """ + if self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, None) + self.console._work_q.put(wi) + self.console._work_q_put = True + self.destroy() + + +##============================================================================== +## DATA MODEL +##============================================================================== + + +class QmfConsoleData(QmfData): + """ + Console's representation of an managed QmfData instance. + """ + def __init__(self, map_, agent): + super(QmfConsoleData, self).__init__(_map=map_, + _const=True) + self._agent = agent + + def get_timestamps(self): + """ + Returns a list of timestamps describing the lifecycle of + the object. All timestamps are represented by the AMQP + timestamp type. [0] = time of last update from Agent, + [1] = creation timestamp + [2] = deletion timestamp, or zero if not + deleted. + """ + return [self._utime, self._ctime, self._dtime] + + def get_create_time(self): + """ + returns the creation timestamp + """ + return self._ctime + + def get_update_time(self): + """ + returns the update timestamp + """ + return self._utime + + def get_delete_time(self): + """ + returns the deletion timestamp, or zero if not yet deleted. + """ + return self._dtime + + def is_deleted(self): + """ + True if deletion timestamp not zero. + """ + return self._dtime != long(0) + + def refresh(self, _reply_handle=None, _timeout=None): + """ + request that the Agent update the value of this object's + contents. + """ + if _reply_handle is not None: + log.error(" ASYNC REFRESH TBD!!!") + return None + + assert self._agent + assert self._agent._console + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + # create query to agent using this objects ID + query = QmfQuery.create_id_object(self.get_object_id(), + self.get_schema_class_id()) + obj_list = self._agent._console.do_query(self._agent, query, + _timeout=_timeout) + if obj_list is None or len(obj_list) != 1: + return None + + self._update(obj_list[0]) + return self + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + Invoke the named method on this object. + """ + assert self._agent + assert self._agent._console + + oid = self.get_object_id() + if oid is None: + raise ValueError("Cannot invoke methods on unmanaged objects.") + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + if _reply_handle is not None: + mbox = _MethodMailbox(self._agent._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._agent._console) + cid = mbox.get_address() + + _map = {self.KEY_OBJECT_ID:str(oid), + SchemaMethod.KEY_NAME:name} + + sid = self.get_schema_class_id() + if sid: + _map[self.KEY_SCHEMA_ID] = sid.map_encode() + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + + trace.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._agent._send_method_req(_map, cid) + except SendError, e: + log.error(str(e)) + mbox.destroy() + return None + + if _reply_handle is not None: + return True + + trace.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = mbox.fetch(_timeout) + mbox.destroy() + + if not replyMsg: + trace.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): + log.error("Invalid method call reply message") + return None + + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + return MethodResult(_error=QmfData.from_map(error)) + else: + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + def _update(self, newer): + super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, + _tag=newer._tag, _object_id=newer._object_id, + _ctime=newer._ctime, _utime=newer._utime, + _dtime=newer._dtime, + _schema_id=newer._schema_id, _const=True) + +class QmfLocalData(QmfData): + """ + Console's representation of an unmanaged QmfData instance. There + is no remote agent associated with this instance. The Console has + full control over this instance. + """ + def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfLocalData, self).__init__(_values=values, + _subtypes=_subtypes, _tag=_tag, + _object_id=_object_id, + _schema=_schema, _ctime=ctime, + _utime=ctime, _const=False) + + +class Agent(object): + """ + A local representation of a remote agent managed by this console. + """ + def __init__(self, name, console): + """ + @type name: string + @param name: uniquely identifies this agent in the AMQP domain. + """ + + if not isinstance(console, Console): + raise TypeError("parameter must be an instance of class Console") + + self._name = name + self._address = QmfAddress.direct(name, console._domain) + self._console = console + self._sender = None + self._packages = {} # map of {package-name:[list of class-names], } for this agent + self._subscriptions = [] # list of active standing subscriptions for this agent + self._announce_timestamp = None # datetime when last announce received + trace.debug( "Created Agent with address: [%s]" % self._address ) + + + def get_name(self): + return self._name + + def is_active(self): + return self._announce_timestamp != None + + def _send_msg(self, msg, correlation_id=None): + """ + Low-level routine to asynchronously send a message to this agent. + """ + msg.reply_to = str(self._console._address) + if correlation_id: + msg.correlation_id = str(correlation_id) + # TRACE + #log.error("!!! Console %s sending to agent %s (%s)" % + # (self._console._name, self._name, str(msg))) + self._sender.send(msg) + # return handle + + def get_packages(self): + """ + Return a list of the names of all packages known to this agent. + """ + return self._packages.keys() + + def get_classes(self): + """ + Return a dictionary [key:class] of classes known to this agent. + """ + return self._packages.copy() + + def get_objects(self, query, kwargs={}): + """ + Return a list of objects that satisfy the given query. + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: list + @return: list of matching objects, or None. + """ + pass + + def get_object(self, query, kwargs={}): + """ + Get one object - query is expected to match only one object. + ??? Recommended: explicit timeout param, default None ??? + + @type query: dict, or common.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: qmfConsole.ObjectProxy + @return: one matching object, or none + """ + pass + + + def create_subscription(self, query): + """ + Factory for creating standing subscriptions based on a given query. + + @type query: common.Query object + @param query: determines the list of objects for which this subscription applies + @rtype: qmfConsole.Subscription + @returns: an object representing the standing subscription. + """ + pass + + + def invoke_method(self, name, _in_args={}, _reply_handle=None, + _timeout=None): + """ + Invoke the named method on this agent. + """ + assert self._console + + if _timeout is None: + _timeout = self._console._reply_timeout + + if _reply_handle is not None: + mbox = _MethodMailbox(self._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._console) + cid = mbox.get_address() + + _map = {SchemaMethod.KEY_NAME:name} + if _in_args: + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() + + trace.debug("Sending method req to Agent (%s)" % time.time()) + try: + self._send_method_req(_map, cid) + except SendError, e: + log.error(str(e)) + mbox.destroy() + return None + + if _reply_handle is not None: + return True + + trace.debug("Waiting for response to method req (%s)" % _timeout) + replyMsg = mbox.fetch(_timeout) + mbox.destroy() + + if not replyMsg: + trace.debug("Agent method req wait timed-out.") + return None + + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): + log.error("Invalid method call reply message") + return None + + return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), + _error=_map.get(SchemaMethod.KEY_ERROR)) + + def enable_events(self): + raise Exception("enable_events tbd") + + def disable_events(self): + raise Exception("disable_events tbd") + + def destroy(self): + raise Exception("destroy tbd") + + def __repr__(self): + return str(self._address) + + def __str__(self): + return self.__repr__() + + def _send_query(self, query, correlation_id=None): + """ + """ + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.query_req}, + content=query.map_encode()) + self._send_msg( msg, correlation_id ) + + + def _send_method_req(self, mr_map, correlation_id=None): + """ + """ + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.method_req}, + content=mr_map) + self._send_msg( msg, correlation_id ) + + def _send_subscribe_req(self, query, correlation_id, _interval=None, + _lifetime=None): + """ + """ + sr_map = {"_query":query.map_encode()} + if _interval is not None: + sr_map["_interval"] = _interval + if _lifetime is not None: + sr_map["_duration"] = _lifetime + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_req}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_resubscribe_req(self, correlation_id, + subscription_id): + """ + """ + sr_map = {"_subscription_id":subscription_id} + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_refresh_ind}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_unsubscribe_ind(self, correlation_id, subscription_id): + """ + """ + sr_map = {"_subscription_id":subscription_id} + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_cancel_ind}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class MethodResult(object): + def __init__(self, _out_args=None, _error=None): + self._error = _error + self._out_args = _out_args + + def succeeded(self): + return self._error is None + + def get_exception(self): + return self._error + + def get_arguments(self): + return self._out_args + + def get_argument(self, name): + arg = None + if self._out_args: + arg = self._out_args.get(name) + return arg + + + + ##============================================================================== + ## SUBSCRIPTION + ##============================================================================== + +class SubscribeParams(object): + """ Represents a standing subscription for this console. + """ + def __init__(self, sid, interval, duration, _error=None): + self._sid = sid + self._interval = interval + self._duration = duration + self._error = _error + + def succeeded(self): + return self._error is None + + def get_error(self): + return self._error + + def get_subscription_id(self): + return self._sid + + def get_publish_interval(self): + return self._interval + + def get_duration(self): + return self._duration + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + + + + +class Console(Thread): + """ + A Console manages communications to a collection of agents on behalf of an application. + """ + def __init__(self, name=None, _domain=None, notifier=None, + reply_timeout = 60, + # agent_timeout = 120, + agent_timeout = 60, + kwargs={}): + """ + @type name: str + @param name: identifier for this console. Must be unique. + @type notifier: qmfConsole.Notifier + @param notifier: invoked when events arrive for processing. + @type kwargs: dict + @param kwargs: ??? Unused + """ + Thread.__init__(self) + self._operational = False + self._ready = Event() + + if not name: + self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) + else: + self._name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self._name, self._domain) + self._notifier = notifier + self._lock = RLock() + self._conn = None + self._session = None + # dict of "agent-direct-address":class Agent entries + self._agent_map = {} + self._direct_recvr = None + self._announce_recvr = None + self._locate_sender = None + self._schema_cache = {} + self._pending_schema_req = [] + self._agent_discovery_filter = None + self._reply_timeout = reply_timeout + self._agent_timeout = agent_timeout + self._subscribe_timeout = 300 # @todo: parameterize + self._next_agent_expire = None + self._next_mbox_expire = None + # for passing WorkItems to the application + self._work_q = Queue.Queue() + self._work_q_put = False + # Correlation ID and mailbox storage + self._correlation_id = long(time.time()) # pseudo-randomize + self._post_office = {} # indexed by cid + self._async_mboxes = {} # indexed by cid, used to expire them + + def destroy(self, timeout=None): + """ + Must be called before the Console is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + trace.debug("Destroying Console...") + if self._conn: + self.remove_connection(self._conn, timeout) + trace.debug("Console Destroyed") + + def add_connection(self, conn): + """ + Add a AMQP connection to the console. The console will setup a session over the + connection. The console will then broadcast an Agent Locate Indication over + the session in order to discover present agents. + + @type conn: qpid.messaging.Connection + @param conn: the connection to the AMQP messaging infrastructure. + """ + if self._conn: + raise Exception( "Multiple connections per Console not supported." ); + self._conn = conn + self._session = conn.session(name=self._name) + + # for messages directly addressed to me + self._direct_recvr = self._session.receiver(str(self._address) + + ";{create:always," + " node:" + " {type:topic," + " x-declare:" + " {type:direct}}}", + capacity=1) + trace.debug("my direct addr=%s" % self._direct_recvr.source) + + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node:" + " {type:topic," + " x-declare:" + " {type:direct}}}") + trace.debug("my direct sender=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from agents + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", + self._domain) + self._topic_recvr = self._session.receiver(str(default_addr) + + ";{create:always," + " node:{type:topic}}", + capacity=1) + trace.debug("default topic recv addr=%s" % self._topic_recvr.source) + + + # for sending to topic subscribers + topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) + self._topic_sender = self._session.sender(str(topic_addr) + + ";{create:always," + " node:{type:topic}}") + trace.debug("default topic send addr=%s" % self._topic_sender.target) + + # + # Now that receivers are created, fire off the receive thread... + # + self._operational = True + self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Console managment thread failed to start.") + + + + def remove_connection(self, conn, timeout=None): + """ + Remove an AMQP connection from the console. Un-does the add_connection() operation, + and releases any agents and sessions associated with the connection. + + @type conn: qpid.messaging.Connection + @param conn: connection previously added by add_connection() + """ + if self._conn and conn and conn != self._conn: + log.error( "Attempt to delete unknown connection: %s" % str(conn)) + + # tell connection thread to shutdown + self._operational = False + if self.isAlive(): + # kick my thread to wake it up + self._wake_thread() + trace.debug("waiting for console receiver thread to exit") + self.join(timeout) + if self.isAlive(): + log.error( "Console thread '%s' is hung..." % self.getName() ) + self._direct_recvr.close() + self._direct_sender.close() + self._topic_recvr.close() + self._topic_sender.close() + self._session.close() + self._session = None + self._conn = None + trace.debug("console connection removal complete") + + + def get_address(self): + """ + The AMQP address this Console is listening to. + """ + return self._address + + + def destroy_agent( self, agent ): + """ + Undoes create. + """ + if not isinstance(agent, Agent): + raise TypeError("agent must be an instance of class Agent") + + self._lock.acquire() + try: + if agent._name in self._agent_map: + del self._agent_map[agent._name] + finally: + self._lock.release() + + def find_agent(self, name, timeout=None ): + """ + Given the name of a particular agent, return an instance of class Agent + representing that agent. Return None if the agent does not exist. + """ + + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + finally: + self._lock.release() + + # agent not present yet - ping it with an agent_locate + + mbox = _SyncMailbox(self) + cid = mbox.get_address() + + query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) + msg = Message(id=QMF_APP_ID, + subject="console.ind.locate." + name, + properties={"method":"request", + "qmf.opcode":OpCode.agent_locate_req}, + content=query._predicate) + msg.content_type="amqp/list" + msg.reply_to = str(self._address) + msg.correlation_id = str(cid) + trace.debug("%s Sending Agent Locate (%s)", self._name, str(msg)) + try: + self._topic_sender.send(msg) + except SendError, e: + log.error(str(e)) + mbox.destroy() + return None + + if timeout is None: + timeout = self._reply_timeout + + new_agent = None + trace.debug("Waiting for response to Agent Locate (%s)" % timeout) + mbox.fetch(timeout) + mbox.destroy() + trace.debug("Agent Locate wait ended (%s)" % time.time()) + self._lock.acquire() + try: + new_agent = self._agent_map.get(name) + finally: + self._lock.release() + + return new_agent + + + def get_agents(self): + """ + Return the list of known agents. + """ + self._lock.acquire() + try: + agents = self._agent_map.values() + finally: + self._lock.release() + return agents + + + def get_agent(self, name): + """ + Return the named agent, else None if not currently available. + """ + self._lock.acquire() + try: + agent = self._agent_map.get(name) + finally: + self._lock.release() + return agent + + + def do_query(self, agent, query, _reply_handle=None, _timeout=None ): + """ + """ + target = query.get_target() + + if _reply_handle is not None: + mbox = _QueryMailbox(self, + agent.get_name(), + _reply_handle, + target, + _timeout) + else: + mbox = _SyncMailbox(self) + + cid = mbox.get_address() + + try: + trace.debug("Sending Query to Agent (%s)" % time.time()) + agent._send_query(query, cid) + except SendError, e: + log.error(str(e)) + mbox.destroy() + return None + + # return now if async reply expected + if _reply_handle is not None: + return True + + if not _timeout: + _timeout = self._reply_timeout + + trace.debug("Waiting for response to Query (%s)" % _timeout) + now = datetime.datetime.utcnow() + expire = now + datetime.timedelta(seconds=_timeout) + + response = [] + while (expire > now): + _timeout = timedelta_to_secs(expire - now) + reply = mbox.fetch(_timeout) + if not reply: + trace.debug("Query wait timed-out.") + break + + objects = reply.content + if not objects or not isinstance(objects, type([])): + break + + # convert from map to native types if needed + if target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + response.append(SchemaClassId.from_map(sid_map)) + + elif target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self._add_schema(schema) # add to schema cache + response.append(schema) + + elif target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self._prefetch_schema(sid, agent) + response.append(obj) + else: + # no conversion needed. + response += objects + + if not "partial" in reply.properties: + # reply not broken up over multiple msgs + break + + now = datetime.datetime.utcnow() + + mbox.destroy() + return response + + + def create_subscription(self, agent, query, console_handle, + _interval=None, _duration=None, + _blocking=True, _timeout=None): + if not _duration: + _duration = self._subscribe_timeout + + if _timeout is None: + _timeout = self._reply_timeout + + if not _blocking: + mbox = _AsyncSubscriptionMailbox(self, console_handle, agent, + _duration, _interval) + if not mbox.subscribe(query, _timeout): + mbox.destroy() + return False + return True + else: + mbox = _SubscriptionMailbox(self, console_handle, agent, _duration, + _interval) + + if not mbox.subscribe(query): + mbox.destroy() + return None + + trace.debug("Waiting for response to subscription (%s)" % _timeout) + # @todo: what if mbox expires here? + sp = mbox.fetch(_timeout) + + if not sp: + trace.debug("Subscription request wait timed-out.") + mbox.destroy() + return None + + if not sp.succeeded(): + mbox.destroy() + + return sp + + def refresh_subscription(self, subscription_id, + _duration=None, + _timeout=None): + if _timeout is None: + _timeout = self._reply_timeout + + mbox = self._get_mailbox(subscription_id) + if not mbox: + log.warning("Subscription %s not found." % subscription_id) + return None + + if isinstance(mbox, _AsyncSubscriptionMailbox): + return mbox.resubscribe() + else: + # synchronous - wait for reply + if not mbox.resubscribe(): + # @todo ???? mbox.destroy() + return None + + # wait for reply + + trace.debug("Waiting for response to subscription (%s)" % _timeout) + sp = mbox.fetch(_timeout) + + if not sp: + trace.debug("re-subscribe request wait timed-out.") + # @todo???? mbox.destroy() + return None + + return sp + + + def cancel_subscription(self, subscription_id): + """ + """ + mbox = self._get_mailbox(subscription_id) + if not mbox: + return + + agent = self.get_agent(mbox.agent_name) + if agent: + try: + trace.debug("Sending UnSubscribe to Agent (%s)" % time.time()) + agent._send_unsubscribe_ind(subscription_id, + mbox.agent_subscription_id) + except SendError, e: + log.error(str(e)) + + mbox.destroy() + + + def _wake_thread(self): + """ + Make the console management thread loop wakeup from its next_receiver + sleep. + """ + trace.debug("Sending noop to wake up [%s]" % self._address) + msg = Message(id=QMF_APP_ID, + subject=self._name, + properties={"method":"indication", + "qmf.opcode":OpCode.noop}, + content={}) + try: + self._direct_sender.send( msg, sync=True ) + except SendError, e: + log.error(str(e)) + + + def run(self): + """ + Console Management Thread main loop. + Handles inbound messages, agent discovery, async mailbox timeouts. + """ + global _callback_thread + + self._ready.set() + + while self._operational: + + # qLen = self._work_q.qsize() + + while True: + try: + msg = self._topic_recvr.fetch(timeout=0) + except Empty: + break + # TRACE: + # log.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._topic_recvr.source, msg)) + self._dispatch(msg, _direct=False) + + while True: + try: + msg = self._direct_recvr.fetch(timeout = 0) + except Empty: + break + # TRACE + #log.error("!!! Console %s: msg on %s [%s]" % + # (self._name, self._direct_recvr.source, msg)) + self._dispatch(msg, _direct=True) + + self._expire_agents() # check for expired agents + self._expire_mboxes() # check for expired async mailbox requests + + #if qLen == 0 and self._work_q.qsize() and self._notifier: + if self._work_q_put and self._notifier: + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + trace.debug("Calling console notifier.indication") + self._notifier.indication() + _callback_thread = None + + + # wait for a message to arrive, or an agent + # to expire, or a mailbox requrest to time out + now = datetime.datetime.utcnow() + next_expire = self._next_agent_expire + + self._lock.acquire() + try: + # the mailbox expire flag may be cleared by the + # app thread(s) to force an immedate mailbox scan + if self._next_mbox_expire is None: + next_expire = now + elif self._next_mbox_expire < next_expire: + next_expire = self._next_mbox_expire + finally: + self._lock.release() + + timeout = timedelta_to_secs(next_expire - now) + + if self._operational and timeout > 0.0: + try: + trace.debug("waiting for next rcvr (timeout=%s)..." % timeout) + self._session.next_receiver(timeout = timeout) + except Empty: + pass + + trace.debug("Shutting down Console thread") + + def get_objects(self, + _object_id=None, + _schema_id=None, + _pname=None, _cname=None, + _agents=None, + _timeout=None): + """ + Retrieve objects by id or schema. + + By object_id: must specify schema_id or pname & cname if object defined + by a schema. Undescribed objects: only object_id needed. + + By schema: must specify schema_id or pname & cname - all instances of + objects defined by that schema are returned. + """ + if _agents is None: + # use copy of current agent list + self._lock.acquire() + try: + agent_list = self._agent_map.values() + finally: + self._lock.release() + elif isinstance(_agents, Agent): + agent_list = [_agents] + else: + agent_list = _agents + # @todo validate this list! + + if _timeout is None: + _timeout = self._reply_timeout + + # @todo: fix when async do_query done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + + obj_list = [] + expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) + for agent in agent_list: + if not agent.is_active(): + continue + now = datetime.datetime.utcnow() + if now >= expired: + break + + if _pname is None: + if _object_id: + query = QmfQuery.create_id_object(_object_id, + _schema_id) + else: + if _schema_id is not None: + t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} + else: + t_params = None + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, _timeout=timeout) + if reply: + obj_list = obj_list + reply + else: + # looking up by package name (and maybe class name), need to + # find all schema_ids in that package, then lookup object by + # schema_id + if _cname is not None: + pred = [QmfQuery.AND, + [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]], + [QmfQuery.EQ, SchemaClassId.KEY_CLASS, + [QmfQuery.QUOTE, _cname]]] + else: + pred = [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]] + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) + timeout = timedelta_to_secs(expired - now) + sid_list = self.do_query(agent, query, _timeout=timeout) + if sid_list: + for sid in sid_list: + now = datetime.datetime.utcnow() + if now >= expired: + break + if _object_id is not None: + query = QmfQuery.create_id_object(_object_id, sid) + else: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, _timeout=timeout) + if reply: + obj_list = obj_list + reply + if obj_list: + return obj_list + return None + + + + # called by run() thread ONLY + # + def _dispatch(self, msg, _direct=True): + """ + PRIVATE: Process a message received from an Agent + """ + trace.debug( "Message received from Agent! [%s]", msg ) + + opcode = msg.properties.get("qmf.opcode") + if not opcode: + log.error("Ignoring unrecognized message '%s'", msg) + return + version = 2 # @todo: fix me + + cmap = {}; props = {} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_heartbeat_ind: + self._handle_agent_ind_msg( msg, cmap, version, _direct ) + elif opcode == OpCode.agent_locate_rsp: + self._handle_agent_ind_msg( msg, cmap, version, _direct ) + elif msg.correlation_id: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.data_ind: + self._handle_indication_msg(msg, cmap, version, _direct) + elif opcode == OpCode.noop: + trace.debug("No-op msg received.") + else: + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'", opcode) + + + def _handle_agent_ind_msg(self, msg, cmap, version, direct): + """ + Process a received agent-ind message. This message may be a response to a + agent-locate, or it can be an unsolicited agent announce. + """ + + trace.debug("%s _handle_agent_ind_msg '%s'", self._name, str(msg)) + + try: + tmp = QmfData.from_map(msg.content) + except: + log.warning("%s invalid Agent Indication msg format '%s'", + self._name, str(msg)) + return + + try: + name = tmp.get_value("_name") + except: + log.warning("Bad Agent ind msg received: %s", str(msg)) + return + + correlated = False + if msg.correlation_id: + mbox = self._get_mailbox(msg.correlation_id) + correlated = mbox is not None + + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + # agent already known, just update timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() + + if not agent: + # need to create and add a new agent? + matched = False + if self._agent_discovery_filter: + matched = self._agent_discovery_filter.evaluate(tmp) + + if (correlated or matched): + agent = self._create_agent(name) + if not agent: + return # failed to add agent + agent._announce_timestamp = datetime.datetime.utcnow() + + if matched: + # unsolicited, but newly discovered + trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) + self._work_q.put(wi) + self._work_q_put = True + + if correlated: + # wake up all waiters + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) + mbox.deliver(msg) + + def _handle_response_msg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + trace.debug("%s _handle_response_msg '%s'", self._name, str(msg)) + + mbox = self._get_mailbox(msg.correlation_id) + if not mbox: + log.warning("%s Response msg received with unknown correlation_id" + " msg='%s'", self._name, str(msg)) + return + + # wake up all waiters + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) + mbox.deliver(msg) + + def _handle_indication_msg(self, msg, cmap, version, _direct): + + aname = msg.properties.get("qmf.agent") + if not aname: + trace.debug("No agent name field in indication message.") + return + + content_type = msg.properties.get("qmf.content") + if (content_type != ContentType.event or + not isinstance(msg.content, type([]))): + log.warning("Bad event indication message received: '%s'", msg) + return + + emap = msg.content[0] + if not isinstance(emap, type({})): + trace.debug("Invalid event body in indication message: '%s'", msg) + return + + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(aname) + finally: + self._lock.release() + if not agent: + trace.debug("Agent '%s' not known." % aname) + return + try: + # @todo: schema??? + event = QmfEvent.from_map(emap) + except TypeError: + trace.debug("Invalid QmfEvent map received: %s" % str(emap)) + return + + # @todo: schema? Need to fetch it, but not from this thread! + # This thread can not pend on a request. + trace.debug("Publishing event received from agent %s" % aname) + wi = WorkItem(WorkItem.EVENT_RECEIVED, None, + {"agent":agent, + "event":event}) + self._work_q.put(wi) + self._work_q_put = True + + + def _expire_mboxes(self): + """ + Check all async mailboxes for outstanding requests that have expired. + """ + self._lock.acquire() + try: + now = datetime.datetime.utcnow() + if self._next_mbox_expire and now < self._next_mbox_expire: + return + expired_mboxes = [] + self._next_mbox_expire = None + for mbox in self._async_mboxes.itervalues(): + if now >= mbox.expiration_date: + expired_mboxes.append(mbox) + else: + if (self._next_mbox_expire is None or + mbox.expiration_date < self._next_mbox_expire): + self._next_mbox_expire = mbox.expiration_date + + for mbox in expired_mboxes: + del self._async_mboxes[mbox.cid] + finally: + self._lock.release() + + for mbox in expired_mboxes: + # note: expire() may deallocate the mbox, so don't touch + # it further. + mbox.expire() + + + def _expire_agents(self): + """ + Check for expired agents and issue notifications when they expire. + """ + now = datetime.datetime.utcnow() + if self._next_agent_expire and now < self._next_agent_expire: + return + lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) + next_expire_delta = lifetime_delta + self._lock.acquire() + try: + trace.debug("!!! expiring agents '%s'" % now) + for agent in self._agent_map.itervalues(): + if agent._announce_timestamp: + agent_deathtime = agent._announce_timestamp + lifetime_delta + if agent_deathtime <= now: + trace.debug("AGENT_DELETED for %s" % agent) + agent._announce_timestamp = None + wi = WorkItem(WorkItem.AGENT_DELETED, None, + {"agent":agent}) + # @todo: remove agent from self._agent_map + self._work_q.put(wi) + self._work_q_put = True + else: + if (agent_deathtime - now) < next_expire_delta: + next_expire_delta = agent_deathtime - now + + self._next_agent_expire = now + next_expire_delta + trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + finally: + self._lock.release() + + + + def _create_agent( self, name ): + """ + Factory to create/retrieve an agent for this console + """ + trace.debug("creating agent %s" % name) + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + return agent + + agent = Agent(name, self) + try: + agent._sender = self._session.sender(str(agent._address) + + ";{create:always," + " node:" + " {type:topic," + " x-declare:" + " {type:direct}}}") + except: + log.warning("Unable to create sender for %s" % name) + return None + trace.debug("created agent sender %s" % agent._sender.target) + + self._agent_map[name] = agent + finally: + self._lock.release() + + # new agent - query for its schema database for + # seeding the schema cache (@todo) + # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) + # agent._sendQuery( query ) + + return agent + + + + def enable_agent_discovery(self, _query=None): + """ + Called to enable the asynchronous Agent Discovery process. + Once enabled, AGENT_ADD work items can arrive on the WorkQueue. + """ + # @todo: fix - take predicate only, not entire query! + if _query is not None: + if (not isinstance(_query, QmfQuery) or + _query.get_target() != QmfQuery.TARGET_AGENT): + raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") + self._agent_discovery_filter = _query + else: + # create a match-all agent query (no predicate) + self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) + + def disable_agent_discovery(self): + """ + Called to disable the async Agent Discovery process enabled by + calling enableAgentDiscovery() + """ + self._agent_discovery_filter = None + + + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + + + def get_next_workitem(self, timeout=None): + """ + Returns the next pending work item, or None if none available. + @todo: subclass and return an Empty event instead. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + + def release_workitem(self, wi): + """ + Return a WorkItem to the Console when it is no longer needed. + @todo: call Queue.task_done() - only 2.5+ + + @type wi: class qmfConsole.WorkItem + @param wi: work item object to return. + """ + pass + + def _add_schema(self, schema): + """ + @todo + """ + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass type expected") + + self._lock.acquire() + try: + sid = schema.get_class_id() + if not self._schema_cache.has_key(sid): + self._schema_cache[sid] = schema + if sid in self._pending_schema_req: + self._pending_schema_req.remove(sid) + finally: + self._lock.release() + + def _prefetch_schema(self, schema_id, agent): + """ + Send an async request for the schema identified by schema_id if the + schema is not available in the cache. + """ + need_fetch = False + self._lock.acquire() + try: + if ((not self._schema_cache.has_key(schema_id)) and + schema_id not in self._pending_schema_req): + self._pending_schema_req.append(schema_id) + need_fetch = True + finally: + self._lock.release() + + if need_fetch: + mbox = _SchemaPrefetchMailbox(self, schema_id) + query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) + trace.debug("Sending Schema Query to Agent (%s)" % time.time()) + try: + agent._send_query(query, mbox.get_address()) + except SendError, e: + log.error(str(e)) + mbox.destroy() + self._lock.acquire() + try: + self._pending_schema_req.remove(schema_id) + finally: + self._lock.release() + + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): + """ + Find the schema identified by schema_id. If not in the cache, ask the + agent for it. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("SchemaClassId type expected") + + self._lock.acquire() + try: + schema = self._schema_cache.get(schema_id) + if schema: + return schema + finally: + self._lock.release() + + if _agent is None: + return None + + # note: do_query will add the new schema to the cache automatically. + slist = self.do_query(_agent, + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout=_timeout) + if slist: + return slist[0] + else: + return None + + def _add_mailbox(self, mbox): + """ + Add a mailbox to the post office, and assign it a unique address. + """ + self._lock.acquire() + try: + mbox.cid = self._correlation_id + self._correlation_id += 1 + self._post_office[mbox.cid] = mbox + finally: + self._lock.release() + + def _get_mailbox(self, mid): + try: + mid = long(mid) + except TypeError: + log.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + return self._post_office.get(mid) + finally: + self._lock.release() + + + def _remove_mailbox(self, mid): + """ Remove a mailbox and its address from the post office """ + try: + mid = long(mid) + except TypeError: + log.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + if mid in self._post_office: + del self._post_office[mid] + finally: + self._lock.release() + + def __repr__(self): + return str(self._address) + + # def get_packages(self): + # plist = [] + # for i in range(self.impl.packageCount()): + # plist.append(self.impl.getPackageName(i)) + # return plist + + + # def get_classes(self, package, kind=CLASS_OBJECT): + # clist = [] + # for i in range(self.impl.classCount(package)): + # key = self.impl.getClass(package, i) + # class_kind = self.impl.getClassKind(key) + # if class_kind == kind: + # if kind == CLASS_OBJECT: + # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + # elif kind == CLASS_EVENT: + # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + # return clist + + + # def bind_package(self, package): + # return self.impl.bindPackage(package) + + + # def bind_class(self, kwargs = {}): + # if "key" in kwargs: + # self.impl.bindClass(kwargs["key"]) + # elif "package" in kwargs: + # package = kwargs["package"] + # if "class" in kwargs: + # self.impl.bindClass(package, kwargs["class"]) + # else: + # self.impl.bindClass(package) + # else: + # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + # def get_agents(self, broker=None): + # blist = [] + # if broker: + # blist.append(broker) + # else: + # self._cv.acquire() + # try: + # # copy while holding lock + # blist = self._broker_list[:] + # finally: + # self._cv.release() + + # agents = [] + # for b in blist: + # for idx in range(b.impl.agentCount()): + # agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + # return agents + + + # def get_objects(self, query, kwargs = {}): + # timeout = 30 + # agent = None + # temp_args = kwargs.copy() + # if type(query) == type({}): + # temp_args.update(query) + + # if "_timeout" in temp_args: + # timeout = temp_args["_timeout"] + # temp_args.pop("_timeout") + + # if "_agent" in temp_args: + # agent = temp_args["_agent"] + # temp_args.pop("_agent") + + # if type(query) == type({}): + # query = Query(temp_args) + + # self._select = {} + # for k in temp_args.iterkeys(): + # if type(k) == str: + # self._select[k] = temp_args[k] + + # self._cv.acquire() + # try: + # self._sync_count = 1 + # self._sync_result = [] + # broker = self._broker_list[0] + # broker.send_query(query.impl, None, agent) + # self._cv.wait(timeout) + # if self._sync_count == 1: + # raise Exception("Timed out: waiting for query response") + # finally: + # self._cv.release() + + # return self._sync_result + + + # def get_object(self, query, kwargs = {}): + # ''' + # Return one and only one object or None. + # ''' + # objs = objects(query, kwargs) + # if len(objs) == 1: + # return objs[0] + # else: + # return None + + + # def first_object(self, query, kwargs = {}): + # ''' + # Return the first of potentially many objects. + # ''' + # objs = objects(query, kwargs) + # if objs: + # return objs[0] + # else: + # return None + + + # # Check the object against select to check for a match + # def _select_match(self, object): + # schema_props = object.properties() + # for key in self._select.iterkeys(): + # for prop in schema_props: + # if key == p[0].name() and self._select[key] != p[1]: + # return False + # return True + + + # def _get_result(self, list, context): + # ''' + # Called by Broker proxy to return the result of a query. + # ''' + # self._cv.acquire() + # try: + # for item in list: + # if self._select_match(item): + # self._sync_result.append(item) + # self._sync_count -= 1 + # self._cv.notify() + # finally: + # self._cv.release() + + + # def start_sync(self, query): pass + + + # def touch_sync(self, sync): pass + + + # def end_sync(self, sync): pass + + + + +# def start_console_events(self): +# self._cb_cond.acquire() +# try: +# self._cb_cond.notify() +# finally: +# self._cb_cond.release() + + +# def _do_console_events(self): +# ''' +# Called by the Console thread to poll for events. Passes the events +# onto the ConsoleHandler associated with this Console. Is called +# periodically, but can also be kicked by Console.start_console_events(). +# ''' +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# try: +# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: +# trace.debug("Console Event AGENT_ADDED received") +# if self._handler: +# self._handler.agent_added(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: +# trace.debug("Console Event AGENT_DELETED received") +# if self._handler: +# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: +# trace.debug("Console Event NEW_PACKAGE received") +# if self._handler: +# self._handler.new_package(self._event.name) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: +# trace.debug("Console Event NEW_CLASS received") +# if self._handler: +# self._handler.new_class(SchemaClassKey(self._event.classKey)) +# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: +# trace.debug("Console Event OBJECT_UPDATE received") +# if self._handler: +# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), +# self._event.hasProps, self._event.hasStats) +# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: +# trace.debug("Console Event EVENT_RECEIVED received") +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: +# trace.debug("Console Event AGENT_HEARTBEAT received") +# if self._handler: +# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) +# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: +# trace.debug("Console Event METHOD_RESPONSE received") +# else: +# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# except e: +# print "Exception caught in callback thread:", e +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) +# return count + + + + + +# class Broker(ConnectionHandler): +# # attr_reader :impl :conn, :console, :broker_bank +# def __init__(self, console, conn): +# self.broker_bank = 1 +# self.console = console +# self.conn = conn +# self._session = None +# self._cv = Condition() +# self._stable = None +# self._event = qmfengine.BrokerEvent() +# self._xmtMessage = qmfengine.Message() +# self.impl = qmfengine.BrokerProxy(self.console.impl) +# self.console.impl.addConnection(self.impl, self) +# self.conn.add_conn_handler(self) +# self._operational = True + + +# def shutdown(self): +# trace.debug("broker.shutdown() called.") +# self.console.impl.delConnection(self.impl) +# self.conn.del_conn_handler(self) +# if self._session: +# self.impl.sessionClosed() +# trace.debug("broker.shutdown() sessionClosed done.") +# self._session.destroy() +# trace.debug("broker.shutdown() session destroy done.") +# self._session = None +# self._operational = False +# trace.debug("broker.shutdown() done.") + + +# def wait_for_stable(self, timeout = None): +# self._cv.acquire() +# try: +# if self._stable: +# return +# if timeout: +# self._cv.wait(timeout) +# if not self._stable: +# raise Exception("Timed out: waiting for broker connection to become stable") +# else: +# while not self._stable: +# self._cv.wait() +# finally: +# self._cv.release() + + +# def send_query(self, query, ctx, agent): +# agent_impl = None +# if agent: +# agent_impl = agent.impl +# self.impl.sendQuery(query, ctx, agent_impl) +# self.conn.kick() + + +# def _do_broker_events(self): +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: +# trace.debug("Broker Event BROKER_INFO received"); +# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: +# trace.debug("Broker Event DECLARE_QUEUE received"); +# self.conn.impl.declareQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: +# trace.debug("Broker Event DELETE_QUEUE received"); +# self.conn.impl.deleteQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.BIND: +# trace.debug("Broker Event BIND received"); +# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: +# trace.debug("Broker Event UNBIND received"); +# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: +# trace.debug("Broker Event SETUP_COMPLETE received"); +# self.impl.startProtocol() +# elif self._event.kind == qmfengine.BrokerEvent.STABLE: +# trace.debug("Broker Event STABLE received"); +# self._cv.acquire() +# try: +# self._stable = True +# self._cv.notify() +# finally: +# self._cv.release() +# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: +# result = [] +# for idx in range(self._event.queryResponse.getObjectCount()): +# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) +# self.console._get_result(result, self._event.context) +# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: +# obj = self._event.context +# obj._method_result(MethodResponse(self._event.methodResponse())) + +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) + +# return count + + +# def _do_broker_messages(self): +# count = 0 +# valid = self.impl.getXmtMessage(self._xmtMessage) +# while valid: +# count += 1 +# trace.debug("Broker: sending msg on connection") +# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) +# self.impl.popXmt() +# valid = self.impl.getXmtMessage(self._xmtMessage) + +# return count + + +# def _do_events(self): +# while True: +# self.console.start_console_events() +# bcnt = self._do_broker_events() +# mcnt = self._do_broker_messages() +# if bcnt == 0 and mcnt == 0: +# break; + + +# def conn_event_connected(self): +# trace.debug("Broker: Connection event CONNECTED") +# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) +# self.impl.sessionOpened(self._session.handle) +# self._do_events() + + +# def conn_event_disconnected(self, error): +# trace.debug("Broker: Connection event DISCONNECTED") +# pass + + +# def conn_event_visit(self): +# self._do_events() + + +# def sess_event_session_closed(self, context, error): +# trace.debug("Broker: Session event CLOSED") +# self.impl.sessionClosed() + + +# def sess_event_recv(self, context, message): +# trace.debug("Broker: Session event MSG_RECV") +# if not self._operational: +# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# self.impl.handleRcvMessage(message) +# self._do_events() + + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ +# TEMPORARY TEST CODE - TO BE DELETED +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # temp test code + import logging + from common import (qmfTypes, SchemaProperty) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "************* Creating Async Console **************" ) + + class MyNotifier(Notifier): + def __init__(self, context): + self._myContext = context + self.WorkAvailable = False + + def indication(self): + print("Indication received! context=%d" % self._myContext) + self.WorkAvailable = True + + _noteMe = MyNotifier( 666 ) + + _myConsole = Console(notifier=_noteMe) + + _myConsole.enable_agent_discovery() + logging.info("Waiting...") + + + logging.info( "Destroying console:" ) + _myConsole.destroy( 10 ) + + logging.info( "******** Messing around with Schema ********" ) + + _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A typical event schema", + _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, + kwargs = {"min":0, + "max":100, + "unit":"seconds", + "desc":"sleep value"}), + "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, + kwargs={"maxlen":100, + "desc":"a string argument"})}) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) + try: + print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) + except: + pass + print("_sec.getProperties()='%s'" % _sec.get_properties()) + + print("Adding another argument") + _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, + kwargs={"dir":"IO", + "desc":"a boolean argument"}) + _sec.add_property('Argument-3', _arg3) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) + + print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) + + _secmap = _sec.map_encode() + print("_sec.mapEncode()='%s'" % _secmap ) + + _sec2 = SchemaEventClass( _map=_secmap ) + + print("_sec=%s" % _sec.get_class_id()) + print("_sec2=%s" % _sec2.get_class_id()) + + _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", + "_class_name": "myOtherClass", + "_type": "_data"}, + "_desc": "A test data object", + "_values": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}, + "meth1": {"_desc": "A test method", + "_arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"_desc": "A test method", + "_arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": + "I"}}}}, + "_subtypes": + {"prop1":"qmfProperty", + "prop2":"qmfProperty", + "statistics":"qmfProperty", + "meth1":"qmfMethod", + "meth2":"qmfMethod"}, + "_primary_key_names": ["prop2", "prop1"]}) + + print("_soc='%s'" % _soc) + + print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) + + print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) + print("_soc.getProperties='%s'" % _soc.get_properties()) + print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) + + print("_soc.getMethodCount='%d'" % _soc.get_method_count()) + print("_soc.getMethods='%s'" % _soc.get_methods()) + print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) + + _socmap = _soc.map_encode() + print("_socmap='%s'" % _socmap) + _soc2 = SchemaObjectClass( _map=_socmap ) + print("_soc='%s'" % _soc) + print("_soc2='%s'" % _soc2) + + if _soc2.get_class_id() == _soc.get_class_id(): + print("soc and soc2 are the same schema") + + + logging.info( "******** Messing around with ObjectIds ********" ) + + + qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + print("qd='%s':" % qd) + + print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) + + print("qd map='%s'" % qd.map_encode()) + print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) + qd.set_value("prop4", 4, "A test property called 4") + print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) + qd.prop4 = 9 + print("qd.prop4 = 9 ='%s'" % qd.prop4) + qd["prop4"] = 11 + print("qd[prop4] = 11 ='%s'" % qd["prop4"]) + + print("qd.mapEncode()='%s'" % qd.map_encode()) + _qd2 = QmfData( _map = qd.map_encode() ) + print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) + + _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, + "prop2": 0}}, + agent="some agent name?", + _schema = _soc) + + print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) + + _qmfDesc1._set_schema( _soc ) + + print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) + + + _qmfDescMap = _qmfDesc1.map_encode() + print("_qmfDescMap='%s'" % _qmfDescMap) + + _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) + + print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) + print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) + + + logging.info( "******** Messing around with QmfEvents ********" ) + + + _qmfevent1 = QmfEvent( _timestamp = 1111, + _schema = _sec, + _values = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) + + _qmfevent1Map = _qmfevent1.map_encode() + + _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) + + + logging.info( "******** Messing around with Queries ********" ) + + _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + [QmfQuery.AND, + [QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]], + [QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"], + [QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]], + [QmfQuery.OR, + [QmfQuery.LE, "temperature", -10], + [QmfQuery.FALSE], + [QmfQuery.EXISTS, "namey"]]]) + + print("_q1.mapEncode() = [%s]" % _q1.map_encode()) |