diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/agent.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2-prototype/agent.py | 1380 |
1 files changed, 1380 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/agent.py b/qpid/extras/qmf/src/py/qmf2-prototype/agent.py new file mode 100644 index 0000000000..4ec00bd288 --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2-prototype/agent.py @@ -0,0 +1,1380 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import datetime +import time +import Queue +from logging import getLogger +from threading import Thread, RLock, currentThread, Event +from qpid.messaging import Connection, Message, Empty, SendError +from uuid import uuid4 +from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass, + QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem, + SchemaMethod, timedelta_to_secs, QMF_APP_ID) + +# global flag that indicates which thread (if any) is +# running the agent notifier callback +_callback_thread=None + +log = getLogger("qmf") +trace = getLogger("qmf.agent") + + + ##============================================================================== + ## METHOD CALL + ##============================================================================== + +class _MethodCallHandle(object): + """ + Private class used to hold context when handing off a method call to the + application. Given to the app in a WorkItem, provided to the agent when + method_response() is invoked. + """ + def __init__(self, correlation_id, reply_to, meth_name, _oid=None, + _schema_id=None): + self.correlation_id = correlation_id + self.reply_to = reply_to + self.meth_name = meth_name + self.oid = _oid + self.schema_id = _schema_id + +class MethodCallParams(object): + """ + """ + def __init__(self, name, _oid=None, _schema_id=None, _in_args=None, + _user_id=None): + self._meth_name = name + self._oid = _oid + self._schema_id = _schema_id + self._in_args = _in_args + self._user_id = _user_id + + def get_name(self): + return self._meth_name + + def get_object_id(self): + return self._oid + + def get_schema_id(self): + return self._schema_id + + def get_args(self): + return self._in_args + + def get_user_id(self): + return self._user_id + + + ##============================================================================== + ## SUBSCRIPTIONS + ##============================================================================== + + +class _SubscriptionState(object): + """ + An internally-managed subscription. + """ + def __init__(self, reply_to, cid, query, interval, duration): + self.reply_to = reply_to + self.correlation_id = cid + self.query = query + self.interval = interval + self.duration = duration + now = datetime.datetime.utcnow() + self.next_update = now # do an immediate update + self.expiration = now + datetime.timedelta(seconds=duration) + self.last_update = None + self.id = 0 + + def resubscribe(self, now, _duration=None): + if _duration is not None: + self.duration = _duration + self.expiration = now + datetime.timedelta(seconds=self.duration) + + def published(self, now): + self.next_update = now + datetime.timedelta(seconds=self.interval) + self.last_update = now + + + ##============================================================================== + ## AGENT + ##============================================================================== + +class Agent(Thread): + def __init__(self, name, _domain=None, _notifier=None, **options): + Thread.__init__(self) + self._running = False + self._ready = Event() + + self.name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self.name, self._domain) + self._notifier = _notifier + + # configurable parameters + # + self._heartbeat_interval = options.get("heartbeat_interval", 30) + self._capacity = options.get("capacity", 10) + self._default_duration = options.get("default_duration", 300) + self._max_duration = options.get("max_duration", 3600) + self._min_duration = options.get("min_duration", 10) + self._default_interval = options.get("default_interval", 30) + self._min_interval = options.get("min_interval", 5) + + # @todo: currently, max # of objects in a single reply message, would + # be better if it were max bytesize of per-msg content... + self._max_msg_size = options.get("max_msg_size", 0) + + self._conn = None + self._session = None + self._direct_receiver = None + self._topic_receiver = None + self._direct_sender = None + self._topic_sender = None + + self._lock = RLock() + self._packages = {} + self._schema_timestamp = long(0) + self._schema = {} + # _described_data holds QmfData objects that are associated with schema + # it is index by schema_id, object_id + self._described_data = {} + # _undescribed_data holds unstructured QmfData objects - these objects + # have no schema. it is indexed by object_id only. + self._undescribed_data = {} + self._work_q = Queue.Queue() + self._work_q_put = False + # subscriptions + self._subscription_id = long(time.time()) + self._subscriptions = {} + self._next_subscribe_event = None + + # prevents multiple _wake_thread() calls + self._noop_pending = False + + + def destroy(self, timeout=None): + """ + Must be called before the Agent is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + trace.debug("Destroying Agent %s" % self.name) + if self._conn: + self.remove_connection(timeout) + trace.debug("Agent Destroyed") + + + def get_name(self): + return self.name + + def set_connection(self, conn): + self._conn = conn + self._session = self._conn.session() + + # for messages directly addressed to me + self._direct_receiver = self._session.receiver(str(self._address) + + ";{create:always," + " node:" + " {type:topic," + " x-declare:" + " {type:direct}}}", + capacity=self._capacity) + trace.debug("my direct addr=%s" % self._direct_receiver.source) + + # for sending directly addressed messages. + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node:" + " {type:topic," + " x-declare:" + " {type:direct}}}") + trace.debug("my default direct send addr=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from consoles + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#", + self._domain) + self._topic_receiver = self._session.receiver(str(default_addr) + + ";{create:always," + " node:" + " {type:topic}}", + capacity=self._capacity) + trace.debug("console.ind addr=%s" % self._topic_receiver.source) + + # for sending to topic subscribers + ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND, + self._domain) + self._topic_sender = self._session.sender(str(ind_addr) + + ";{create:always," + " node:" + " {type:topic}}") + trace.debug("agent.ind addr=%s" % self._topic_sender.target) + + self._running = True + self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Agent managment thread failed to start.") + + def remove_connection(self, timeout=None): + # tell connection thread to shutdown + self._running = False + if self.isAlive(): + # kick my thread to wake it up + self._wake_thread() + trace.debug("waiting for agent receiver thread to exit") + self.join(timeout) + if self.isAlive(): + log.error( "Agent thread '%s' is hung..." % self.name) + self._direct_receiver.close() + self._direct_receiver = None + self._direct_sender.close() + self._direct_sender = None + self._topic_receiver.close() + self._topic_receiver = None + self._topic_sender.close() + self._topic_sender = None + self._session.close() + self._session = None + self._conn = None + trace.debug("agent connection removal complete") + + def register_object_class(self, schema): + """ + Register an instance of a SchemaClass with this agent + """ + # @todo: need to update subscriptions + # @todo: need to mark schema as "non-const" + if not isinstance(schema, SchemaClass): + raise TypeError("SchemaClass instance expected") + + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() + hstr = classId.get_hash_string() + if not hstr: + raise Exception("Schema hash is not set.") + + self._lock.acquire() + try: + if pname not in self._packages: + self._packages[pname] = [cname] + else: + if cname not in self._packages[pname]: + self._packages[pname].append(cname) + self._schema[classId] = schema + self._schema_timestamp = long(time.time() * 1000) + finally: + self._lock.release() + + def register_event_class(self, schema): + return self.register_object_class(schema) + + def raise_event(self, qmfEvent): + """ + TBD + """ + if not self._topic_sender: + raise Exception("No connection available") + + # @todo: should we validate against the schema? + msg = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + + qmfEvent.get_severity() + "." + self.name, + properties={"method":"indication", + "qmf.opcode":OpCode.data_ind, + "qmf.content": ContentType.event, + "qmf.agent":self.name}, + content=[qmfEvent.map_encode()]) + # TRACE + # log.error("!!! Agent %s sending Event (%s)" % + # (self.name, str(msg))) + self._topic_sender.send(msg) + + def add_object(self, data): + """ + Register an instance of a QmfAgentData object. + """ + # @todo: need to mark schema as "non-const" + if not isinstance(data, QmfAgentData): + raise TypeError("QmfAgentData instance expected") + + oid = data.get_object_id() + if not oid: + raise TypeError("No identifier assigned to QmfAgentData!") + + sid = data.get_schema_class_id() + + self._lock.acquire() + try: + if sid: + if sid not in self._described_data: + self._described_data[sid] = {oid: data} + else: + self._described_data[sid][oid] = data + else: + self._undescribed_data[oid] = data + + # does the new object match any subscriptions? + now = datetime.datetime.utcnow() + for sid,sub in self._subscriptions.iteritems(): + if sub.query.evaluate(data): + # matched. Mark the subscription as needing to be + # serviced. The _publish() method will notice the new + # object and will publish it next time it runs. + sub.next_update = now + self._next_subscribe_event = None + # @todo: should we immediately publish? + + finally: + self._lock.release() + + def get_object(self, oid, schema_id): + data = None + self._lock.acquire() + try: + if schema_id: + data = self._described_data.get(schema_id) + if data: + data = data.get(oid) + else: + data = self._undescribed_data.get(oid) + finally: + self._lock.release() + return data + + + def method_response(self, handle, _out_args=None, _error=None): + """ + """ + if not isinstance(handle, _MethodCallHandle): + raise TypeError("Invalid handle passed to method_response!") + + _map = {SchemaMethod.KEY_NAME:handle.meth_name} + if handle.oid is not None: + _map[QmfData.KEY_OBJECT_ID] = handle.oid + if handle.schema_id is not None: + _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode() + if _out_args is not None: + _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() + if _error is not None: + if not isinstance(_error, QmfData): + raise TypeError("Invalid type for error - must be QmfData") + _map[SchemaMethod.KEY_ERROR] = _error.map_encode() + + msg = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.method_rsp}, + content=_map) + msg.correlation_id = handle.correlation_id + + self._send_reply(msg, handle.reply_to) + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + def get_next_workitem(self, timeout=None): + """ + Obtains the next pending work item, or None if none available. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + def release_workitem(self, wi): + """ + Releases a WorkItem instance obtained by getNextWorkItem(). Called when + the application has finished processing the WorkItem. + """ + pass + + + def run(self): + global _callback_thread + next_heartbeat = datetime.datetime.utcnow() + batch_limit = 10 # a guess + + self._ready.set() + + while self._running: + + # + # Process inbound messages + # + trace.debug("%s processing inbound messages..." % self.name) + for i in range(batch_limit): + try: + msg = self._topic_receiver.fetch(timeout=0) + except Empty: + break + # TRACE + # log.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._topic_receiver.source, msg)) + self._dispatch(msg, _direct=False) + + for i in range(batch_limit): + try: + msg = self._direct_receiver.fetch(timeout=0) + except Empty: + break + # TRACE + # log.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._direct_receiver.source, msg)) + self._dispatch(msg, _direct=True) + + # + # Send Heartbeat Notification + # + now = datetime.datetime.utcnow() + if now >= next_heartbeat: + trace.debug("%s sending heartbeat..." % self.name) + ind = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT, + properties={"method":"indication", + "qmf.opcode":OpCode.agent_heartbeat_ind, + "qmf.agent":self.name}, + content=self._makeAgentInfoBody()) + # TRACE + #log.error("!!! Agent %s sending Heartbeat (%s)" % + # (self.name, str(ind))) + self._topic_sender.send(ind) + trace.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + # + # Monitor Subscriptions + # + self._lock.acquire() + try: + now = datetime.datetime.utcnow() + if (self._next_subscribe_event is None or + now >= self._next_subscribe_event): + trace.debug("%s polling subscriptions..." % self.name) + self._next_subscribe_event = now + datetime.timedelta(seconds= + self._max_duration) + dead_ss = {} + for sid,ss in self._subscriptions.iteritems(): + if now >= ss.expiration: + dead_ss[sid] = ss + continue + if now >= ss.next_update: + self._publish(ss) + next_timeout = min(ss.expiration, ss.next_update) + if next_timeout < self._next_subscribe_event: + self._next_subscribe_event = next_timeout + + for sid,ss in dead_ss.iteritems(): + del self._subscriptions[sid] + self._unpublish(ss) + finally: + self._lock.release() + + # + # notify application of pending WorkItems + # + if self._work_q_put and self._notifier: + trace.debug("%s notifying application..." % self.name) + # new stuff on work queue, kick the the application... + self._work_q_put = False + _callback_thread = currentThread() + trace.debug("Calling agent notifier.indication") + self._notifier.indication() + _callback_thread = None + + # + # Sleep until messages arrive or something times out + # + now = datetime.datetime.utcnow() + next_timeout = next_heartbeat + self._lock.acquire() + try: + # the mailbox expire flag may be cleared by the + # app thread(s) in order to force an immediate publish + if self._next_subscribe_event is None: + next_timeout = now + elif self._next_subscribe_event < next_timeout: + next_timeout = self._next_subscribe_event + finally: + self._lock.release() + + timeout = timedelta_to_secs(next_timeout - now) + + if self._running and timeout > 0.0: + trace.debug("%s sleeping %s seconds..." % (self.name, + timeout)) + try: + self._session.next_receiver(timeout=timeout) + except Empty: + pass + + + trace.debug("Shutting down Agent %s thread" % self.name) + + # + # Private: + # + + def _makeAgentInfoBody(self): + """ + Create an agent indication message body identifying this agent + """ + return QmfData.create({"_name": self.get_name(), + "_schema_timestamp": self._schema_timestamp}).map_encode() + + def _send_reply(self, msg, reply_to): + """ + Send a reply message to the given reply_to address + """ + if not isinstance(reply_to, QmfAddress): + try: + reply_to = QmfAddress.from_string(str(reply_to)) + except ValueError: + log.error("Invalid reply-to address '%s'" % reply_to) + + msg.subject = reply_to.get_subject() + + try: + if reply_to.is_direct(): + # TRACE + #log.error("!!! Agent %s direct REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._direct_sender.send(msg) + else: + # TRACE + # log.error("!!! Agent %s topic REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._topic_sender.send(msg) + trace.debug("reply msg sent to [%s]" % str(reply_to)) + except SendError, e: + log.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) + + def _send_query_response(self, content_type, cid, reply_to, objects): + """ + Send a response to a query, breaking the result into multiple + messages based on the agent's _max_msg_size config parameter + """ + + total = len(objects) + if self._max_msg_size: + max_count = self._max_msg_size + else: + max_count = total + + start = 0 + end = min(total, max_count) + # send partial response if too many objects present + while end < total: + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "partial":None, + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, + correlation_id = cid, + content=objects[start:end]) + self._send_reply(m, reply_to) + start = end + end = min(total, end + max_count) + + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, + correlation_id = cid, + content=objects[start:end]) + self._send_reply(m, reply_to) + + def _dispatch(self, msg, _direct=False): + """ + Process a message from a console. + + @param _direct: True if msg directly addressed to this agent. + """ + trace.debug( "Message received from Console! [%s]" % msg ) + + opcode = msg.properties.get("qmf.opcode") + if not opcode: + log.warning("Ignoring unrecognized message '%s'" % msg) + return + version = 2 # @todo: fix me + cmap = {}; props={} + if msg.content_type == "amqp/map": + cmap = msg.content + if msg.properties: + props = msg.properties + + if opcode == OpCode.agent_locate_req: + self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.query_req: + self._handleQueryMsg( msg, cmap, props, version, _direct ) + elif opcode == OpCode.method_req: + self._handleMethodReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_req: + self._handleSubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_refresh_ind: + self._handleResubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_cancel_ind: + self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.noop: + self._noop_pending = False + trace.debug("No-op msg received.") + else: + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % opcode) + + def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ): + """ + Process a received agent-locate message + """ + trace.debug("_handleAgentLocateMsg") + + reply = False + if props.get("method") == "request": + # if the message is addressed to me or wildcard, process it + if (msg.subject == "console.ind" or + msg.subject == "console.ind.locate" or + msg.subject == "console.ind.locate." + self.name): + pred = msg.content + if not pred: + reply = True + elif isinstance(pred, type([])): + # fake a QmfData containing my identifier for the query compare + query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred) + tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: + self.get_name()}, + _object_id="my-name") + reply = query.evaluate(tmpData) + + if reply: + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.agent_locate_rsp}, + content=self._makeAgentInfoBody()) + m.correlation_id = msg.correlation_id + self._send_reply(m, msg.reply_to) + else: + trace.debug("agent-locate msg not mine - no reply sent") + + + def _handleQueryMsg(self, msg, cmap, props, version, _direct ): + """ + Handle received query message + """ + trace.debug("_handleQueryMsg") + + if "method" in props and props["method"] == "request": + if cmap: + try: + query = QmfQuery.from_map(cmap) + except TypeError: + log.error("Invalid Query format: '%s'" % str(cmap)) + return + target = query.get_target() + if target == QmfQuery.TARGET_PACKAGES: + self._queryPackagesReply( msg, query ) + elif target == QmfQuery.TARGET_SCHEMA_ID: + self._querySchemaReply( msg, query, _idOnly=True ) + elif target == QmfQuery.TARGET_SCHEMA: + self._querySchemaReply( msg, query) + elif target == QmfQuery.TARGET_AGENT: + log.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + elif target == QmfQuery.TARGET_OBJECT_ID: + self._queryDataReply(msg, query, _idOnly=True) + elif target == QmfQuery.TARGET_OBJECT: + self._queryDataReply(msg, query) + else: + log.warning("Unrecognized query target: '%s'" % str(target)) + + + + def _handleMethodReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Method Request + """ + if "method" in props and props["method"] == "request": + mname = cmap.get(SchemaMethod.KEY_NAME) + if not mname: + log.warning("Invalid method call from '%s': no name" + % msg.reply_to) + return + + in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) + oid = cmap.get(QmfData.KEY_OBJECT_ID) + schema_id = cmap.get(QmfData.KEY_SCHEMA_ID) + if schema_id: + schema_id = SchemaClassId.from_map(schema_id) + handle = _MethodCallHandle(msg.correlation_id, + msg.reply_to, + mname, + oid, schema_id) + param = MethodCallParams( mname, oid, schema_id, in_args, + msg.user_id) + + # @todo: validate the method against the schema: + # if self._schema: + # # validate + # _in_args = _in_args.copy() + # ms = self._schema.get_method(name) + # if ms is None: + # raise ValueError("Method '%s' is undefined." % name) + + # for aname,prop in ms.get_arguments().iteritems(): + # if aname not in _in_args: + # if prop.get_default(): + # _in_args[aname] = prop.get_default() + # elif not prop.is_optional(): + # raise ValueError("Method '%s' requires argument '%s'" + # % (name, aname)) + # for aname in _in_args.iterkeys(): + # prop = ms.get_argument(aname) + # if prop is None: + # raise ValueError("Method '%s' does not define argument" + # " '%s'" % (name, aname)) + # if "I" not in prop.get_direction(): + # raise ValueError("Method '%s' argument '%s' is not an" + # " input." % (name, aname)) + + # # @todo check if value is correct (type, range, etc) + + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) + self._work_q_put = True + + def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Subscription Request + """ + if "method" in props and props["method"] == "request": + query_map = cmap.get("_query") + interval = cmap.get("_interval") + duration = cmap.get("_duration") + + try: + query = QmfQuery.from_map(query_map) + except TypeError: + log.warning("Invalid query for subscription: %s" % + str(query_map)) + return + + if isinstance(self, AgentExternal): + # param = SubscriptionParams(_ConsoleHandle(console_handle, + # msg.reply_to), + # query, + # interval, + # duration, + # msg.user_id) + # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST, + # msg.correlation_id, param)) + # self._work_q_put = True + log.error("External Subscription TBD") + return + + # validate the query - only specific objects, or + # objects wildcard, are currently supported. + if (query.get_target() != QmfQuery.TARGET_OBJECT or + (query.get_selector() == QmfQuery.PREDICATE and + query.get_predicate())): + log.error("Subscriptions only support (wildcard) Object" + " Queries.") + err = QmfData.create( + {"reason": "Unsupported Query type for subscription.", + "query": str(query.map_encode())}) + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content={"_error": err.map_encode()}) + self._send_reply(m, msg.reply_to) + return + + if duration is None: + duration = self._default_duration + else: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + log.warning("Bad duration value: %s" % str(msg)) + duration = self._default_duration + + if interval is None: + interval = self._default_interval + else: + try: + interval = float(interval) + if interval < self._min_interval: + interval = self._min_interval + except: + log.warning("Bad interval value: %s" % str(msg)) + interval = self._default_interval + + ss = _SubscriptionState(msg.reply_to, + msg.correlation_id, + query, + interval, + duration) + self._lock.acquire() + try: + sid = self._subscription_id + self._subscription_id += 1 + ss.id = sid + self._subscriptions[sid] = ss + self._next_subscribe_event = None + finally: + self._lock.release() + + sr_map = {"_subscription_id": sid, + "_interval": interval, + "_duration": duration} + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content=sr_map) + self._send_reply(m, msg.reply_to) + + + + def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Renew Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + log.error("Invalid subscription refresh msg: %s" % + str(msg)) + return + + self._lock.acquire() + try: + ss = self._subscriptions.get(sid) + if not ss: + log.error("Ignoring unknown subscription: %s" % + str(sid)) + return + duration = cmap.get("_duration") + if duration is not None: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + log.error("Bad duration value: %s" % str(msg)) + duration = None # use existing duration + + ss.resubscribe(datetime.datetime.utcnow(), duration) + + new_duration = ss.duration + new_interval = ss.interval + + finally: + self._lock.release() + + + sr_map = {"_subscription_id": sid, + "_interval": new_interval, + "_duration": new_duration} + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content=sr_map) + self._send_reply(m, msg.reply_to) + + + def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Cancel Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + log.warning("No subscription id supplied: %s" % msg) + return + + self._lock.acquire() + try: + if sid in self._subscriptions: + dead_sub = self._subscriptions[sid] + del self._subscriptions[sid] + finally: + self._lock.release() + + self._unpublish(dead_sub) + + + def _queryPackagesReply(self, msg, query): + """ + Run a query against the list of known packages + """ + pnames = [] + self._lock.acquire() + try: + for name in self._packages.iterkeys(): + qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name}, + _object_id="_package") + if query.evaluate(qmfData): + pnames.append(name) + + self._send_query_response(ContentType.schema_package, + msg.correlation_id, + msg.reply_to, + pnames) + finally: + self._lock.release() + + + def _querySchemaReply( self, msg, query, _idOnly=False ): + """ + """ + schemas = [] + + self._lock.acquire() + try: + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: + found = self._schema.get(query.get_id()) + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema + for sid,val in self._schema.iteritems(): + if query.evaluate(val): + if _idOnly: + schemas.append(sid.map_encode()) + else: + schemas.append(val.map_encode()) + if _idOnly: + msgkey = ContentType.schema_id + else: + msgkey = ContentType.schema_class + + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + schemas) + finally: + self._lock.release() + + + def _queryDataReply( self, msg, query, _idOnly=False ): + """ + """ + # hold the (recursive) lock for the duration so the Agent + # won't send data that is currently being modified by the + # app. + self._lock.acquire() + try: + response = [] + data_objs = self._queryData(query) + if _idOnly: + for obj in data_objs: + response.append(obj.get_object_id()) + else: + for obj in data_objs: + response.append(obj.map_encode()) + + if _idOnly: + msgkey = ContentType.object_id + else: + msgkey = ContentType.data + + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + response) + finally: + self._lock.release() + + + def _queryData(self, query): + """ + Return a list of QmfData objects that match a given query + """ + data_objs = [] + # extract optional schema_id from target params + sid = None + t_params = query.get_target_param() + if t_params: + sid = t_params.get(QmfData.KEY_SCHEMA_ID) + + self._lock.acquire() + try: + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + oid = query.get_id() + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + found = db.get(oid) + if found: + data_objs.append(found) + else: + found = None + if sid: + db = self._described_data.get(sid) + if db: + found = db.get(oid) + else: + found = self._undescribed_data.get(oid) + if found: + data_objs.append(found) + + else: # otherwise, evaluate all data + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + for oid,data in db.iteritems(): + if query.evaluate(data): + data_objs.append(data) + else: + if sid: + db = self._described_data.get(sid) + else: + db = self._undescribed_data + + if db: + for oid,data in db.iteritems(): + if query.evaluate(data): + data_objs.append(data) + finally: + self._lock.release() + + return data_objs + + def _publish(self, sub): + """ Publish a subscription. + """ + response = [] + now = datetime.datetime.utcnow() + objs = self._queryData(sub.query) + if objs: + for obj in objs: + if sub.id not in obj._subscriptions: + # new to subscription - publish it + obj._subscriptions[sub.id] = sub + response.append(obj.map_encode()) + elif obj._dtime: + # obj._dtime is millisec since utc. Convert to datetime + utcdt = datetime.datetime.utcfromtimestamp(obj._dtime/1000.0) + if utcdt > sub.last_update: + response.append(obj.map_encode()) + else: + # obj._utime is millisec since utc. Convert to datetime + utcdt = datetime.datetime.utcfromtimestamp(obj._utime/1000.0) + if utcdt > sub.last_update: + response.append(obj.map_encode()) + + if response: + trace.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id)) + self._send_query_response( ContentType.data, + sub.correlation_id, + sub.reply_to, + response) + sub.published(now) + + def _unpublish(self, sub): + """ This subscription is about to be deleted, remove it from any + referencing objects. + """ + objs = self._queryData(sub.query) + if objs: + for obj in objs: + if sub.id in obj._subscriptions: + del obj._subscriptions[sub.id] + + + + def _wake_thread(self): + """ + Make the agent management thread loop wakeup from its next_receiver + sleep. + """ + self._lock.acquire() + try: + if not self._noop_pending: + trace.debug("Sending noop to wake up [%s]" % self._address) + msg = Message(id=QMF_APP_ID, + subject=self.name, + properties={"method":"indication", + "qmf.opcode":OpCode.noop}, + content={}) + try: + self._direct_sender.send( msg, sync=True ) + self._noop_pending = True + except SendError, e: + log.error(str(e)) + finally: + self._lock.release() + + + ##============================================================================== + ## EXTERNAL DATABASE AGENT + ##============================================================================== + +class AgentExternal(Agent): + """ + An Agent which uses an external management database. + """ + def __init__(self, name, _domain=None, _notifier=None, + _heartbeat_interval=30, _max_msg_size=0, _capacity=10): + super(AgentExternal, self).__init__(name, _domain, _notifier, + _heartbeat_interval, + _max_msg_size, _capacity) + log.error("AgentExternal TBD") + + + + ##============================================================================== + ## DATA MODEL + ##============================================================================== + + +class QmfAgentData(QmfData): + """ + A managed data object that is owned by an agent. + """ + + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, + _object_id=None, _schema=None): + schema_id = None + if _schema: + schema_id = _schema.get_class_id() + + if _object_id is None: + if not isinstance(_schema, SchemaObjectClass): + raise Exception("An object_id must be provided if the object" + "doesn't have an associated schema.") + ids = _schema.get_id_names() + if not ids: + raise Exception("Object must have an Id or a schema that" + " provides an Id") + _object_id = u"" + for key in ids: + value = _values.get(key) + if value is None: + raise Exception("Object must have a value for key" + " attribute '%s'" % str(key)) + try: + _object_id += unicode(value) + except: + raise Exception("Cannot create object_id from key" + " value '%s'" % str(value)) + + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, + _tag=_tag, _ctime=ctime, + _utime=ctime, _object_id=_object_id, + _schema_id=schema_id, _const=False) + self._agent = agent + self._validated = False + self._modified = True + self._subscriptions = {} + + def destroy(self): + self._dtime = long(time.time() * 1000) + self._touch() + # @todo: publish change + + def is_deleted(self): + return self._dtime == 0 + + def set_value(self, _name, _value, _subType=None): + super(QmfAgentData, self).set_value(_name, _value, _subType) + self._utime = long(time.time() * 1000) + self._touch(_name) + # @todo: publish change + + def inc_value(self, name, delta=1): + """ add the delta to the property """ + # @todo: need to take write-lock + val = self.get_value(name) + try: + val += delta + except: + raise + self.set_value(name, val) + + def dec_value(self, name, delta=1): + """ subtract the delta from the property """ + # @todo: need to take write-lock + val = self.get_value(name) + try: + val -= delta + except: + raise + self.set_value(name, val) + + def validate(self): + """ + Compares this object's data against the associated schema. Throws an + exception if the data does not conform to the schema. + """ + props = self._schema.get_properties() + for name,val in props.iteritems(): + # @todo validate: type compatible with amqp_type? + # @todo validate: primary keys have values + if name not in self._values: + if val._isOptional: + # ok not to be present, put in dummy value + # to simplify access + self._values[name] = None + else: + raise Exception("Required property '%s' not present." % name) + self._validated = True + + def _touch(self, field=None): + """ + Mark this object as modified. Used to force a publish of this object + if on subscription. + """ + now = datetime.datetime.utcnow() + publish = False + if field: + # if the named field is not continuous, mark any subscriptions as + # needing to be published. + sid = self.get_schema_class_id() + if sid: + self._agent._lock.acquire() + try: + schema = self._agent._schema.get(sid) + if schema: + prop = schema.get_property(field) + if prop and not prop.is_continuous(): + for sid,sub in self._subscriptions.iteritems(): + sub.next_update = now + publish = True + if publish: + self._agent._next_subscribe_event = None + self._agent._wake_thread() + finally: + self._agent._lock.release() + + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # static test cases - no message passing, just exercise API + import logging + from common import (AgentName, SchemaProperty, qmfTypes, SchemaEventClass) + + logging.getLogger().setLevel(logging.INFO) + + logging.info( "Create an Agent" ) + _agent_name = AgentName("redhat.com", "agent", "tross") + _agent = Agent(str(_agent_name)) + + logging.info( "Get agent name: '%s'" % _agent.get_name()) + + logging.info( "Create SchemaObjectClass" ) + + _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"]) + # add properties + _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call + _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod(_desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + print("Schema Map='%s'" % str(_schema.map_encode())) + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + logging.info( "Create QmfAgentData" ) + + _obj = QmfAgentData( _agent, _schema=_schema ) + _obj.set_value("index1", 100) + _obj.set_value("index2", "a name" ) + _obj.set_value("set_string", "UNSET") + _obj.set_value("set_int", 0) + _obj.set_value("query_count", 0) + _obj.set_value("method_call_count", 0) + + print("Obj1 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object( _obj ) + + _obj = QmfAgentData( _agent, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0}, + _schema=_schema) + + print("Obj2 Map='%s'" % str(_obj.map_encode())) + + _agent.add_object(_obj) + + ############## + + + + logging.info( "Create SchemaEventClass" ) + + _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test data schema", + _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)}) + _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + print("Event Map='%s'" % str(_event.map_encode())) + + _agent.register_event_class(_event) |