summaryrefslogtreecommitdiff
path: root/qpid/extras/qmf/src/py/qmf2-prototype/agent.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/agent.py')
-rw-r--r--qpid/extras/qmf/src/py/qmf2-prototype/agent.py1380
1 files changed, 1380 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/agent.py b/qpid/extras/qmf/src/py/qmf2-prototype/agent.py
new file mode 100644
index 0000000000..4ec00bd288
--- /dev/null
+++ b/qpid/extras/qmf/src/py/qmf2-prototype/agent.py
@@ -0,0 +1,1380 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import datetime
+import time
+import Queue
+from logging import getLogger
+from threading import Thread, RLock, currentThread, Event
+from qpid.messaging import Connection, Message, Empty, SendError
+from uuid import uuid4
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+ SchemaMethod, timedelta_to_secs, QMF_APP_ID)
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+log = getLogger("qmf")
+trace = getLogger("qmf.agent")
+
+
+ ##==============================================================================
+ ## METHOD CALL
+ ##==============================================================================
+
+class _MethodCallHandle(object):
+ """
+ Private class used to hold context when handing off a method call to the
+ application. Given to the app in a WorkItem, provided to the agent when
+ method_response() is invoked.
+ """
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None,
+ _schema_id=None):
+ self.correlation_id = correlation_id
+ self.reply_to = reply_to
+ self.meth_name = meth_name
+ self.oid = _oid
+ self.schema_id = _schema_id
+
+class MethodCallParams(object):
+ """
+ """
+ def __init__(self, name, _oid=None, _schema_id=None, _in_args=None,
+ _user_id=None):
+ self._meth_name = name
+ self._oid = _oid
+ self._schema_id = _schema_id
+ self._in_args = _in_args
+ self._user_id = _user_id
+
+ def get_name(self):
+ return self._meth_name
+
+ def get_object_id(self):
+ return self._oid
+
+ def get_schema_id(self):
+ return self._schema_id
+
+ def get_args(self):
+ return self._in_args
+
+ def get_user_id(self):
+ return self._user_id
+
+
+ ##==============================================================================
+ ## SUBSCRIPTIONS
+ ##==============================================================================
+
+
+class _SubscriptionState(object):
+ """
+ An internally-managed subscription.
+ """
+ def __init__(self, reply_to, cid, query, interval, duration):
+ self.reply_to = reply_to
+ self.correlation_id = cid
+ self.query = query
+ self.interval = interval
+ self.duration = duration
+ now = datetime.datetime.utcnow()
+ self.next_update = now # do an immediate update
+ self.expiration = now + datetime.timedelta(seconds=duration)
+ self.last_update = None
+ self.id = 0
+
+ def resubscribe(self, now, _duration=None):
+ if _duration is not None:
+ self.duration = _duration
+ self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+ def published(self, now):
+ self.next_update = now + datetime.timedelta(seconds=self.interval)
+ self.last_update = now
+
+
+ ##==============================================================================
+ ## AGENT
+ ##==============================================================================
+
+class Agent(Thread):
+ def __init__(self, name, _domain=None, _notifier=None, **options):
+ Thread.__init__(self)
+ self._running = False
+ self._ready = Event()
+
+ self.name = str(name)
+ self._domain = _domain
+ self._address = QmfAddress.direct(self.name, self._domain)
+ self._notifier = _notifier
+
+ # configurable parameters
+ #
+ self._heartbeat_interval = options.get("heartbeat_interval", 30)
+ self._capacity = options.get("capacity", 10)
+ self._default_duration = options.get("default_duration", 300)
+ self._max_duration = options.get("max_duration", 3600)
+ self._min_duration = options.get("min_duration", 10)
+ self._default_interval = options.get("default_interval", 30)
+ self._min_interval = options.get("min_interval", 5)
+
+ # @todo: currently, max # of objects in a single reply message, would
+ # be better if it were max bytesize of per-msg content...
+ self._max_msg_size = options.get("max_msg_size", 0)
+
+ self._conn = None
+ self._session = None
+ self._direct_receiver = None
+ self._topic_receiver = None
+ self._direct_sender = None
+ self._topic_sender = None
+
+ self._lock = RLock()
+ self._packages = {}
+ self._schema_timestamp = long(0)
+ self._schema = {}
+ # _described_data holds QmfData objects that are associated with schema
+ # it is index by schema_id, object_id
+ self._described_data = {}
+ # _undescribed_data holds unstructured QmfData objects - these objects
+ # have no schema. it is indexed by object_id only.
+ self._undescribed_data = {}
+ self._work_q = Queue.Queue()
+ self._work_q_put = False
+ # subscriptions
+ self._subscription_id = long(time.time())
+ self._subscriptions = {}
+ self._next_subscribe_event = None
+
+ # prevents multiple _wake_thread() calls
+ self._noop_pending = False
+
+
+ def destroy(self, timeout=None):
+ """
+ Must be called before the Agent is deleted.
+ Frees up all resources and shuts down all background threads.
+
+ @type timeout: float
+ @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
+ """
+ trace.debug("Destroying Agent %s" % self.name)
+ if self._conn:
+ self.remove_connection(timeout)
+ trace.debug("Agent Destroyed")
+
+
+ def get_name(self):
+ return self.name
+
+ def set_connection(self, conn):
+ self._conn = conn
+ self._session = self._conn.session()
+
+ # for messages directly addressed to me
+ self._direct_receiver = self._session.receiver(str(self._address) +
+ ";{create:always,"
+ " node:"
+ " {type:topic,"
+ " x-declare:"
+ " {type:direct}}}",
+ capacity=self._capacity)
+ trace.debug("my direct addr=%s" % self._direct_receiver.source)
+
+ # for sending directly addressed messages.
+ self._direct_sender = self._session.sender(str(self._address.get_node()) +
+ ";{create:always,"
+ " node:"
+ " {type:topic,"
+ " x-declare:"
+ " {type:direct}}}")
+ trace.debug("my default direct send addr=%s" % self._direct_sender.target)
+
+ # for receiving "broadcast" messages from consoles
+ default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#",
+ self._domain)
+ self._topic_receiver = self._session.receiver(str(default_addr) +
+ ";{create:always,"
+ " node:"
+ " {type:topic}}",
+ capacity=self._capacity)
+ trace.debug("console.ind addr=%s" % self._topic_receiver.source)
+
+ # for sending to topic subscribers
+ ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND,
+ self._domain)
+ self._topic_sender = self._session.sender(str(ind_addr) +
+ ";{create:always,"
+ " node:"
+ " {type:topic}}")
+ trace.debug("agent.ind addr=%s" % self._topic_sender.target)
+
+ self._running = True
+ self.start()
+ self._ready.wait(10)
+ if not self._ready.isSet():
+ raise Exception("Agent managment thread failed to start.")
+
+ def remove_connection(self, timeout=None):
+ # tell connection thread to shutdown
+ self._running = False
+ if self.isAlive():
+ # kick my thread to wake it up
+ self._wake_thread()
+ trace.debug("waiting for agent receiver thread to exit")
+ self.join(timeout)
+ if self.isAlive():
+ log.error( "Agent thread '%s' is hung..." % self.name)
+ self._direct_receiver.close()
+ self._direct_receiver = None
+ self._direct_sender.close()
+ self._direct_sender = None
+ self._topic_receiver.close()
+ self._topic_receiver = None
+ self._topic_sender.close()
+ self._topic_sender = None
+ self._session.close()
+ self._session = None
+ self._conn = None
+ trace.debug("agent connection removal complete")
+
+ def register_object_class(self, schema):
+ """
+ Register an instance of a SchemaClass with this agent
+ """
+ # @todo: need to update subscriptions
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass instance expected")
+
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
+ hstr = classId.get_hash_string()
+ if not hstr:
+ raise Exception("Schema hash is not set.")
+
+ self._lock.acquire()
+ try:
+ if pname not in self._packages:
+ self._packages[pname] = [cname]
+ else:
+ if cname not in self._packages[pname]:
+ self._packages[pname].append(cname)
+ self._schema[classId] = schema
+ self._schema_timestamp = long(time.time() * 1000)
+ finally:
+ self._lock.release()
+
+ def register_event_class(self, schema):
+ return self.register_object_class(schema)
+
+ def raise_event(self, qmfEvent):
+ """
+ TBD
+ """
+ if not self._topic_sender:
+ raise Exception("No connection available")
+
+ # @todo: should we validate against the schema?
+ msg = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ qmfEvent.get_severity() + "." + self.name,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content": ContentType.event,
+ "qmf.agent":self.name},
+ content=[qmfEvent.map_encode()])
+ # TRACE
+ # log.error("!!! Agent %s sending Event (%s)" %
+ # (self.name, str(msg)))
+ self._topic_sender.send(msg)
+
+ def add_object(self, data):
+ """
+ Register an instance of a QmfAgentData object.
+ """
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(data, QmfAgentData):
+ raise TypeError("QmfAgentData instance expected")
+
+ oid = data.get_object_id()
+ if not oid:
+ raise TypeError("No identifier assigned to QmfAgentData!")
+
+ sid = data.get_schema_class_id()
+
+ self._lock.acquire()
+ try:
+ if sid:
+ if sid not in self._described_data:
+ self._described_data[sid] = {oid: data}
+ else:
+ self._described_data[sid][oid] = data
+ else:
+ self._undescribed_data[oid] = data
+
+ # does the new object match any subscriptions?
+ now = datetime.datetime.utcnow()
+ for sid,sub in self._subscriptions.iteritems():
+ if sub.query.evaluate(data):
+ # matched. Mark the subscription as needing to be
+ # serviced. The _publish() method will notice the new
+ # object and will publish it next time it runs.
+ sub.next_update = now
+ self._next_subscribe_event = None
+ # @todo: should we immediately publish?
+
+ finally:
+ self._lock.release()
+
+ def get_object(self, oid, schema_id):
+ data = None
+ self._lock.acquire()
+ try:
+ if schema_id:
+ data = self._described_data.get(schema_id)
+ if data:
+ data = data.get(oid)
+ else:
+ data = self._undescribed_data.get(oid)
+ finally:
+ self._lock.release()
+ return data
+
+
+ def method_response(self, handle, _out_args=None, _error=None):
+ """
+ """
+ if not isinstance(handle, _MethodCallHandle):
+ raise TypeError("Invalid handle passed to method_response!")
+
+ _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+ if handle.oid is not None:
+ _map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if handle.schema_id is not None:
+ _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode()
+ if _out_args is not None:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+ if _error is not None:
+ if not isinstance(_error, QmfData):
+ raise TypeError("Invalid type for error - must be QmfData")
+ _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.method_rsp},
+ content=_map)
+ msg.correlation_id = handle.correlation_id
+
+ self._send_reply(msg, handle.reply_to)
+
+ def get_workitem_count(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ return self._work_q.qsize()
+
+ def get_next_workitem(self, timeout=None):
+ """
+ Obtains the next pending work item, or None if none available.
+ """
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
+
+ def release_workitem(self, wi):
+ """
+ Releases a WorkItem instance obtained by getNextWorkItem(). Called when
+ the application has finished processing the WorkItem.
+ """
+ pass
+
+
+ def run(self):
+ global _callback_thread
+ next_heartbeat = datetime.datetime.utcnow()
+ batch_limit = 10 # a guess
+
+ self._ready.set()
+
+ while self._running:
+
+ #
+ # Process inbound messages
+ #
+ trace.debug("%s processing inbound messages..." % self.name)
+ for i in range(batch_limit):
+ try:
+ msg = self._topic_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ # TRACE
+ # log.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._topic_receiver.source, msg))
+ self._dispatch(msg, _direct=False)
+
+ for i in range(batch_limit):
+ try:
+ msg = self._direct_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ # TRACE
+ # log.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._direct_receiver.source, msg))
+ self._dispatch(msg, _direct=True)
+
+ #
+ # Send Heartbeat Notification
+ #
+ now = datetime.datetime.utcnow()
+ if now >= next_heartbeat:
+ trace.debug("%s sending heartbeat..." % self.name)
+ ind = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.agent_heartbeat_ind,
+ "qmf.agent":self.name},
+ content=self._makeAgentInfoBody())
+ # TRACE
+ #log.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
+ trace.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+ #
+ # Monitor Subscriptions
+ #
+ self._lock.acquire()
+ try:
+ now = datetime.datetime.utcnow()
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+ trace.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._max_duration)
+ dead_ss = {}
+ for sid,ss in self._subscriptions.iteritems():
+ if now >= ss.expiration:
+ dead_ss[sid] = ss
+ continue
+ if now >= ss.next_update:
+ self._publish(ss)
+ next_timeout = min(ss.expiration, ss.next_update)
+ if next_timeout < self._next_subscribe_event:
+ self._next_subscribe_event = next_timeout
+
+ for sid,ss in dead_ss.iteritems():
+ del self._subscriptions[sid]
+ self._unpublish(ss)
+ finally:
+ self._lock.release()
+
+ #
+ # notify application of pending WorkItems
+ #
+ if self._work_q_put and self._notifier:
+ trace.debug("%s notifying application..." % self.name)
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
+ _callback_thread = currentThread()
+ trace.debug("Calling agent notifier.indication")
+ self._notifier.indication()
+ _callback_thread = None
+
+ #
+ # Sleep until messages arrive or something times out
+ #
+ now = datetime.datetime.utcnow()
+ next_timeout = next_heartbeat
+ self._lock.acquire()
+ try:
+ # the mailbox expire flag may be cleared by the
+ # app thread(s) in order to force an immediate publish
+ if self._next_subscribe_event is None:
+ next_timeout = now
+ elif self._next_subscribe_event < next_timeout:
+ next_timeout = self._next_subscribe_event
+ finally:
+ self._lock.release()
+
+ timeout = timedelta_to_secs(next_timeout - now)
+
+ if self._running and timeout > 0.0:
+ trace.debug("%s sleeping %s seconds..." % (self.name,
+ timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ pass
+
+
+ trace.debug("Shutting down Agent %s thread" % self.name)
+
+ #
+ # Private:
+ #
+
+ def _makeAgentInfoBody(self):
+ """
+ Create an agent indication message body identifying this agent
+ """
+ return QmfData.create({"_name": self.get_name(),
+ "_schema_timestamp": self._schema_timestamp}).map_encode()
+
+ def _send_reply(self, msg, reply_to):
+ """
+ Send a reply message to the given reply_to address
+ """
+ if not isinstance(reply_to, QmfAddress):
+ try:
+ reply_to = QmfAddress.from_string(str(reply_to))
+ except ValueError:
+ log.error("Invalid reply-to address '%s'" % reply_to)
+
+ msg.subject = reply_to.get_subject()
+
+ try:
+ if reply_to.is_direct():
+ # TRACE
+ #log.error("!!! Agent %s direct REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._direct_sender.send(msg)
+ else:
+ # TRACE
+ # log.error("!!! Agent %s topic REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._topic_sender.send(msg)
+ trace.debug("reply msg sent to [%s]" % str(reply_to))
+ except SendError, e:
+ log.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
+
+ def _send_query_response(self, content_type, cid, reply_to, objects):
+ """
+ Send a response to a query, breaking the result into multiple
+ messages based on the agent's _max_msg_size config parameter
+ """
+
+ total = len(objects)
+ if self._max_msg_size:
+ max_count = self._max_msg_size
+ else:
+ max_count = total
+
+ start = 0
+ end = min(total, max_count)
+ # send partial response if too many objects present
+ while end < total:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "partial":None,
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
+ start = end
+ end = min(total, end + max_count)
+
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
+
+ def _dispatch(self, msg, _direct=False):
+ """
+ Process a message from a console.
+
+ @param _direct: True if msg directly addressed to this agent.
+ """
+ trace.debug( "Message received from Console! [%s]" % msg )
+
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
+ log.warning("Ignoring unrecognized message '%s'" % msg)
+ return
+ version = 2 # @todo: fix me
+ cmap = {}; props={}
+ if msg.content_type == "amqp/map":
+ cmap = msg.content
+ if msg.properties:
+ props = msg.properties
+
+ if opcode == OpCode.agent_locate_req:
+ self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
+ elif opcode == OpCode.query_req:
+ self._handleQueryMsg( msg, cmap, props, version, _direct )
+ elif opcode == OpCode.method_req:
+ self._handleMethodReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_req:
+ self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_ind:
+ self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_cancel_ind:
+ self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.noop:
+ self._noop_pending = False
+ trace.debug("No-op msg received.")
+ else:
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ % opcode)
+
+ def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
+ """
+ Process a received agent-locate message
+ """
+ trace.debug("_handleAgentLocateMsg")
+
+ reply = False
+ if props.get("method") == "request":
+ # if the message is addressed to me or wildcard, process it
+ if (msg.subject == "console.ind" or
+ msg.subject == "console.ind.locate" or
+ msg.subject == "console.ind.locate." + self.name):
+ pred = msg.content
+ if not pred:
+ reply = True
+ elif isinstance(pred, type([])):
+ # fake a QmfData containing my identifier for the query compare
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
+ reply = query.evaluate(tmpData)
+
+ if reply:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.agent_locate_rsp},
+ content=self._makeAgentInfoBody())
+ m.correlation_id = msg.correlation_id
+ self._send_reply(m, msg.reply_to)
+ else:
+ trace.debug("agent-locate msg not mine - no reply sent")
+
+
+ def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
+ """
+ Handle received query message
+ """
+ trace.debug("_handleQueryMsg")
+
+ if "method" in props and props["method"] == "request":
+ if cmap:
+ try:
+ query = QmfQuery.from_map(cmap)
+ except TypeError:
+ log.error("Invalid Query format: '%s'" % str(cmap))
+ return
+ target = query.get_target()
+ if target == QmfQuery.TARGET_PACKAGES:
+ self._queryPackagesReply( msg, query )
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
+ self._querySchemaReply( msg, query, _idOnly=True )
+ elif target == QmfQuery.TARGET_SCHEMA:
+ self._querySchemaReply( msg, query)
+ elif target == QmfQuery.TARGET_AGENT:
+ log.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ self._queryDataReply(msg, query, _idOnly=True)
+ elif target == QmfQuery.TARGET_OBJECT:
+ self._queryDataReply(msg, query)
+ else:
+ log.warning("Unrecognized query target: '%s'" % str(target))
+
+
+
+ def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Method Request
+ """
+ if "method" in props and props["method"] == "request":
+ mname = cmap.get(SchemaMethod.KEY_NAME)
+ if not mname:
+ log.warning("Invalid method call from '%s': no name"
+ % msg.reply_to)
+ return
+
+ in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+ oid = cmap.get(QmfData.KEY_OBJECT_ID)
+ schema_id = cmap.get(QmfData.KEY_SCHEMA_ID)
+ if schema_id:
+ schema_id = SchemaClassId.from_map(schema_id)
+ handle = _MethodCallHandle(msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid, schema_id)
+ param = MethodCallParams( mname, oid, schema_id, in_args,
+ msg.user_id)
+
+ # @todo: validate the method against the schema:
+ # if self._schema:
+ # # validate
+ # _in_args = _in_args.copy()
+ # ms = self._schema.get_method(name)
+ # if ms is None:
+ # raise ValueError("Method '%s' is undefined." % name)
+
+ # for aname,prop in ms.get_arguments().iteritems():
+ # if aname not in _in_args:
+ # if prop.get_default():
+ # _in_args[aname] = prop.get_default()
+ # elif not prop.is_optional():
+ # raise ValueError("Method '%s' requires argument '%s'"
+ # % (name, aname))
+ # for aname in _in_args.iterkeys():
+ # prop = ms.get_argument(aname)
+ # if prop is None:
+ # raise ValueError("Method '%s' does not define argument"
+ # " '%s'" % (name, aname))
+ # if "I" not in prop.get_direction():
+ # raise ValueError("Method '%s' argument '%s' is not an"
+ # " input." % (name, aname))
+
+ # # @todo check if value is correct (type, range, etc)
+
+ self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+ self._work_q_put = True
+
+ def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Subscription Request
+ """
+ if "method" in props and props["method"] == "request":
+ query_map = cmap.get("_query")
+ interval = cmap.get("_interval")
+ duration = cmap.get("_duration")
+
+ try:
+ query = QmfQuery.from_map(query_map)
+ except TypeError:
+ log.warning("Invalid query for subscription: %s" %
+ str(query_map))
+ return
+
+ if isinstance(self, AgentExternal):
+ # param = SubscriptionParams(_ConsoleHandle(console_handle,
+ # msg.reply_to),
+ # query,
+ # interval,
+ # duration,
+ # msg.user_id)
+ # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+ # msg.correlation_id, param))
+ # self._work_q_put = True
+ log.error("External Subscription TBD")
+ return
+
+ # validate the query - only specific objects, or
+ # objects wildcard, are currently supported.
+ if (query.get_target() != QmfQuery.TARGET_OBJECT or
+ (query.get_selector() == QmfQuery.PREDICATE and
+ query.get_predicate())):
+ log.error("Subscriptions only support (wildcard) Object"
+ " Queries.")
+ err = QmfData.create(
+ {"reason": "Unsupported Query type for subscription.",
+ "query": str(query.map_encode())})
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content={"_error": err.map_encode()})
+ self._send_reply(m, msg.reply_to)
+ return
+
+ if duration is None:
+ duration = self._default_duration
+ else:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ log.warning("Bad duration value: %s" % str(msg))
+ duration = self._default_duration
+
+ if interval is None:
+ interval = self._default_interval
+ else:
+ try:
+ interval = float(interval)
+ if interval < self._min_interval:
+ interval = self._min_interval
+ except:
+ log.warning("Bad interval value: %s" % str(msg))
+ interval = self._default_interval
+
+ ss = _SubscriptionState(msg.reply_to,
+ msg.correlation_id,
+ query,
+ interval,
+ duration)
+ self._lock.acquire()
+ try:
+ sid = self._subscription_id
+ self._subscription_id += 1
+ ss.id = sid
+ self._subscriptions[sid] = ss
+ self._next_subscribe_event = None
+ finally:
+ self._lock.release()
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": interval,
+ "_duration": duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+
+ def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Renew Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ log.error("Invalid subscription refresh msg: %s" %
+ str(msg))
+ return
+
+ self._lock.acquire()
+ try:
+ ss = self._subscriptions.get(sid)
+ if not ss:
+ log.error("Ignoring unknown subscription: %s" %
+ str(sid))
+ return
+ duration = cmap.get("_duration")
+ if duration is not None:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ log.error("Bad duration value: %s" % str(msg))
+ duration = None # use existing duration
+
+ ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+ new_duration = ss.duration
+ new_interval = ss.interval
+
+ finally:
+ self._lock.release()
+
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": new_interval,
+ "_duration": new_duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+ def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Cancel Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ log.warning("No subscription id supplied: %s" % msg)
+ return
+
+ self._lock.acquire()
+ try:
+ if sid in self._subscriptions:
+ dead_sub = self._subscriptions[sid]
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+ self._unpublish(dead_sub)
+
+
+ def _queryPackagesReply(self, msg, query):
+ """
+ Run a query against the list of known packages
+ """
+ pnames = []
+ self._lock.acquire()
+ try:
+ for name in self._packages.iterkeys():
+ qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name},
+ _object_id="_package")
+ if query.evaluate(qmfData):
+ pnames.append(name)
+
+ self._send_query_response(ContentType.schema_package,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
+ finally:
+ self._lock.release()
+
+
+ def _querySchemaReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ schemas = []
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = self._schema.get(query.get_id())
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
+ if _idOnly:
+ msgkey = ContentType.schema_id
+ else:
+ msgkey = ContentType.schema_class
+
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
+ finally:
+ self._lock.release()
+
+
+ def _queryDataReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ # hold the (recursive) lock for the duration so the Agent
+ # won't send data that is currently being modified by the
+ # app.
+ self._lock.acquire()
+ try:
+ response = []
+ data_objs = self._queryData(query)
+ if _idOnly:
+ for obj in data_objs:
+ response.append(obj.get_object_id())
+ else:
+ for obj in data_objs:
+ response.append(obj.map_encode())
+
+ if _idOnly:
+ msgkey = ContentType.object_id
+ else:
+ msgkey = ContentType.data
+
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ response)
+ finally:
+ self._lock.release()
+
+
+ def _queryData(self, query):
+ """
+ Return a list of QmfData objects that match a given query
+ """
+ data_objs = []
+ # extract optional schema_id from target params
+ sid = None
+ t_params = query.get_target_param()
+ if t_params:
+ sid = t_params.get(QmfData.KEY_SCHEMA_ID)
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ found = db.get(oid)
+ if found:
+ data_objs.append(found)
+ else:
+ found = None
+ if sid:
+ db = self._described_data.get(sid)
+ if db:
+ found = db.get(oid)
+ else:
+ found = self._undescribed_data.get(oid)
+ if found:
+ data_objs.append(found)
+
+ else: # otherwise, evaluate all data
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ data_objs.append(data)
+ else:
+ if sid:
+ db = self._described_data.get(sid)
+ else:
+ db = self._undescribed_data
+
+ if db:
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ data_objs.append(data)
+ finally:
+ self._lock.release()
+
+ return data_objs
+
+ def _publish(self, sub):
+ """ Publish a subscription.
+ """
+ response = []
+ now = datetime.datetime.utcnow()
+ objs = self._queryData(sub.query)
+ if objs:
+ for obj in objs:
+ if sub.id not in obj._subscriptions:
+ # new to subscription - publish it
+ obj._subscriptions[sub.id] = sub
+ response.append(obj.map_encode())
+ elif obj._dtime:
+ # obj._dtime is millisec since utc. Convert to datetime
+ utcdt = datetime.datetime.utcfromtimestamp(obj._dtime/1000.0)
+ if utcdt > sub.last_update:
+ response.append(obj.map_encode())
+ else:
+ # obj._utime is millisec since utc. Convert to datetime
+ utcdt = datetime.datetime.utcfromtimestamp(obj._utime/1000.0)
+ if utcdt > sub.last_update:
+ response.append(obj.map_encode())
+
+ if response:
+ trace.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id))
+ self._send_query_response( ContentType.data,
+ sub.correlation_id,
+ sub.reply_to,
+ response)
+ sub.published(now)
+
+ def _unpublish(self, sub):
+ """ This subscription is about to be deleted, remove it from any
+ referencing objects.
+ """
+ objs = self._queryData(sub.query)
+ if objs:
+ for obj in objs:
+ if sub.id in obj._subscriptions:
+ del obj._subscriptions[sub.id]
+
+
+
+ def _wake_thread(self):
+ """
+ Make the agent management thread loop wakeup from its next_receiver
+ sleep.
+ """
+ self._lock.acquire()
+ try:
+ if not self._noop_pending:
+ trace.debug("Sending noop to wake up [%s]" % self._address)
+ msg = Message(id=QMF_APP_ID,
+ subject=self.name,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.noop},
+ content={})
+ try:
+ self._direct_sender.send( msg, sync=True )
+ self._noop_pending = True
+ except SendError, e:
+ log.error(str(e))
+ finally:
+ self._lock.release()
+
+
+ ##==============================================================================
+ ## EXTERNAL DATABASE AGENT
+ ##==============================================================================
+
+class AgentExternal(Agent):
+ """
+ An Agent which uses an external management database.
+ """
+ def __init__(self, name, _domain=None, _notifier=None,
+ _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+ super(AgentExternal, self).__init__(name, _domain, _notifier,
+ _heartbeat_interval,
+ _max_msg_size, _capacity)
+ log.error("AgentExternal TBD")
+
+
+
+ ##==============================================================================
+ ## DATA MODEL
+ ##==============================================================================
+
+
+class QmfAgentData(QmfData):
+ """
+ A managed data object that is owned by an agent.
+ """
+
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None,
+ _object_id=None, _schema=None):
+ schema_id = None
+ if _schema:
+ schema_id = _schema.get_class_id()
+
+ if _object_id is None:
+ if not isinstance(_schema, SchemaObjectClass):
+ raise Exception("An object_id must be provided if the object"
+ "doesn't have an associated schema.")
+ ids = _schema.get_id_names()
+ if not ids:
+ raise Exception("Object must have an Id or a schema that"
+ " provides an Id")
+ _object_id = u""
+ for key in ids:
+ value = _values.get(key)
+ if value is None:
+ raise Exception("Object must have a value for key"
+ " attribute '%s'" % str(key))
+ try:
+ _object_id += unicode(value)
+ except:
+ raise Exception("Cannot create object_id from key"
+ " value '%s'" % str(value))
+
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+ _tag=_tag, _ctime=ctime,
+ _utime=ctime, _object_id=_object_id,
+ _schema_id=schema_id, _const=False)
+ self._agent = agent
+ self._validated = False
+ self._modified = True
+ self._subscriptions = {}
+
+ def destroy(self):
+ self._dtime = long(time.time() * 1000)
+ self._touch()
+ # @todo: publish change
+
+ def is_deleted(self):
+ return self._dtime == 0
+
+ def set_value(self, _name, _value, _subType=None):
+ super(QmfAgentData, self).set_value(_name, _value, _subType)
+ self._utime = long(time.time() * 1000)
+ self._touch(_name)
+ # @todo: publish change
+
+ def inc_value(self, name, delta=1):
+ """ add the delta to the property """
+ # @todo: need to take write-lock
+ val = self.get_value(name)
+ try:
+ val += delta
+ except:
+ raise
+ self.set_value(name, val)
+
+ def dec_value(self, name, delta=1):
+ """ subtract the delta from the property """
+ # @todo: need to take write-lock
+ val = self.get_value(name)
+ try:
+ val -= delta
+ except:
+ raise
+ self.set_value(name, val)
+
+ def validate(self):
+ """
+ Compares this object's data against the associated schema. Throws an
+ exception if the data does not conform to the schema.
+ """
+ props = self._schema.get_properties()
+ for name,val in props.iteritems():
+ # @todo validate: type compatible with amqp_type?
+ # @todo validate: primary keys have values
+ if name not in self._values:
+ if val._isOptional:
+ # ok not to be present, put in dummy value
+ # to simplify access
+ self._values[name] = None
+ else:
+ raise Exception("Required property '%s' not present." % name)
+ self._validated = True
+
+ def _touch(self, field=None):
+ """
+ Mark this object as modified. Used to force a publish of this object
+ if on subscription.
+ """
+ now = datetime.datetime.utcnow()
+ publish = False
+ if field:
+ # if the named field is not continuous, mark any subscriptions as
+ # needing to be published.
+ sid = self.get_schema_class_id()
+ if sid:
+ self._agent._lock.acquire()
+ try:
+ schema = self._agent._schema.get(sid)
+ if schema:
+ prop = schema.get_property(field)
+ if prop and not prop.is_continuous():
+ for sid,sub in self._subscriptions.iteritems():
+ sub.next_update = now
+ publish = True
+ if publish:
+ self._agent._next_subscribe_event = None
+ self._agent._wake_thread()
+ finally:
+ self._agent._lock.release()
+
+
+
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+
+if __name__ == '__main__':
+ # static test cases - no message passing, just exercise API
+ import logging
+ from common import (AgentName, SchemaProperty, qmfTypes, SchemaEventClass)
+
+ logging.getLogger().setLevel(logging.INFO)
+
+ logging.info( "Create an Agent" )
+ _agent_name = AgentName("redhat.com", "agent", "tross")
+ _agent = Agent(str(_agent_name))
+
+ logging.info( "Get agent name: '%s'" % _agent.get_name())
+
+ logging.info( "Create SchemaObjectClass" )
+
+ _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"])
+ # add properties
+ _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ # These two properties can be set via the method call
+ _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod(_desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ print("Schema Map='%s'" % str(_schema.map_encode()))
+
+ _agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ logging.info( "Create QmfAgentData" )
+
+ _obj = QmfAgentData( _agent, _schema=_schema )
+ _obj.set_value("index1", 100)
+ _obj.set_value("index2", "a name" )
+ _obj.set_value("set_string", "UNSET")
+ _obj.set_value("set_int", 0)
+ _obj.set_value("query_count", 0)
+ _obj.set_value("method_call_count", 0)
+
+ print("Obj1 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object( _obj )
+
+ _obj = QmfAgentData( _agent,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0},
+ _schema=_schema)
+
+ print("Obj2 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object(_obj)
+
+ ##############
+
+
+
+ logging.info( "Create SchemaEventClass" )
+
+ _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test data schema",
+ _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+ _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ print("Event Map='%s'" % str(_event.map_encode()))
+
+ _agent.register_event_class(_event)